Output

定义所有的数据输出抽象(Target),用于Pipeline.write()方法

class bigflow.output.FileBase(path, **options)

基类:object

用于Pipeline.write()方法读取文件的基类

参数:path (str) — 写文件的path,必须为str类型
  1. >>> # 对数据按照tab分割的第一个元素进行hash,将结果分割成2000个文件,并且保证每个文件内有序。
  2. >>> # 代码示例:
  3. >>> pipeline.write(
  4. >>> data.map(lambda x: x),
  5. >>> output.TextFile("your output dir")
  6. >>> .sort()
  7. >>> .partition(n=2000, partition_fn=lambda x, n: hash(x.split("\t", 1)[0]) % n)
  8. >>> )
  9. >>>
  • partition(n=None, partition_fn=None)

    对输出结果进行分组

    参数:
    • n (int) — 输出结果的分组个数,具体表现为产生n个输出文件,文件内容为各组数据
    • partition_fn (callable) — 用于指定分组方式的函数
    返回:

    返回self

    返回类型:

    FileBase

  • sort(reverse=False)

    根据数据实际值对数据进行排序(默认为升序)

    参数:reverse (bool) — 是否降序排序
    返回:返回self
    返回类型:FileBase
  • sort_by(key_read_fn=None, reverse=False)

    通过key_read_fn获取key,并根据key对数据进行排序(默认为升序)

    参数:
    • key_reader_fn (callable) — 用户获取key的函数
    • reverse (bool) — 是否降序排序
    返回:

    返回self

    返回类型:

    FileBase

class bigflow.output.SchemaTextFile(path, **options)

基类:bigflow.output.TextFile

读取文本文件生成支持字段操作的SchemaPCollection

参数:
  • path (str) — 写文件的path,必须为str类型
  • **options — Arbitrary keyword arguments, 其中关键参数, 若SchemaPCollection的元素是dict, 必须指定columns(list)表示输出的字段名, 若SchemaPCollection的元素是tuple, 可以直接输出所有数据 separator(str)表示每行数据字段分隔符,默认分隔符是Tab(“ “)

Example

  1. >>> from bigflow import schema
  2. >>> tps = _pipeline.parallelize([('XiaoA', 20), ('XiaoB', 21)])
  3. >>> dicts = tps.map(lambda (name, age): {'name': name, 'age': age})
  4. >>> dicts = dicts.map(lambda _: _, serde=schema.of(['name', 'age'])) # 下一个版本中这行可以省去
  5. >>> _pipeline.write(dicts, output.SchemaTextFile('./output', columns = ['name', 'age']))
  6. >>> _pipeline.run()
  7. >>> print open('./output/part-00000').read()
  8. XiaoA 20
  9. XiaoB 21
  • transform_to_node(ptype)

    内部接口

class bigflow.output.SequenceFile(path, **options)

基类:bigflow.output.FileBase

输出到SequenceFile文件的Target,SequenceFile的(Key, Value)将被写为BytesWritable,用户使用as_type()函数自行将数据序列化

参数:
  • path (str) — 写文件的path,必须为str类型
  • **options — 其中关键参数有: overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。 key_serde: key如何被序列化为字符串。 value_serde: value如果被序列化为字符串。 需要注意, key_serde/value_serde如果设置,则数据必须是一个两个元素的tuple。 如果不设置,则认为全部的数据使用默认序列化器写到sequence file的value中,key为空。
  • as_type(kv_serializer)

    通过kv_serializer将数据序列化为(Key, Value)

    参数:kv_serializer (callable) — 序列化函数
    返回:返回self
    返回类型:SequenceFile

    注解

    kv_deserializer的期望签名为:

    kv_deserializer(object) => (str, str)

  • transform_to_node(ptype)

class bigflow.output.TextFile(path, **options)

基类:bigflow.output.FileBase

输出到文本文件的Target

  • Args:

    path (str): 写文件的path,必须为str类型 **options: 其中关键参数有:

    overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。

    record_delimiter: 输出文本的分隔符,默认’

  • ‘;

    若指定为None,则将所有数据按字节流连续输出

  • compression_types = {‘gzip’: 1}

  • transform_to_node(ptype)

    内部接口

  • with_compression(compression_type)

    对输出文件进行压缩

    参数:compression_type (str) — 压缩格式,目前仅支持”gzip”
    返回:返回self
    返回类型:TextFile

class bigflow.output.UserOutputBase

基类:object

用户Output基类

  • close()

    用户可以重写该方法。

  • get_commiter()

    用户可以重写该方法。 返回一个commiter, 默认表示不需要commit阶段 commiter应该是一个无参函数。

  • open(partition)

    用户可以重写该方法。 传入参数partition表示这是第几个partition

  • partition_fn()

    用户可以重写该方法。 返回一个partition fn。 partition_fn原型应为:(data, total_partition) => partition 返回None则表示不太关心如何partition。

    如果partition_number

  • partition_number()

    用户可以重写该方法。 返回一个int型的数,表示总共要把数据partition成多少份。

  • pre_process(pval)

    用户可以重写该方法。 进行前处理,默认不处理

  • sink(data)

    用户可以重写该方法。

    该方法对每条数据调用一次

bigflow.output.user_define_format(user_output_base)

内部函数