9.8 其它
array(*cols)
:创新一个新的array
列。参数:
cols
:列名字符串列表,或者Column
列表。要求这些列具有同样的数据类型
示例:
df.select(array('age', 'age').alias("arr"))
df.select(array([df.age, df.age]).alias("arr"))
array_contains(col, value)
:创建一个新列,指示value
是否在array
中(由col
给定)其中
col
必须是array
类型。而value
是一个值,或者一个Column
或者列名。判断逻辑:
- 如果
array
为null
,则返回null
; - 如果
value
位于array
中,则返回True
; - 如果
value
不在array
中,则返回False
- 如果
示例:
df = spark_session.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
df.select(array_contains(df.data, "a"))
create_map(*cols)
:创建一个map
列。参数:
cols
:列名字符串列表,或者Column
列表。这些列组成了键值对。如(key1,value1,key2,value2,...)
示例:
df.select(create_map('name', 'age').alias("map")).collect()
#[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
broadcast(df)
:标记df
这个Dataframe
足够小,从而应用于broadcast join
参数:
df
:一个Dataframe
对象
coalesce(*cols)
:返回第一个非null
的列组成的Column
。如果都为null
,则返回null
参数:
cols
:列名字符串列表,或者Column
列表。
crc32(col)
:计算二进制列的CRC32
校验值。要求col
是二进制列。explode(col)
:将一个array
或者map
列拆成多行。要求col
是一个array
或者map
列。示例:
eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
# 结果为:[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
#结果为:
# +---+-----+
# |key|value|
# +---+-----+
# | a| b|
# +---+-----+
posexplode(col)
: 对指定array
或者map
中的每个元素,依据每个位置返回新的一行。要求
col
是一个array
或者map
列。示例:
eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.select(posexplode(eDF.intlist)).collect()
#结果为:[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
expr(str)
:计算表达式。参数:
str
:一个表达式。如length(name)
from_json(col,schema,options={})
:解析一个包含JSON
字符串的列。如果遇到无法解析的字符串,则返回null
。参数:
col
:一个字符串列,字符串是json
格式schema
:一个StructType
(表示解析一个元素),或者StructType
的ArrayType
(表示解析一组元素)options
:用于控制解析过程。
示例:
from pyspark.sql.types import *
schema = StructType([StructField("a", IntegerType())])
df = spark_session.createDataFrame([(1, '{"a": 1}')], ("key", "value"))
df.select(from_json(df.value, schema).alias("json")).collect()
#结果为:[Row(json=Row(a=1))]
get_json_object(col,path)
:从json
字符串中提取指定的字段。如果json
字符串无效,则返回null
.参数:
col
:包含json
格式的字符串的列。path
:json
的字段的路径。
示例:
data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
df = spark_session.createDataFrame(data, ("key", "jstring"))
df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"),
get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
# 结果为:[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
greatest(*cols)
:返回指定的一堆列中的最大值。要求至少包含2列。它会跳过
null
值。如果都是null
值,则返回null
。least(*cols)
:返回指定的一堆列中的最小值。要求至少包含2列。它会跳过
null
值。如果都是null
值,则返回null
。json_tuple(col,*fields)
:从json
列中抽取字段组成新列(抽取n
个字段,则生成n
列)参数:
col
:一个json
字符串列fields
:一组字符串,给出了json
中待抽取的字段
lit(col)
:创建一个字面量值的列monotonically_increasing_id()
:创建一个单调递增的id
列(64位整数)。它可以确保结果是单调递增的,并且是
unique
的,但是不保证是连续的。它隐含两个假设:
- 假设
dataframe
分区数量少于1 billion
- 假设每个分区的记录数量少于
8 billion
- 假设
nanvl(col1,col2)
:如果col1
不是NaN
,则返回col1
;否则返回col2
。要求
col1
和col2
都是浮点列(DoubleType
或者FloatType
)size(col)
:计算array/map
列的长度(元素个数)。sort_array(col,asc=True)
: 对array
列中的array
进行排序(排序的方式是自然的顺序)参数:
col
:一个字符串或者Column
, 指定一个array
列asc
: 如果为True
,则是升序;否则是降序
spark_partition_id()
:返回一个partition ID
列该方法产生的结果依赖于数据划分和任务调度,因此是未确定结果的。
struct(*cols)
:创建一个新的struct
列。参数:
cols
:一个字符串列表(指定了列名),或者一个Column
列表
示例:
df.select(struct('age', 'name').alias("struct")).collect()
# [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
to_json(col,options={})
:将包含StructType
或者Arrytype
的StructType
转换为json
字符串。如果遇到不支持的类型,则抛出异常。参数:
col
:一个字符串或者Column
,表示待转换的列options
:转换选项。它支持和json datasource
同样的选项
udf(f=None,returnType=StringType)
:根据用户定义函数(UDF
) 来创建一列。参数:
f
:一个python
函数,它接受一个参数returnType
:一个pyspqrk.sql.types.DataType
类型,表示udf
的返回类型
示例:
from pyspark.sql.types import IntegerType
slen = udf(lambda s: len(s), IntegerType())
df.select(slen("name").alias("slen_name"))
when(condition,value)
: 对一系列条件求值,返回其中匹配的哪个结果。如果
Column.otherwise()
未被调用,则当未匹配时,返回None
;如果Column.otherwise()
被调用,则当未匹配时,返回otherwise()
的结果。参数:
condition
:一个布尔列value
:一个字面量值,或者一个Column
示例:
df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
# [Row(age=3), Row(age=4)]