一、概述

  1. RDD(弹性分布式数据集Resilient Distributed Dataset):Spark 中数据的核心抽象。

    • RDD 是不可变的分布式对象集合
    • 每个RDD 都被分为多个分区,这些分区运行在集群中的不同节点上
    • RDD 可以包含Python、Java、Scala 中任意类型的对象
  2. spark 中,RDD 相关的函数分为三类:

    • 创建RDD
    • 转换(transformation) 已有的RDD
    • 执行动作 (action) 来对RDD 求值

    在这些背后,spark 自动将RDD 中的数据分发到集群上,并将action 并行化执行。

  3. RDD 支持两类操作:

    • 转换操作(transformation) : 它会从一个RDD 生成一个新的RDD
    • 行动操作(action) :它会对RDD 计算出一个结果,并将结果返回到driver 程序中(或者把结果存储到外部存储系统,如HDFS 中)
  4. 如果你不知道一个函数时转换操作还是行动操作,你可以考察它的返回值:

    • 如果返回的是RDD,则是转换操作。如果返回的是其它数据类型,则是行动操作
  5. 转换操作和行动操作的区别在于:行动操作会触发实际的计算。

    • 你可以在任意时候定义新的RDD,但是Spark 只会惰性计算这些RDD :只有第一次在一个行动操作中用到时,才会真正计算

      • 所有返回RDD的操作都是惰性的(包括读取数据的sc.textFile() 函数)
    • 在计算RDD 时,它所有依赖的中间RDD 也会被求值

      • 通过完整的转换链,spark 只计算求值过程中需要的那些数据。
  6. 默认情况下,sparkRDD 会在你每次对它进行行动操作时重新计算。

    • 如果希望在多个行动操作中重用同一个RDD,则可以使用RDD.persist()spark 把这个RDD 缓存起来

    • 在第一次对持久化的RDD 计算之后,Spark 会把RDD 的内容保存到内存中(以分区的方式存储到集群的各个机器上)。然后在此后的行动操作中,可以重用这些数据

      1. lines = sc.textFile("xxx.md")
      2. lines.persist()
      3. lines.count() #计算 lines,此时将 lines 缓存
      4. lines.first() # 使用缓存的 lines
    • 之所以默认不缓存RDD的计算结果,是因为:spark 可以直接遍历一遍数据然后计算出结果,没必要浪费存储空间。

  7. 每个Spark 程序或者shell 会话都按照如下流程工作:

    • 从外部数据创建输入的RDD
    • 使用诸如filter() 这样的转换操作对RDD 进行转换,以定义新的RDD
    • 对需要被重用的中间结果RDD 执行persist() 操作
    • 使用行动操作(如count() 等) 触发一次并行计算,spark 会对计算进行优化之后再执行
  8. spark 中的大部分转化操作和一部分行动操作需要用户传入一个可调用对象。在python 中,有三种方式:lambda 表达式、全局定义的函数、局部定义的函数

    • 注意:python 可能会把函数所在的对象也序列化之后向外传递。

      当传递的函数是某个对象的成员,或者包含了某个对象中一个字段的引用(如self.xxx 时),spark 会把整个对象发送到工作节点上。

      • 如果python 不知道如何序列化该对象,则程序运行失败
      • 如果该序列化对象太大,则传输的数据较多
      1. class XXX:
      2. def is_match(self,s):
      3. return xxx
      4. def get_xxx(self,rdd):
      5. return rdd.filter(self.is_match) # bad! 传递的函数 self.is_match 是对象的成员
      6. def get_yyy(self,rdd):
      7. return rdd.filter(lambda x:self._x in x) #bad! 传递的函数包含了对象的成员 self._x
    • 解决方案是:将你需要的字段从对象中取出,放到一个局部变量中:

      1. class XXX:
      2. def is_match(self,s):
      3. return xxx
      4. def get_xxx(self,rdd):
      5. _is_match = self.is_match
      6. return rdd.filter(_is_match) # OK
      7. def get_yyy(self,rdd):
      8. _x = self._x
      9. return rdd.filter(lambda x:_x in x) #OK
  9. python中,如果操作对应的RDD 数据类型不正确,则导致运行报错。