PCollection
bigflow.pcollection.PCollection
定义
class bigflow.pcollection.PCollection
(node, pipeline)
用于表示分布式数据集的 bigflow.ptype.PType
注解
用户不应当直接使用其构造方法
参数: | node (Node) — LogicalPlan.Node |
---|
accumulate
(zero, accumulate_fn, side_inputs, options*)等同于
bigflow.transforms.accumulate(self, zero, accumulate_fn, *side_inputs, **options)
参数: - zero (value or function) — 初始值,或是一个返回初始值的方法
- accumulate_fn (function) — 聚合方法
- side_inputs — 参与运算的SideInputs
- *options — 可配置选项
返回: 聚合结果
返回类型: >>> from operator import add
>>> p.parallelize([1, 2, 3]).accumulate(0, add).get()
6
agg
(io_description, fn, args, kargs*)选择一些字段去做一些聚合操作。
参数: - p (pcollection) — 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) — 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) — 函数原型为 (input_pcollections) => (output_pcollection_or_pobjects) 即,该函数的输入参数为多个pcollection, 每个pcollection表示数据的一个字段的全部行所拼成的一个pcollection。 该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素输出的几个pcollection进行笛卡尔积并添加字段名后的结果。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.agg,
>>> 'a, b => c, d, e',
>>> lambda a, b: (
>>> a.count(),
>>> b.sum(),
>>> a.flat_map(lambda x: xrange(x))
>>> )
>>> ).get()
[{'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 0}, {'c': 2, 'd': 5.0, 'e': 1}]
aggregate
(zero, aggregate_fn, combine_fn, side_inputs, options*)等同于
bigflow.transforms.aggregate(self, aggregate_fn, combine_fn, *side_inputs, **options)
参数: - pcollection (PCollection) — 输入PCollection
- zero (value or function) — 初始值,或是一个返回初始值的方法
- accumulate_fn (function) — 聚合方法
- side_inputs — 参与运算的SideInputs
- *options — 可配置选项
返回: 聚合结果
返回类型: -
等同于 to
self.first()
返回: 转换结果 返回类型: PObject >>> _pipeline.parallelize(["A"]).as_pobject().get()
"A"
-
根据字段,返回一个SchemaPCollection
参数: fields — 类型可以是,tuple,list,dict; 当fields是tuple或list时, 会判断每个元素的类型:
fields中的每个元素是python基本类型或一个serde; 接口将构造TupleSerde设置到PCollection每个元素fields中的每个元素是python string,抛出异常
- 当fields是dict时:
- fields的key标识字段类型,value标识该字段的类型,如 {“name”: str, “age”: int} 当前PCollection中的每个元素必须是dict,dict内的key必须相同。 fields内的key要和PCollection内的key必须相同
返回: 表示转化后的PCollection 返回类型: PCollection Examples
>>> data = self._pipeline.parallelize([("xiaoming", "PKU", 20)])
>>> d1 = data.as_schema((str, str, int))
>>> d2 = data.as_schema([str, str, int])
>>> print d1.get()
[('xiaoming', 'PKU', 20)]
>>>
>>> print d2.get()
[('xiaoming', 'PKU', 20)]
>>>
>>> data = self._pipeline.parallelize([{"name": "xiaoming", "school": "PKU", "age": 20}])
>>> d5 = data.as_schema({"name": str, "school": str, "age": int})
>>> print d5.get()
[{'age': 20, 'name': 'xiaoming', 'school': 'PKU'}]
>>>
cartesian
(other, others, options*)与其他的PCollection做笛卡尔积
参数: - other (PCollection) — 其他的PCollection
- *others — 更多的PCollection
返回: 表示结果的PCollection
返回类型: >>> _p1 = _pipeline.parallelize([1, 2, 3])
>>> _p2 = _pipeline.parallelize([4, 5])
>>> _p1.cartesian(_p2).get()
[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
-
等同于
bigflow.transforms.cogroup(self, other, *others)
,参数: - other (PCollection) — 用于协同分组的PCollection
- *others — 更多的PCollection
返回: 分组结果
返回类型: -
等同于
bigflow.transforms.combine(self, fn)
参数: - fn (callable) — 合并函数
- **options — 可配置选项
返回: 合并结果
返回类型: >>> _pipeline.parallelize([2, 4, 6, 10]).combine(sum).get()
22
-
返回元素的数量,等同于
bigflow.transforms.count(self)
返回: 元素数量 返回类型: PObject >>> p.parallelize([1, 2, 3, 4]).count().get()
4
-
返回与另一个PCollection中不相同的元素
参数: other (PCollection) — 另一个PCollection 返回: 表示结果的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2])
>>> a.diff(b).get()
[(2, (1, 2)), (3, (1, 0))]
-
元素去重,等同于
bigflow.transforms.distinct(self)
参数: **options — 可配置选项 返回: 不重复元素,以PCollection给出 返回类型: PCollection >>> p.parallelize([2, 2, 1, 9, 3, 3]).distinct().get()
[2, 3, 1, 9]
filter
(fn, side_inputs, options*)过滤元素,等同于
bigflow.transforms.filter(self, fn, *side_inputs, **options)
,参数: - fn (function) — 断言函数
- side_inputs — 参与运算的SideInputs
- *options — 可配置选项
返回: 过滤结果
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).filter(lambda x: x % 2 == 0).get()
[2, 8]
-
取第一个元素
返回: 表示结果的PObject 返回类型: PObject >>> p.parallelize([3, 7, 1, 3, 2, 8]).first.get()
3
flat_map
(fn, side_inputs, options*)对所有元素进行一对多映射,等同于
bigflow.transforms.flat_map(self, fn, *side_inputs, **options)
参数: - fn (function) — 变换函数
- side_inputs — 参与运算的SideInputs
- *options — 可配置选项
返回: 变换结果
返回类型: >>> _pipeline.parallelize([1, 3, 5, 7]).flat_map(lambda x: [x, x * 2]).get()
[1, 2, 3, 5, 6, 7, 10, 14]
foreach
(fn, side_inputs, options*)等同于
bigflow.transforms.foreach(self, fn, *side_inputs, **options)
参数: - fn (function) — 变换函数
- *side_inputs — 参与运算的SideInputs
返回: None
full_join
(other, others, options*)与其他PCollection做全连接操作,等同于
bigflow.transforms.full_join(self, other, *others)
参数: - other (PCollection) — 做连接操作的PCollection
- *others — 更多的PCollection
返回: 全连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1)])
>>> y = _pipeline.parallelize([("b", 2)])
>>> x.full_join(y).get()
[("a", (1, None)), ("b", (None, 2))]
group_by
(key_extractor, value_extractor=None, **options)对元素分组,等同于
bigflow.transforms.group_by(self, key_extractor, value_extractor)
参数: - key_extractor (function) — 用于提取key的函数
- value_extractor (function, optional) — 用于提取value的函数
- **options — 可配置选项
返回: 分组结果
返回类型: >>> _pcollection = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
>>> _grouped = _pcollection.group_by(lambda x: x[0], lambda x: x[1]).get()
>>> _grouped.get()
{"A": [4, 3, 1], "B": [2]}
-
与
group_by
变换类似,但使用默认的key/value提取函数对元素分组参数: **options — 可配置参数 返回: 分组结果 返回类型: PTable >>> _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)]).group_by_key().get()
{"A": [4, 3, 1], "B": [2]}
intersection
(other, output_duplicated=False)返回与另一个PCollection的交集
参数: other (PCollection) — 另一个PCollection 返回: 表示交集的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2, 5])
>>> a.intersection(b).get()
[1, 2]
>>> a.intersection(b, output_duplicated = True).get()
[1, 1, 2]
-
判断此PCollection是否为空
返回: 表示结果的PObject 返回类型: PObject >>> a = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.is_empty(a).get()
False
>>> b = _pipeline.parallelize([])
>>> transforms.is_empty(b).get()
True
-
与其他PCollection做连接操作,等同于
bigflow.transforms.join(self, other, *others)
参数: - other (PCollection) — 做连接操作的PCollection
- *others — 更多的PCollection
返回: 连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).get()
[("a", (1, 2)), ("a", (1, 3))]
left_join
(other, others, options*)与其他PCollection做左连接操作,等同于
bigflow.transforms.left_join(self, other, *others)
参数: - other (PCollection) — 做连接操作的PCollection
- *others — 更多的PCollection
返回: 左连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2)])
>>> x.left_join(y).get()
[("a", (1, 2)), ("b", (4, None))]
map
(fn, side_inputs, options*)对所有元素进行一对一映射变换,等同于
bigflow.transforms.map(self, fn, *side_inputs, **options)
参数: - fn (function) — 变换函数
- side_inputs — 参与运算的SideInputs
- *options — 可配置选项
返回: 变换结果
返回类型: >>> p.parallelize([1, 3, 5, 7]).map(lambda x: x + 1).get()
[2, 4, 6, 8]
-
取最大元素,等同于
bigflow.transforms.max(self, key)
参数: - key (function, optional) — 用于提取key的函数,与Python内置max()中的
key
参数相同 - **options — 可配置选项
返回: 包含最大元素的PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).max().get()
8
- key (function, optional) — 用于提取key的函数,与Python内置max()中的
max_elements
(n, key=None, **options)取前n大元素,等同于
bigflow.transforms.max_elements(self, n, key)
参数: - n (int) — 必须大于0
- key (function, optional) — 用于提取key的函数,与Python内置max()中的
key
参数相同 - **options — 可配置选项
返回: 包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).max_elements(2).get()
[8, 7]
-
取最小元素, 等同于
bigflow.transforms.min(self, key)
参数: - key (function, optional) — 用于提取key的函数,与Python内置min()中的
key
参数相同 - **options — 可配置选项
返回: 最小元素
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).min().get()
1
- key (function, optional) — 用于提取key的函数,与Python内置min()中的
min_elements
(n, key=None, **options)取前n小元素,等同于
bigflow.transforms.min_elements(self, key)
参数: - n (int) — 必须大于0
- key (function, optional) — 用于提取key的函数,与Python内置min()中的
key
参数相同 - **options — 可配置选项
返回: 包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject
返回类型: >>> p.parallelize([3, 7, 1, 3, 2, 8]).min_elements(2).get()
[1, 2]
reduce
(fn, side_inputs, options*)使用给定的fn将所有元素规约为单个元素, 等同于
bigflow.transforms.reduce(self, fn, *side_inputs, **options)
,参数: - fn (function) — 规约函数
- side_inputs — 参与运算的SideInputs
- *options — 可配置参数
返回: 规约结果
返回类型: >>> p.parallelize([1, 2, 3, 4]).reduce(lambda x, y: x + y).get()
10
right_join
(other, others, options*)与其他PCollection做右连接操作,等同于
bigflow.transforms.right_join(self, other, *others)
参数: - other (PCollection) — 做连接操作的PCollection
- *others — 更多的PCollection
返回: 右连接结果
返回类型: >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> x.right_join(y).get()
[("a", (1, 2)), ("a", (1, 3))]
select
(io_description, fn, args, kargs*)对每条数据选择一些字段进行变换。
参数: - p (pcollection) — 输入数据集,需要是一个每个元素都是dict的pcollection
- io_description (str) — 格式为: a,b=>c,d,e 即,输入字段=>输出字段
- fn (callable) — 函数原型为 (input_pobjects) => (output_pcollection_or_pobjects) 即,该函数的输入参数为多个pobject,每个pobject表示数据的一个字段, 对这个pobject上进行的操作会执行在每行数据上;该函数的返回值是一些pobject或 pcollection所组成的tuple(如果只有一个元素可以不必返回tuple)。
返回: 返回一个每个元素是一个dict的pcollection。 这个pcollection中所有元素相当于对原数据每条数据进行一次fn处理, 处理后返回的tuple中的所有数据集进行笛卡尔积, 最终再把所有输入数据处理后得出的结果拼成一个数据集。
例如::
>>> x = _pipeline.parallelize([{'a' : 1, 'b': 2.0}, {'a': 2, 'b': 3.0}])
>>> print x.apply(fields.select,
>>> 'a, b => c, d, e',
>>> lambda a, b: (
>>> a.map(lambda x: x + 1),
>>> b.map(lambda x, y: x / y, a),
>>> a.flat_map(lambda x: xrange(x))
>>> )
>>> ).get()
[{'c': 2, 'd': 2.0, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 0}, {'c': 3, 'd': 1.5, 'e': 1}]
-
对元素排序,等同于
bigflow.transforms.sort(self, reverse)
参数: reverse (bool) — 若True则降序排列,否则为升序排列 返回: 排序结果 返回类型: PCollection >>> _pipeline.parallelize([3, 1, 2, 8]).sort().get()
[1, 2, 3, 8]
-
使用给定的key对元素排序,等同于
bigflow.transforms.sort_by(self, fn, reverse)
参数: - key (function, optional) — 用于提取key的函数,与Python内置sort()中的
key
参数相同 - reverse (bool) — 若True则降序排列,否则为升序排列
返回: 排序结果
返回类型: >>> _pipeline.parallelize([3, 1, 2, 8]).sort_by().get()
[1, 2, 3, 8]
- key (function, optional) — 用于提取key的函数,与Python内置sort()中的
-
已废弃,请使用subtract.
-
返回不存在另一个PCollection中的元素,相当于做容器减法
参数: other (PCollection) — 作为减数的PCollection 返回: 表示减法结果的PCollection 返回类型: PCollection >>> a = _pipeline.parallelize([1, 2, 3, 3, 4])
>>> b = _pipeline.parallelize([1, 2, 5])
>>> a.subtract(b).get()
[3, 3, 4]
-
将所有元素相加,等同于
bigflow.transforms.sum(self)
返回: 相加结果 返回类型: PObject >>> _pipeline.parallelize([3, 1, 2, 8]).sum().get()
14
-
给定PCollection中的任意n个元素,等同于
bigflow.transforms.take()
参数: 返回: 表示结果的PCollection
返回类型: >>> _pipeline.parallelize([1, 2, 3, 4]).take(3).get()
[1, 2, 3]
>>> _n = _pipeline.parallelize(2)
>>> _pipeline.parallelize([1, 2, 3, 4]).take(_n).get()
[1, 2]
union
(other, others, options*)将元素与其他PCollection/PObject中的所有元素共同构成新的PCollection 等同于
bigflow.transforms.union(self, other, *others)
参数: - other (PCollection or PObject) — 其他PCollection/PObject
- *others — 其他PCollection/PObject
返回: 表示结果的PCollection
返回类型: >>> _p1 = _pipeline.parallelize([1, 2, 3, 4])
>>> _p2 = _pipeline.parallelize([5, 6, 7, 8])
>>> _p1.union(_p2).get()
[1, 2, 3, 4, 5, 6, 7, 8]
window_into
(window, **options)对元素根据Window分组
参数: - window (Window) — 用于分组的Window
- **options — 可配置选项
返回: 分组结果
返回类型: