PObject

bigflow.pobject.PObject 定义

class bigflow.pobject.PObject(node, pipeline)

基类:bigflow.ptype.PType

用于表示单个元素的 bigflow.ptype.PType,通常为聚合类变换的结果,例如 bigflow.pcollection.PCollection.combine(), bigflow.pcollection.PCollection.aggregate()

注解

用户不应当直接使用其构造方法

注解

Python不允许重载这3个运算符(and, or, not),用户不应该在PObject上使用这三个运算符。Bigflow重载的是按位运算(&, |, ^)

PObject类上重载了以下操作符:

双目操作符:

__add__, __sub__, __mul__, __div__, __floordiv__, __mod__, __pow__, __lshift__, __rshift__, __and__, __xor__, __or__, __lt__, __le__, __eq__, __ge__, __gt__, __ne__ __radd__, __rsub__, __rmul__, __rdiv__, __rfloordiv__, __rmod__, __rpow__, __rlshift__, __rrshift__, __rand__, __rxor__, __ror__

单目操作符:

__neg__, __pos__, __abs__, __invert__

这些操作都将返回一个把相应数据进行相应变换后的pobject。

例如:p.sum() / p.count()等价于p.sum().map(lambda s, c: s / c, p.count())

同时, PObject 禁止了 bool 操作, 调用 bool(PObject) 或者 if PObject 将会抛出异常. :param node: LogicalPlan.Node :type node: Node

  • as_pcollection()

    将PObject转为PCollection

    返回:变换结果
    返回类型:PCollection
  • cartesian(pvalues, options*)

    求当前算子与pvalues的笛卡尔积。 等价于 bigflow.transforms.cartesian(self, *pvalues, **options)

    Args: *pvalues (PObject/PCollection) :returns: 此PObject与所有参数的笛卡尔积。结果PCollection中的每条记录是一个tuple。

    每个tuple的第n个元素是第n个输入ptype对象的记录。

    返回类型:PCollection
    1. >>> _p1 = _pipeline.parallelize(1)
    2. >>> _p2 = _pipeline.parallelize(2)
    3. >>> _p1.cartesian(_p2).get()
    4. [(1, 2)]
    5. >>> _p3 = _pipeline.parallelize([3, 4])
    6. >>> _p1.cartesian(_p3).get()
    7. [(1, 3), (1, 4)]
    8. >>> _p1.cartesian(_p2, _p3).get()
    9. [(1, 2, 3), (1, 2, 4)]
  • ceil()

    PObject ceil(类似于math.ceil)算子

    返回:PObject
    1. >>> _p1 = _pipeline.parallelize(1.2)
    2. >>> print _p1.ceil().get()
    3. 2
  • flat_map(fn, side_inputs, option*)

    对包含的元素进行一对多变换,等同于 transforms.map(self, fn, *side_inputs, **options)

    参数:
    • fn (function) — 变换函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    变换结果

    返回类型:

    PCollection

    1. >>> _pipeline.parallelize(1).flat_map(lambda x: [x, x * 2]).get()
    2. [1, 2]
  • floor()

    PObject floor(类似于math.floor)算子

    返回:PObject
    1. >>> _p1 = _pipeline.parallelize(1.2)
    2. >>> print _p1.floor().get()
    3. 1
  • map(fn, side_inputs, options*)

    对包含的元素进行变换,等同于 transforms.map(self, fn, *side_inputs, **options)

    参数:
    • fn (function) — 变换函数
    • side_inputs — 参与运算的SideInputs
    • *options — 可配置选项
    返回:

    变换结果

    返回类型:

    PObject

    1. >>> p.parallelize(1).map(lambda x: x + 1).get()
    2. 2
  • not_()

    PObject not算子

    返回:PObject
    1. >>> _p1 = _pipeline.parallelize(1)
    2. >>> print _p1.not_().get()
    3. False
  • round(n=0)

    PObject round(类似于math.round)算子 :param n: 小数点后面保留的位数

    返回:PObject
    1. >>> _p1 = _pipeline.parallelize(1.2)
    2. >>> print _p1.round().get()
    3. 1
  • union(other, others, option*)

    将元素与其他PCollection/PObject中的所有元素共同构成PCollection 等同于 transforms.union(self, other, *others)

    参数:
    • other (PCollection or PObject) — 其他PCollection/PObject
    • *others — 其他PCollection/PObject
    返回:

    表示结果的PCollection

    返回类型:

    PCollection

    1. >>> _p1 = _pipeline.parallelize(1)
    2. >>> _p2 = _pipeline.parallelize([2, 3])
    3. >>> _p1.union(_p2).get()
    4. [1, 2, 3]