Input

定义所有的数据源(Source),用于Pipeline.read()方法

实现一个Source需要实现四个接口:

  1. 有一个input_format属性,是一个flume::Loader
  2. 有一个objector属性,是一个Objector
  3. 有一个uris属性,返回一个uri列表
  4. 有一个transform_from_node方法,把一个Node变换成一个PType
  5. 有一个get_size方法,计算本文件读出数据量有多少。可以返回-1(?)表示未知大小。

class bigflow.input.FileBase(path, options*)

基类:object

用于Pipeline.read()方法读取文件的基类

参数:*path — 读取文件的path,必须均为str或unicode类型
  • get_size(pipeline)

    获得所有读取文件在文件系统中的大小

    返回:文件大小,以字节为单位
    返回类型:int

class bigflow.input.SchemaTextFile(path, options*)

基类:bigflow.input.TextFile

读取文本文件生成支持字段操作的SchemaPCollection

参数:
  • path — 读取文件的path, 必须均为str类型
  • *options

    Arbitrary keyword arguments, 其中关键参数, (1). 若columns(list), 每一项为字段名,则生成SchemaPCollection的元素是dict,dict中的value类型都是str;

    (2). 若columns(list), 每一项为(字段名,类型),则生成SchemaPCollection的元素是dict, dict中的值类型是字段对应的类型;

    (3). 若columns(int),表示分割的列数,则生成SchemaPCollection的元素是tuple,tuple中的每个元素的类型都是str, separator(str)表示每行数据字段分隔符,默认分隔符是Tab(“ “);

    (4). 若columns(list), 每一项为python基本类型(int, str, float),则生成SchemaPcollection的元素是tuple, 每个tuple中的元素的类型和columns中的类型一一对应;separator(str)表示每行数据字段分隔符,默认分隔符是Tab(“ “);

    ignore_overflow(bool)表示如果文件有多余的列,是否可以忽略掉。默认为False,即出现多余的列时即会报错。

    ignore_illegal_line(bool): 表示当文件某一行的列数少于提供的字段数时,是否可以忽略该文件行。若不设置,则抛出异常

Example

  1. >>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
  2. >>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = ['name', 'age']))
  3. >>> persons.get()
  4. [{'age': '20', 'name': 'XiaoA'}, {'age': '21', 'name': 'XiaoB'}]
  1. >>> open("input-data", "w").write("XiaoA\t20\nXiaoB\t21\n")
  2. >>> persons = _pipeline.read(input.SchemaTextFile("input-data", columns = [('name', str), ('age', int)]))
  3. >>> persons.get()
  4. [{'age': 20, 'name': 'XiaoA'}, {'age': 21, 'name': 'XiaoB'}]
  1. >>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
  2. >>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=3))
  3. >>> data.get()
  4. [('1', '2.0', 'biflow'), ('10', '20.1', 'inf')]
  1. >>> open("temp_data.txt", "w").write("1\t2.0\tbiflow\n10\t20.10\tinf")
  2. >>> data = p.read(input.SchemaTextFile("./temp_data.txt", columns=[int, float, str]))
  3. >>> data.get()
  4. [(1, 2.0, 'biflow'), (10, 20.1, 'inf')]
  • transform_from_node(load_node, pipeline)

    内部接口

class bigflow.input.SequenceFile(path, options*)

基类:bigflow.input.FileBase

表示读取SequenceFile的数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析

参数:
  • path — 读取文件的path,必须均为str类型
  • *options

    其中关键参数: combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。 partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,

    ptable的key是split_info,value这个split上的全部数据所组成的pcollection。

    key_serde: key如何反序列化 value_serde: value如何反序列化 如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。

Example

  1. >>> from bigflow import serde
  2. >>> StrSerde = serde.StrSerde
  3. >>> lines = _pipeline.read(
  4. input.SequenceFile('path', key_serde=StrSerde(), value_serde=StrSerde()))
  5. >>> lines.get()
  6. [("key1", "value1"), ("key2", "value2")]
  1. >>> import mytest_proto_pb2
  2. >>> msg_type = mytest_proto_pb2.MyTestPbType
  3. >>> _pipeline.add_file("mytest_proto_pb2.py", "mytest_proto_pb2.py")
  4. >>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(msg_type)))
  5. >>> pbs.get() # 如果未设置key_serde/value_serde,则key会被丢弃。
  6. >>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
  7. <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]

有时,Pb包在本地没有,例如,py文件在hdfs,则可以使用下边的方法:

  1. >>> _pipeline.add_archive("hdfs:///proto.tar.gz", "proto") #add_archive暂时不支持本地模式
  2. >>> def get_pb_msg_creator(module_name, class_name):
  3. ... import importlib
  4. ... return lambda: importlib.import_module(module_name).__dict__[class_name]()
  5. >>> pbs = _pipeline.read(input.SequenceFile('path2', serde=serde.ProtobufSerde(get_pb_msg_creator("proto.mytest_proto_pb2", "MyTestPbType"))))
  6. >>> pbs.get()
  7. >>> [<mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>,
  8. <mytest_proto_pb2.MyTestPbType at 0x7fa9e262a870>]

如果需要自定义Serde,参见:bigflow.serde.Serde

  • as_type(kv_deserializer)

    通过kv_deserializer反序列化读取的(Key, Value)

    kv_deserializer的期望签名为:

    kv_deserializer(key: str, value: str) => object

  • transform_from_node(load_node, pipeline)

    内部接口

class bigflow.input.SequenceFileStream(path, options*)

