PipelineFactory

定义用于创建 bigflow.pipeline 的工厂类

class bigflow.base.Pipeline

Pipeline是用户一个分布式计算任务的抽象

  • static create(pipeline_type, **job_config)

    根据用户所指定的后端引擎类型以及基本配置构造一个Pipeline实例

    参数:
    • pipeline_type (str) — 指定Pipeline类型,目前支持”local”“hadoop”
    • **job_config

      Pipeline配置

      hadoop模式:

      job_name: 用于指定Hadoop作业名

      tmp_data_path: 用于指定保存运行包等信息的HDFS路径,请确保ugi有操作权限

      hadoop_job_conf: 用于设置Hadoop作业相关配置,优先级关系:pipeline创建时指定的参数 > bigflow自动算出的参数 > hadoop-site.xml里的参数

    • default_concurrency : 默认并发数。
      如果某级reduce数据量过小,bigflow会利用hadoop相关feature在运行时自动调小并发。
    • 通用配置:

      pass_default_encoding_to_remote (bool/None): 是否传递defaultencoding配置值 (默认编码)到远端。

      默认情况下(或配置为None),仅在sys模块被reload过的情况下(通过sys.setdefaultencoding判断), 会将本地的sys.getdefaultencoding()传递到remote端。 设置为True,则强制将本地编码透传至远端。 设置为False,则不透传。

    返回:

    Pipeline实例

    返回类型:

    Pipeline

    创建一个local作业: >>> base.Pipeline.create(‘local’)

    创建一个hadoop作业: >>> base.Pipeline.create(‘hadoop’,

    job_name=”test_app”, tmp_data_path=”hdfs:///app/test/“, hadoop_job_conf={“mapred.job.map.capacity”: “1000”})

class bigflow.base.Options

基类:object

for all options

  • class Explain

    基类:object

    for explain options in base.Pipeline.create

    • DEFAULT: Default setting.

      At most engine (Hadoop, Spark[not support yet]), the explaining file will be shown at the log directory of the 1st task of every vertex.

    ON: Explain file will be shown at all task. This may be very expensive if the explaining file is too big.

    OFF: Do not show any explaining file.

    eg.

    1. base.Pipeline.create('dagmr', explain=base.Options.Explain.ON)
    • DEFAULT = 0

    • OFF = 2

    • ON = 1

class bigflow.base.Transformer

基类:object

Transformser基类

用户在使用 bigflow.transforms.transform(self, data, Transformer, *side_inputs, **options) 时,需要实现一个本类的子类,并重写相关方法,此类相关样例也见前述变换文档页。

  • begin_process(*side_inputs)

    此方法在开始处理数据之前被调用,以通知用户要开始处理数据了。

    用户必须返回一个可迭代的对象,其中值将会被放入结果的PCollection中。

  • end_process(*side_inputs)

    此方法在结束处理数据时被调用,以通知用户要开始处理数据了。

    用户必须返回一个可迭代的对象,其中值将会被放入结果的PCollection中。

  • process(record, *side_inputs)

    此方法在处理数据之时被调用,以通知用户要开始处理数据了。 其中record即为待处理的数据。

    用户必须返回一个可迭代的对象,其中值将会被放入结果的PCollection中。