三、转换操作

  1. 转换操作(transformation) 会从一个RDD 生成一个新的RDD

    • 在这个过程中并不会求值。求值发生在action 操作中
    • 在这个过程中并不会改变输入的RDDRDD 是不可变的),而是创建并返回一个新的RDD
  2. spark 会使用谱系图来记录各个RDD 之间的依赖关系

    • 在对RDD 行动操作中,需要这个依赖关系来按需计算每个中间RDD
    • 当持久化的RDD 丢失部分数据时,也需要这个依赖关系来恢复丢失的数据

3.1 通用转换操作

  1. 基本转换操作:

    • .map(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,返回值构成新的RDD

      • preservesPartitioning:如果为True,则新的RDD 保留旧RDD 的分区
    • .flatMap(f, preservesPartitioning=False) :将函数 f 作用于当前RDD的每个元素,将返回的迭代器的内容构成了新的RDD

      • flatMap 可以视作:将返回的迭代器扁平化
      1. lines = sc.parallelize(['hello world','hi'])
      2. lines.map(lambda line:line.split(" ")) #新的RDD元素为[['hello','world'],['hi',]]
      3. lines.flatMap(lambda line:line.split(" ")) #新的RDD元素为 ['hello','word','hi']
    • .mapPartitions(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区,将返回的迭代器的内容构成了新的RDD

      这里f 函数的参数是一个集合(表示一个分区的数据)

    • .mapPartitionsWithIndex(f, preservesPartitioning=False):将函数 f 作用于当前RDD的每个分区以及分区id,将返回的迭代器的内容构成了新的RDD

      • 这里f 函数的参数是f分区id 以及一个集合(表示一个分区的数据)

      示例:

      1. def f(splitIndex, iterator):
      2. xxx
      3. rdd.mapPartitionsWithIndex(f)
    • .filter(f):将函数f(称作过滤器) 作用于当前RDD的每个元素,通过f 的那些元素构成新的RDD

    • .distinct(numPartitions=None):返回一个由当前RDD 元素去重之后的结果组成新的RDD

      • numPartitions:指定了新的RDD 的分区数
    • sample(withReplacement, fraction, seed=None) :对当前RDD 进行采样,采样结果组成新的RDD

      • withReplacement:如果为True,则可以重复采样;否则是无放回采样

      • fractions:新的RDD 的期望大小(占旧RDD的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

        • 如果withReplacement=True:则表示每个元素期望被选择的次数
        • 如果withReplacement=False:则表示每个元素期望被选择的概率
      • seed:随机数生成器的种子
    • .sortBy(keyfunc, ascending=True, numPartitions=None):对当前RDD 进行排序,排序结果组成新的RDD

      • keyfunc:自定义的比较函数
      • ascending:如果为True,则升序排列
    • .glom():返回一个RDD,它将旧RDD 每个分区的元素聚合成一个列表,作为新RDD 的元素

    • .groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):返回一个分组的RDD

      示例:

      1. rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
      2. result = rdd.groupBy(lambda x: x % 2).collect()
      3. #结果为: [(0, [2, 8]), (1, [1, 1, 3, 5])]
  2. 针对两个RDD的转换操作:

    尽管RDD 不是集合,但是它也支持数学上的集合操作。注意:这些操作都要求被操作的RDD 是相同数据类型的。

    • .union(other):合并两个RDD 中所有元素,生成一个新的RDD

      • other:另一个RDD

      该操作并不会检查两个输入RDD 的重复元素,只是简单的将二者合并(并不会去重)。

    • .intersection(other):取两个RDD 元素的交集,生成一个新的RDD

      该操作会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

    • .subtract(other, numPartitions=None):存在于第一个RDD 而不存在于第二个RDD 中的所有元素组成的新的RDD

      该操作也会保证结果是去重的,因此它的性能很差。因为它需要通过网络混洗数据来发现重复的元素。

    • .cartesian(other):两个RDD 的笛卡尔积,生成一个新的RDD

      RDD 中的元素是元组 (a,b),其中 a 来自于第一个RDDb 来自于第二个RDD

      • 注意:求大规模的RDD 的笛卡尔积开销巨大
      • 该操作不会保证结果是去重的,它并不需要网络混洗数据。
  3. .keyBy(f):创建一个RDD,它的元素是元组(f(x),x)

    示例:

    1. sc.parallelize(range(2,5)).keyBy(lambda x: x*x)
    1. # 结果为:[(4, 2), (9, 3), (16, 4)]
  4. .pipe(command, env=None, checkCode=False):返回一个RDD,它由外部进程的输出结果组成。

    • 参数:

      • command:外部进程命令
      • env:环境变量
      • checkCode:如果为True,则校验进程的返回值
  5. .randomSplit(weights, seed=None):返回一组新的RDD,它是旧RDD 的随机拆分

    • 参数:

      • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
      • seed:随机数种子
  6. .zip(other):返回一个Pair RDD,其中键来自于self,值来自于other

    • 它假设两个RDD 拥有同样数量的分区,且每个分区拥有同样数量的元素
  7. .zipWithIndex():返回一个Pair RDD,其中键来自于self,值就是键的索引。

  8. .zipWithUniqueId():返回一个Pair RDD,其中键来自于self,值是一个独一无二的id

    它不会触发一个spark job,这是它与zipWithIndex 的重要区别。

3.2 Pair RDD转换操作

  1. Pair RDD 可以使用所有标准RDD 上的可用的转换操作

    • 由于Pair RDD 的元素是二元元组,因此传入的函数应当操作二元元组,而不是独立的元素。
  2. 基本转换操作:

    • .keys():返回一个新的RDD,包含了旧RDD 每个元素的键

    • .values():返回一个新的RDD,包含了旧RDD 每个元素的值

    • .mapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。

    • .flatMapValues(f):返回一个新的RDD,元素为 [K,f(V)](保留原来的键不变,通过f 改变值)。它与.mapValues(f) 区别见下面的示例:

      1. x=sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
      2. x1=x.flatMapValues(lambda t:t).collect()
      3. # x1: [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
      4. x2=x.mapValues(lambda t:t).collect()
      5. # x2: [("a", ["x", "y", "z"]), ("b", ["p", "r"])]
    • .sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>):对当前Pair RDD 进行排序,排序结果组成新的RDD

      • keyfunc:自定义的比较函数
      • ascending:如果为True,则升序排列
    • .sampleByKey(withReplacement, fractions, seed=None):基于键的采样(即:分层采样)

      • 参数:

        • withReplacement:如果为True,则是有放回的采样;否则是无放回的采样
        • fractions:一个字典,指定了键上的每个取值的采样比例(不同取值之间的采样比例无关,不需要加起来为1)
        • seed:随机数种子
    • .subtractByKey(other, numPartitions=None):基于键的差集。返回一个新的RDD,其中每个(key,value) 都位于self 中,且不在other
  3. 基于键的聚合操作:

    在常规RDD上,fold()、aggregate()、reduce() 等都是行动操作。在Pair RDD 上,有类似的一组操作,用于针对相同键的元素进行聚合。这些操作返回RDD,因此是转化操作而不是行动操作。

    返回的新RDD 的键为原来的键,值为针对键的元素聚合的结果。

    • .reduceByKey(f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):合并具有相同键的元素。f 作用于同一个键的那些元素的值。

      • 它为每个键进行并行的规约操作,每个规约操作将键相同的值合并起来
      • 因为数据集中可能有大量的键,因此该操作返回的是一个新的RDD:由键,以及对应的规约结果组成
    • .foldByKey(zeroValue,f,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.fold()

    • .aggregateByKey(zeroValue,seqFunc,combFunc,numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):通过f聚合具有相同键的元素。其中zeroValue 为零值。参见.aggregate()

    • .combineByKey(createCombiner,mergeValue,mergeCombiners, numPartitions=None,partitionFunc=<function portable_hash at 0x7f51f1ac0668>):它是最为常用的基于键的聚合函数,大多数基于键的聚合函数都是用它实现的。

      aggregate() 一样,combineByKey() 可以让用户返回与输入数据类型不同的返回值。

      你需要提供三个函数:

      • createCombiner(v)v 表示键对应的值。返回一个C 类型的值(表示累加器)
      • mergeValue(c,v)c 表示当前累加器,v 表示键对应的值。返回一个C 类型的值(表示更新后的累加器)
      • mergeCombiners(c1,c2)c1 表示某个分区某个键的累加器,c2 表示同一个键另一个分区的累加器。返回一个C 类型的值(表示合并后的累加器)

      其工作流程是:遍历分区中的所有元素。考察该元素的键:

      • 如果键从未在该分区中出现过,表明这是分区中的一个新的键。则使用createCombiner() 函数来创建该键对应的累加器的初始值。

        注意:这一过程发生在每个分区中,第一次出现各个键的时候发生。而不仅仅是整个RDD 中第一次出现一个键时发生。

      • 如果键已经在该分区中出现过,则使用mergeValue() 函数将该键的累加器对应的当前值与这个新的值合并

      • 由于每个分区是独立处理的,因此同一个键可以有多个累加器。如果有两个或者更多的分区都有同一个键的累加器,则使用mergeCombiners() 函数将各个分区的结果合并。

  4. 数据分组:

    • .groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>):根据键来进行分组。

      • 返回一个新的RDD,类型为[K,Iterable[V]],其中K 为原来RDD 的键的类型,V 为原来RDD 的值的类型。
      • 如果你分组的目的是为了聚合,那么直接使用reduceByKey、aggregateByKey 性能更好。
    • .cogroup(other,numPartitions=None):它基于selfother 两个TDD 中的所有的键来进行分组,它提供了为多个RDD 进行数据分组的方法。

      • 返回一个新的RDD,类型为[K,(Iterable[V],Iterable[W])] 。其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。
      • 如果某个键只存在于一个输入RDD 中,另一个输入RDD 中不存在,则对应的迭代器为空。
      • 它是groupWith 的别名,但是groupWith 支持更多的TDD 来分组。
  5. 数据连接:

    数据连接操作的输出RDD 会包含来自两个输入RDD 的每一组相对应的记录。输出RDD 的类型为[K,(V,W)] ,其中K 为两个输入RDD 的键的类型,V 为原来self的值的类型,Wother 的值的类型。

    • .join(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的内连接。
    • .leftOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的左外连接。
    • .rightOuterJoin(other,numPartitions=None):返回一个新的RDD,它是两个输入RDD的右外连接。
    • .fullOuterJoin(other, numPartitions=None):执行right outer join