基类:bigflow.input.FileBase

表示读取SequenceFile的无穷数据源,SequenceFile的(Key, Value)必须均为BytesWritable,并由用户自行解析

参数:
  • path — 读取文件的path,必须均为str类型
  • *options

    可选的参数。

    [Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000

    [Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s

    key_serde: key如何反序列化

    value_serde: value如何反序列化

    如果未设定key_serde/value_serde,则会忽略掉key,只把value用默认序列化器反序列化并返回。

注解

  1. 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
  2. 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
  3. 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
  • as_type(kv_deserializer)

    通过kv_deserializer反序列化读取的(Key, Value)

    kv_deserializer的期望签名为:

    kv_deserializer(key: str, value: str) => object

  • transform_from_node(load_node, pipeline)

    内部方法

class bigflow.input.TextFile(path, options*)

基类:bigflow.input.FileBase

表示读取的文本文件的数据源

参数:*path — 读取文件的path,必须均为str类型

读取文件数据示例::

  1. >>> lines1 = _pipeline.read(input.TextFile('hdfs:///my_hdfs_dir/'))
  2. >>> lines2 = _pipeline.read(input.TextFile('hdfs://host:port/my_hdfs_file'))
  3. >>> lines3 = _pipeline.read(input.TextFile('hdfs:///multi_path1', 'hdfs:///multi_path2'))
  4. >>> lines4 = _pipeline.read(input.TextFile('./local_file_by_rel_path/'))
  5. >>> lines5 = _pipeline.read(input.TextFile('/home/work/local_file_by_abs_path/'))
  6. >>> lines6 = _pipeline.read(input.TextFile(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
  • **options: 其中关键参数:

    combine_multi_file: 是否可以合并多个文件到一个mapper中处理。默认为True。

    • partitioned: 默认为False,如果置为True,则返回的数据集为一个ptable,

      ptable的key是split_info,value这个split上的全部数据所组成的pcollection。:

    1. >>> f1 = open('data1.txt', 'w')
    2. >>> f1.write('1 2 1')
    3. >>> f1.close()
    4. >>> f2 = open('data2.txt', 'w')
    5. >>> f2.write('1 2 2')
    6. >>> f2.close()
    7. >>> table = _pipeline.read(input.TextFile('./data1.txt', './data2.txt', partitioned = True))
    8. >>> def wordcount(p):
    9. return p.flat_map(lambda line: line.split()) \
    10. .group_by(lambda word: word) \
    11. .apply_values(transforms.count)
    1. >>> table.apply_values(wordcount).get()
    2. {'/home/data1.txt': {'1': 2, '2': 1}, '/home/data2.txt': {'1': 1, '2', 2}}
    3. # 需要注意,MR模式下,key可能不是这个格式的,切分方法也不一定是按照文件来的。
  • transform_from_node(load_node, pipeline)

    内部接口

class bigflow.input.TextFileStream(path, options*)

基类:bigflow.input.FileBase

表示读取的文本文件的无穷数据源。

参数:*path — 读取文件目录的path,必须均为str类型

读取文件数据示例::

  1. >>> lines1 = _pipeline.read(input.TextFileStream('hdfs:///my_hdfs_dir/'))
  2. >>> lines2 = _pipeline.read(input.TextFileStream('hdfs://host:port/my_hdfs_dir/'))
  3. >>> lines3 = _pipeline.read(input.TextFileStream('hdfs:///multi_path1', 'hdfs:///multi_path2'))
  4. >>> lines4 = _pipeline.read(input.TextFileStream('./local_file_by_rel_path/'))
  5. >>> lines5 = _pipeline.read(input.TextFileStream('/home/work/local_file_by_abs_path/'))
  6. >>> lines6 = _pipeline.read(input.TextFileStream(*['hdfs:///multi_path1', 'hdfs:///multi_path2']))
  7. **options: 可选的参数。
  8. [Hint] max_record_num_per_round: 用于指定每轮订阅的日志条数,默认值为1000
  9. [Hint] timeout_per_round: 用于指定每轮订阅的超时时间(单位为s),默认为10s

注解

  1. 如果path中含有子目录,则以子目录作为数据源;如果path中没有子目录,则以path作为数据源
  2. 目录中有效的文件名为从0开始的正整数;如果文件不存在,会一直等待该文件
  3. 目录中的文件不允许被修改,添加需要保证原子(可以先写成其他文件名,然后进行mv)
  • transform_from_node(load_node, pipeline)

    内部接口

class bigflow.input.UserInputBase

基类:object

用户输入抽象基类

用户需要按以下方法重写split/load函数

Eg.

  1. class LocalFileInput(UserInputBase):
  2. def __init__(self, dir):
  3. self._dir = dir
  4. def split(self):
  5. return [os.path.join(self._dir, filename) for filename in os.listdir(self._dir)]
  6. def load(split):
  7. with open(split) as f:
  8. for line in f.readline():
  9. yield line.strip()

用户可以重写post_process以实现一些后处理, post_process有一个传入参数,是一个PTable,这个PTable的key是split string, value是这个split上的数据。 默认post_process方法是`bigflow.transforms.flatten_values`.

  • get_serde()

    User can override this method to set the serde

  • get_size()

    user can override this method to calculate the size of the input data

  • load(split)

    Load data from a split. The return value will be flattened into a PCollection.

  • post_process(ptable)

    User can override post_process method to do some post_process.

  • split()

    splits urls as some splits. User should override this method.

bigflow.input.user_define_format(user_input_base)

return a FileBase object from a UserInputBase