3.2 Creating a DataFrame from an RDD

  1. Without specifying column names:

    rdd = sc.parallelize([('Alice', 1)])
    spark_session.createDataFrame(rdd).collect()

    Result:

    [Row(_1=u'Alice', _2=1)]  # column names are assigned automatically
  2. Specifying column names:

    rdd = sc.parallelize([('Alice', 1)])
    spark_session.createDataFrame(rdd, ['name', 'age']).collect()

    Result:

    [Row(name=u'Alice', age=1)]
  3. Creating via Row:

    from pyspark.sql import Row
    Person = Row('name', 'age')
    rdd = sc.parallelize([('Alice', 1)]).map(lambda r: Person(*r))
    spark_session.createDataFrame(rdd).collect()  # the Rows already carry the column names

    Result:

    [Row(name=u'Alice', age=1)]
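    Here `Row('name', 'age')` acts as a reusable record factory: calling it with values stamps out named records. The same pattern can be sketched in pure Python with `collections.namedtuple` (an analogy only, not pyspark code — `Person` here is a stand-in for the Row factory above):

    ```python
    from collections import namedtuple

    # Analogous to Person = Row('name', 'age'): a factory for named records
    Person = namedtuple('Person', ['name', 'age'])

    rows = [('Alice', 1), ('Bob', 2)]
    # Same shape as rdd.map(lambda r: Person(*r))
    people = [Person(*r) for r in rows]

    print(people[0].name)  # fields are accessible by name, as with Row
    ```

    This is why the Row-based RDD needs no separate column list: each record already knows its field names.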
  4. Specifying a schema:

    from pyspark.sql.types import *
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)])
    rdd = sc.parallelize([('Alice', 1)])
    spark_session.createDataFrame(rdd, schema).collect()

    Result:

    [Row(name=u'Alice', age=1)]
  5. Specifying a schema as a string:

    rdd = sc.parallelize([('Alice', 1)])
    spark_session.createDataFrame(rdd, "a: string, b: int").collect()

    Result:

    [Row(a=u'Alice', b=1)]  # column names come from the schema string
    • If there is only one column, the schema string is just the type name:

      rdd = sc.parallelize([1])
      spark_session.createDataFrame(rdd, "int").collect()

      Result:

      [Row(value=1)]
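  The schema string is a comma-separated list of `name: type` pairs, or a bare type name for a single column (which Spark names `value`). A toy parser that illustrates just this surface format — purely illustrative, not Spark's actual (far more complete) DDL parser:

  ```python
  def parse_schema_string(schema):
      """Split a schema string like 'a: string, b: int' into (name, type)
      pairs. A bare type such as 'int' means one column, here called 'value'
      to mirror Spark's default single-column name."""
      fields = []
      for part in schema.split(','):
          if ':' in part:
              name, typ = part.split(':', 1)
              fields.append((name.strip(), typ.strip()))
          else:
              fields.append(('value', part.strip()))
      return fields

  print(parse_schema_string("a: string, b: int"))  # [('a', 'string'), ('b', 'int')]
  print(parse_schema_string("int"))                # [('value', 'int')]
  ```

  The parsed names are exactly the field names that appear in the resulting Rows, which is why example 5 produces `Row(a=..., b=...)` rather than `Row(name=..., age=...)`.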