Output
定义所有的数据输出抽象(Target),用于Pipeline.write()方法
class bigflow.output.FileBase
(path, **options)
基类:object
用于Pipeline.write()方法读取文件的基类
参数: | path (str) — 写文件的path,必须为str类型 |
---|
>>> # 对数据按照tab分割的第一个元素进行hash,将结果分割成2000个文件,并且保证每个文件内有序。
>>> # 代码示例:
>>> pipeline.write(
>>> data.map(lambda x: x),
>>> output.TextFile("your output dir")
>>> .sort()
>>> .partition(n=2000, partition_fn=lambda x, n: hash(x.split("\t", 1)[0]) % n)
>>> )
>>>
partition
(n=None, partition_fn=None)对输出结果进行分组
参数: - n (int) — 输出结果的分组个数,具体表现为产生n个输出文件,文件内容为各组数据
- partition_fn (callable) — 用于指定分组方式的函数
返回: 返回self
返回类型: -
根据数据实际值对数据进行排序(默认为升序)
参数: reverse (bool) — 是否降序排序 返回: 返回self 返回类型: FileBase sort_by
(key_read_fn=None, reverse=False)通过key_read_fn获取key,并根据key对数据进行排序(默认为升序)
参数: - key_reader_fn (callable) — 用户获取key的函数
- reverse (bool) — 是否降序排序
返回: 返回self
返回类型:
class bigflow.output.SchemaTextFile
(path, **options)
读取文本文件生成支持字段操作的SchemaPCollection
参数: |
|
---|
Example
>>> from bigflow import schema
>>> tps = _pipeline.parallelize([('XiaoA', 20), ('XiaoB', 21)])
>>> dicts = tps.map(lambda (name, age): {'name': name, 'age': age})
>>> dicts = dicts.map(lambda _: _, serde=schema.of(['name', 'age'])) # 下一个版本中这行可以省去
>>> _pipeline.write(dicts, output.SchemaTextFile('./output', columns = ['name', 'age']))
>>> _pipeline.run()
>>> print open('./output/part-00000').read()
XiaoA 20
XiaoB 21
class bigflow.output.SequenceFile
(path, **options)
输出到SequenceFile文件的Target,SequenceFile的(Key, Value)将被写为BytesWritable,用户使用as_type()函数自行将数据序列化
参数: |
|
---|
-
通过kv_serializer将数据序列化为(Key, Value)
参数: kv_serializer (callable) — 序列化函数 返回: 返回self 返回类型: SequenceFile 注解
kv_deserializer的期望签名为:
kv_deserializer(object) => (str, str)
class bigflow.output.TextFile
(path, **options)
输出到文本文件的Target
Args:
path (str): 写文件的path,必须为str类型 **options: 其中关键参数有:
overwrite: 如果目标位置已经存在,是否进行覆盖写操作。默认为True。 async_mode: 是否使用异步写。默认为True。
record_delimiter: 输出文本的分隔符,默认’
‘;
若指定为None,则将所有数据按字节流连续输出
-
内部接口
with_compression
(compression_type)对输出文件进行压缩
参数: compression_type (str) — 压缩格式,目前仅支持”gzip” 返回: 返回self 返回类型: TextFile
class bigflow.output.UserOutputBase
基类:object
用户Output基类
-
用户可以重写该方法。
-
用户可以重写该方法。 返回一个commiter, 默认表示不需要commit阶段 commiter应该是一个无参函数。
-
用户可以重写该方法。 传入参数partition表示这是第几个partition
-
用户可以重写该方法。 返回一个partition fn。 partition_fn原型应为:(data, total_partition) => partition 返回None则表示不太关心如何partition。
如果partition_number
-
用户可以重写该方法。 返回一个int型的数,表示总共要把数据partition成多少份。
-
用户可以重写该方法。 进行前处理,默认不处理
-
用户可以重写该方法。
该方法对每条数据调用一次
bigflow.output.user_define_format
(user_output_base)
内部函数