PCollection

bigflow.pcollection.PCollection 定义

class bigflow.pcollection.PCollection(node, pipeline)

基类:bigflow.ptype.PType

用于表示分布式数据集的 bigflow.ptype.PType

注解

用户不应当直接使用其构造方法

参数:node (Node) — LogicalPlan.Node
  • accumulate(zero, accumulate_fn, side_inputs, options*)

    等同于 bigflow.transforms.accumulate(self, zero, accumulate_fn, *side_inputs, **options)

    参数:
    • zero (value or function) — 初始值,或是一个返回初始值的方法
    • accumulate_fn (function) — 聚合方法
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    聚合结果

    返回类型:

    PObject

    1. >>> from operator import add
    2. >>> p.parallelize([1, 2, 3]).accumulate(0, add).get()
    3. 6
  • agg(io_description, fn, args, kargs*)

    选择一些字段去做一些聚合操作。

    参数:
    • p (pcollection) — 输入数据集,需要是一个每个元素都是dict的pcollection
    • io_description (str) — 格式为: a,b=>c,d,e 即,输入字段=>输出字段
    • fn (callable) — 函数原型为 (input_pcollections) => (output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
    返回:

    返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。

    例如::

    1. >>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
    2. >>> print x.apply(fields.agg,
    3. >>> 'a, b => c, d, e',
    4. >>> lambda a, b: (
    5. >>> a.count(),
    6. >>> b.sum(),
    7. >>> a.flat_map(lambda x: xrange(x))
    8. >>> )
    9. >>> ).get()
    10. [{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
  • aggregate(zero, aggregate_fn, combine_fn, side_inputs, options*)

    等同于 bigflow.transforms.aggregate(self, aggregate_fn, combine_fn, *side_inputs, **options)

    参数:
    • pcollection (PCollection) — 输入PCollection
    • zero (value or function) — 初始值,或是一个返回初始值的方法
    • accumulate_fn (function) — 聚合方法
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    聚合结果

    返回类型:

    PObject

  • as_pobject()

    等同于 to self.first()

    返回:转换结果
    返回类型:PObject
    1. >>> _pipeline.parallelize(["A"]).as_pobject().get()
    2. "A"
  • as_schema(fields)

    根据字段,返回一个SchemaPCollection

    参数:fields

    类型可以是,tuple,list,dict; 当fields是tuple或list时, 会判断每个元素的类型:

    fields中的每个元素是python基本类型或一个serde; 接口将构造TupleSerde设置到PCollection每个元素

    fields中的每个元素是python string,抛出异常

  • 当fields是dict时:
    fields的key标识字段类型,value标识该字段的类型,如 {“name”: str, “age”: int} 当前PCollection中的每个元素必须是dict,dict内的key必须相同。 fields内的key要和PCollection内的key必须相同
  • 返回:表示转化后的PCollection
    返回类型:PCollection

    Examples

    1. >>> data = self._pipeline.parallelize([("xiaoming", "PKU", 20)])
    2. >>> d1 = data.as_schema((str, str, int))
    3. >>> d2 = data.as_schema([str, str, int])
    4. >>> print d1.get()
    5. [('xiaoming', 'PKU', 20)]
    6. >>>
    7. >>> print d2.get()
    8. [('xiaoming', 'PKU', 20)]
    9. >>>
    10. >>> data = self._pipeline.parallelize([{"name": "xiaoming", "school": "PKU", "age": 20}])
    11. >>> d5 = data.as_schema({"name": str, "school": str, "age": int})
    12. >>> print d5.get()
    13. [{'age': 20, 'name': 'xiaoming', 'school': 'PKU'}]
    14. >>>
  • cartesian(other, others, options*)

    与其他的PCollection做笛卡尔积

    参数:
    • other (PCollection) — 其他的PCollection
    • *others — 更多的PCollection
    返回:

    表示结果的PCollection

    返回类型:

    PCollection

    1. >>> _p1 = _pipeline.parallelize([1, 2, 3])
    2. >>> _p2 = _pipeline.parallelize([4, 5])
    3. >>> _p1.cartesian(_p2).get()
    4. [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
  • cogroup(other, *others)

    等同于 bigflow.transforms.cogroup(self, other, *others),

    参数:
    • other (PCollection) — 用于协同分组的PCollection
    • *others — 更多的PCollection
    返回:

    分组结果

    返回类型:

    PTable

  • combine(fn, **options)

    等同于 bigflow.transforms.combine(self, fn)

    参数:
    • fn (callable) — 合并函数
    • **options — 可配置选项
    返回:

    合并结果

    返回类型:

    PObject

    1. >>> _pipeline.parallelize([2, 4, 6, 10]).combine(sum).get()
    2. 22
  • count(**options)

    返回元素的数量,等同于 bigflow.transforms.count(self)

    返回:元素数量
    返回类型:PObject
    1. >>> p.parallelize([1, 2, 3, 4]).count().get()
    2. 4
  • diff(other)

    返回与另一个PCollection中不相同的元素

    参数:other (PCollection) — 另一个PCollection
    返回:表示结果的PCollection
    返回类型:PCollection
    1. >>> a = _pipeline.parallelize([1, 1, 2, 3])
    2. >>> b = _pipeline.parallelize([1, 1, 2, 2])
    3. >>> a.diff(b).get()
    4. [(2, (1, 2)), (3, (1, 0))]
  • distinct()

    元素去重,等同于 bigflow.transforms.distinct(self)

    参数:**options — 可配置选项
    返回:不重复元素,以PCollection给出
    返回类型:PCollection
    1. >>> p.parallelize([2, 2, 1, 9, 3, 3]).distinct().get()
    2. [2, 3, 1, 9]
  • filter(fn, side_inputs, options*)

    过滤元素,等同于 bigflow.transforms.filter(self, fn, *side_inputs, **options),

    参数:
    • fn (function) — 断言函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    过滤结果

    返回类型:

    PCollection

    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).filter(lambda x: x % 2 == 0).get()
    2. [2, 8]
  • first(**options)

    取第一个元素

    返回:表示结果的PObject
    返回类型:PObject
    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).first.get()
    2. 3
  • flat_map(fn, side_inputs, options*)

    对所有元素进行一对多映射,等同于 bigflow.transforms.flat_map(self, fn, *side_inputs, **options)

    参数:
    • fn (function) — 变换函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    变换结果

    返回类型:

    PCollection

    1. >>> _pipeline.parallelize([1, 3, 5, 7]).flat_map(lambda x: [x, x * 2]).get()
    2. [1, 2, 3, 5, 6, 7, 10, 14]
  • foreach(fn, side_inputs, options*)

    等同于 bigflow.transforms.foreach(self, fn, *side_inputs, **options)

    参数:
    • fn (function) — 变换函数
    • *side_inputs — 参与运算的SideInputs
    返回:

    None

  • full_join(other, others, options*)

    与其他PCollection做全连接操作,等同于 bigflow.transforms.full_join(self, other, *others)

    参数:
    • other (PCollection) — 做连接操作的PCollection
    • *others — 更多的PCollection
    返回:

    全连接结果

    返回类型:

    PCollection

    1. >>> x = _pipeline.parallelize([("a", 1)])
    2. >>> y = _pipeline.parallelize([("b", 2)])
    3. >>> x.full_join(y).get()
    4. [("a", (1, None)), ("b", (None, 2))]
  • group_by(key_extractor, value_extractor=None, **options)

    对元素分组,等同于 bigflow.transforms.group_by(self, key_extractor, value_extractor)

    参数:
    • key_extractor (function) — 用于提取key的函数
    • value_extractor (function, optional) — 用于提取value的函数
    • **options — 可配置选项
    返回:

    分组结果

    返回类型:

    PTable

    1. >>> _pcollection = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
    2. >>> _grouped = _pcollection.group_by(lambda x: x[0], lambda x: x[1]).get()
    3. >>> _grouped.get()
    4. {"A": [4, 3, 1], "B": [2]}
  • group_by_key(**options)

    group_by 变换类似,但使用默认的key/value提取函数对元素分组

    参数:**options — 可配置参数
    返回:分组结果
    返回类型:PTable
    1. >>> _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]).group_by_key().get()
    2. {"A": [4, 3, 1], "B": [2]}
  • intersection(other, output_duplicated=False)

    返回与另一个PCollection的交集

    参数:other (PCollection) — 另一个PCollection
    返回:表示交集的PCollection
    返回类型:PCollection
    1. >>> a = _pipeline.parallelize([1, 1, 2, 3])
    2. >>> b = _pipeline.parallelize([1, 1, 2, 2, 5])
    3. >>> a.intersection(b).get()
    4. [1, 2]
    5. >>> a.intersection(b, output_duplicated = True).get()
    6. [1, 1, 2]
  • is_empty()

    判断此PCollection是否为空

    返回:表示结果的PObject
    返回类型:PObject
    1. >>> a = _pipeline.parallelize([1, 2, 3, 4])
    2. >>> transforms.is_empty(a).get()
    3. False
    4. >>> b = _pipeline.parallelize([])
    5. >>> transforms.is_empty(b).get()
    6. True
  • join(other, others, options*)

    与其他PCollection做连接操作,等同于 bigflow.transforms.join(self, other, *others)

    参数:
    • other (PCollection) — 做连接操作的PCollection
    • *others — 更多的PCollection
    返回:

    连接结果

    返回类型:

    PCollection

    1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
    2. >>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
    3. >>> x.join(y).get()
    4. [("a", (1, 2)), ("a", (1, 3))]
  • left_join(other, others, options*)

    与其他PCollection做左连接操作,等同于 bigflow.transforms.left_join(self, other, *others)

    参数:
    • other (PCollection) — 做连接操作的PCollection
    • *others — 更多的PCollection
    返回:

    左连接结果

    返回类型:

    PCollection

    1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
    2. >>> y = _pipeline.parallelize([("a", 2)])
    3. >>> x.left_join(y).get()
    4. [("a", (1, 2)), ("b", (4, None))]
  • map(fn, side_inputs, options*)

    对所有元素进行一对一映射变换,等同于 bigflow.transforms.map(self, fn, *side_inputs, **options)

    参数:
    • fn (function) — 变换函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    变换结果

    返回类型:

    PCollection

    1. >>> p.parallelize([1, 3, 5, 7]).map(lambda x: x + 1).get()
    2. [2, 4, 6, 8]
  • max(key=None, **options)

    取最大元素,等同于 bigflow.transforms.max(self, key)

    参数:
    • key (function, optional) — 用于提取key的函数,与Python内置max()中的 key 参数相同
    • **options — 可配置选项
    返回:

    包含最大元素的PObject

    返回类型:

    PObject

    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).max().get()
    2. 8
  • max_elements(n, key=None, **options)

    取前n大元素,等同于 bigflow.transforms.max_elements(self, n, key)

    参数:
    • n (int) — 必须大于0
    • key (function, optional) — 用于提取key的函数,与Python内置max()中的 key 参数相同
    • **options — 可配置选项
    返回:

    包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

    返回类型:

    PCollection

    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).max_elements(2).get()
    2. [8, 7]
  • min(key=None, **options)

    取最小元素, 等同于 bigflow.transforms.min(self, key)

    参数:
    • key (function, optional) — 用于提取key的函数,与Python内置min()中的 key 参数相同
    • **options — 可配置选项
    返回:

    最小元素

    返回类型:

    PObject

    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).min().get()
    2. 1
  • min_elements(n, key=None, **options)

    取前n小元素,等同于 bigflow.transforms.min_elements(self, key)

    参数:
    • n (int) — 必须大于0
    • key (function, optional) — 用于提取key的函数,与Python内置min()中的 key 参数相同
    • **options — 可配置选项
    返回:

    包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

    返回类型:

    PCollection

    1. >>> p.parallelize([3, 7, 1, 3, 2, 8]).min_elements(2).get()
    2. [1, 2]
  • reduce(fn, side_inputs, options*)

    使用给定的fn将所有元素规约为单个元素, 等同于 bigflow.transforms.reduce(self, fn, *side_inputs, **options),

    参数:
    • fn (function) — 规约函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置参数
    返回:

    规约结果

    返回类型:

    PObject

    1. >>> p.parallelize([1, 2, 3, 4]).reduce(lambda x, y: x + y).get()
    2. 10
  • right_join(other, others, options*)

    与其他PCollection做右连接操作,等同于 bigflow.transforms.right_join(self, other, *others)

    参数:
    • other (PCollection) — 做连接操作的PCollection
    • *others — 更多的PCollection
    返回:

    右连接结果

    返回类型:

    PCollection

    1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
    2. >>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
    3. >>> x.right_join(y).get()
    4. [("a", (1, 2)), ("a", (1, 3))]
  • select(io_description, fn, args, kargs*)

    对每条数据选择一些字段进行变换。

    参数:
    • p (pcollection) — 输入数据集,需要是一个每个元素都是dict的pcollection
    • io_description (str) — 格式为: a,b=>c,d,e 即,输入字段=>输出字段
    • fn (callable) — 函数原型为 (input_pobjects) => (output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
    返回:

    返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。

    例如::

    1. >>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
    2. >>> print x.apply(fields.select,
    3. >>> 'a, b => c, d, e',
    4. >>> lambda a, b: (
    5. >>> a.map(lambda x: x + 1),
    6. >>> b.map(lambda x, y: x / y, a),
    7. >>> a.flat_map(lambda x: xrange(x))
    8. >>> )
    9. >>> ).get()
    10. [{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
  • sort(reverse=False)

    对元素排序,等同于 bigflow.transforms.sort(self, reverse)

    参数:reverse (bool) — 若True则降序排列,否则为升序排列
    返回:排序结果
    返回类型:PCollection
    1. >>> _pipeline.parallelize([3, 1, 2, 8]).sort().get()
    2. [1, 2, 3, 8]
  • sort_by(key, reverse=False)

    使用给定的key对元素排序,等同于 bigflow.transforms.sort_by(self, fn, reverse)

    参数:
    • key (function, optional) — 用于提取key的函数,与Python内置sort()中的 key 参数相同
    • reverse (bool) — 若True则降序排列,否则为升序排列
    返回:

    排序结果

    返回类型:

    PCollection

    1. >>> _pipeline.parallelize([3, 1, 2, 8]).sort_by().get()
    2. [1, 2, 3, 8]
  • substract(other)

    已废弃,请使用subtract.

  • subtract(other)

    返回不存在另一个PCollection中的元素,相当于做容器减法

    参数:other (PCollection) — 作为减数的PCollection
    返回:表示减法结果的PCollection
    返回类型:PCollection
    1. >>> a = _pipeline.parallelize([1, 2, 3, 3, 4])
    2. >>> b = _pipeline.parallelize([1, 2, 5])
    3. >>> a.subtract(b).get()
    4. [3, 3, 4]
  • sum()

    将所有元素相加,等同于 bigflow.transforms.sum(self)

    返回:相加结果
    返回类型:PObject
    1. >>> _pipeline.parallelize([3, 1, 2, 8]).sum().get()
    2. 14
  • take(n, **options)

    给定PCollection中的任意n个元素,等同于 bigflow.transforms.take()

    参数:
    • n (int or PObject) — 元素数量
    • **options — 可配置参数
    返回:

    表示结果的PCollection

    返回类型:

    PCollection

    1. >>> _pipeline.parallelize([1, 2, 3, 4]).take(3).get()
    2. [1, 2, 3]
    1. >>> _n = _pipeline.parallelize(2)
    2. >>> _pipeline.parallelize([1, 2, 3, 4]).take(_n).get()
    3. [1, 2]
  • transform(args, options*)

    等同于 bigflow.transforms.transform(self, *args, **options)

  • union(other, others, options*)

    将元素与其他PCollection/PObject中的所有元素共同构成新的PCollection 等同于 bigflow.transforms.union(self, other, *others)

    参数:
    • other (PCollection or PObject) — 其他PCollection/PObject
    • *others — 其他PCollection/PObject
    返回:

    表示结果的PCollection

    返回类型:

    PCollection

    1. >>> _p1 = _pipeline.parallelize([1, 2, 3, 4])
    2. >>> _p2 = _pipeline.parallelize([5, 6, 7, 8])
    3. >>> _p1.union(_p2).get()
    4. [1, 2, 3, 4, 5, 6, 7, 8]
  • window_into(window, **options)

    对元素根据Window分组

    参数:
    • window (Window) — 用于分组的Window
    • **options — 可配置选项
    返回:

    分组结果

    返回类型:

    PTable