Examples

Example 1 – Single-stage ('map-only') job

  from bigflow import base, input, output

  def foo(line):
      # Keep each distinct character of the line once (order is not guaranteed)
      return "".join(set(line))

  # configs: the Spark configuration, assumed to be defined elsewhere
  p = base.Pipeline.create('spark', spark_conf=configs, runtime="JNI")
  ret = p \
      .read(input.TextFile('/test.in')) \
      .map(foo)
  p.write(ret, output.TextFile('/test.out'))
  p.run()

The generated plan is shown below:

[Figure: generated plan for Example 1 (plan1.png)]
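To make the per-record semantics of this job concrete, here is a minimal plain-Python emulation. It does not use Bigflow, and it says nothing about how Bigflow actually plans or executes the job. The helper `run_map_only` is hypothetical. It only shows that a map-only job applies `foo` to each line independently, so no record ever needs to see another one.

```python
def foo(line):
    # Same function as in the example: keep each distinct character once.
    # A set has no defined order, so the result's character order may vary.
    return "".join(set(line))


def run_map_only(lines):
    # Hypothetical local stand-in for p.read(...).map(foo): each input record
    # is transformed on its own, with no grouping or shuffle between records.
    return [foo(line) for line in lines]


if __name__ == "__main__":
    result = run_map_only(["hello", "bigflow"])
    # Sort the characters so the printed output is deterministic.
    print(["".join(sorted(r)) for r in result])  # prints ['ehlo', 'bfgilow']
```

Because the output of each record depends only on that record, the whole computation fits in one stage. That is what the "map-only" label in the heading refers to.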

Example 2 – WordCount

  from bigflow import transforms

  # p, input and output are the Pipeline and modules from Example 1
  words = p \
      .read(input.TextFile("/home/wangcong09/modif.small")) \
      .flat_map(lambda _: _.split())
  # Total number of words
  cnt1 = words \
      .apply(transforms.count)
  # Occurrences of each word, flattened into (word, count) pairs
  cnt2 = words \
      .group_by(lambda _: _) \
      .apply_values(transforms.count) \
      .flatten()
  p.write(cnt1, output.TextFile('/test.out1'))
  p.write(cnt2, output.TextFile('/test.out2'))
  p.run()

The generated plan is shown below:

[Figure: generated plan for Example 2 (plan2.png)]
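The next sketch emulates in plain Python what the two outputs of this pipeline contain. It does not use Bigflow or model its execution. The function `word_count` is hypothetical, and it assumes that `flatten()` turns the grouped table into `(word, count)` tuples. `cnt1` is a single total over all words. `cnt2` needs every occurrence of a word to be brought together first, which is the role of `group_by`.

```python
from collections import Counter


def word_count(lines):
    # Hypothetical stand-in for flat_map(lambda _: _.split()):
    # one record per whitespace-separated word.
    words = [w for line in lines for w in line.split()]

    # cnt1: count over the whole collection -> one number.
    cnt1 = len(words)

    # cnt2: group identical words, count each group, then flatten the groups
    # into (word, count) pairs. Sorted here only to make the result stable.
    cnt2 = sorted(Counter(words).items())
    return cnt1, cnt2


if __name__ == "__main__":
    total, per_word = word_count(["a b a", "b c"])
    print(total)     # prints 5
    print(per_word)  # prints [('a', 2), ('b', 2), ('c', 1)]
```

Both outputs are derived from the same `words` collection, so the pipeline reads the input once and branches into two results.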