CSV Format

Format: Serialization Schema Format: Deserialization Schema

CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。

依赖

为了建立CSV格式,下列的表格提供了为项目使用自动化工具(例如Maven或者SBT)以及SQL客户端使用SQL JAR包的依赖信息。

Maven依赖SQL 客户端 JAR
flink-csv内置

如何创建使用 CSV 格式的表

以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。

  1. CREATE TABLE user_behavior (
  2. user_id BIGINT,
  3. item_id BIGINT,
  4. category_id BIGINT,
  5. behavior STRING,
  6. ts TIMESTAMP(3)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'user_behavior',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'csv',
  13. 'csv.ignore-parse-errors' = 'true',
  14. 'csv.allow-comments' = 'true'
  15. )

Format 参数

参数是否必选默认值类型描述
format
必选(none)String指定要使用的格式,这里应该是 ‘csv’
csv.field-delimiter
可选,String字段分隔符 (默认‘,’)。
csv.line-delimiter
可选\nString行分隔符, 默认\n。注意 \n\r 是不可见的特殊符号, 在显式的 SQL 语句中必须使用 unicode 编码。
  • 例如 ‘csv.line-delimiter’ = U&’\000D’ 使用换行符号 \r 作为行分隔符。
  • 例如 ‘csv.line-delimiter’ = U&’\000A’ 使用换行符号 \n 作为行分隔符。
csv.disable-quote-character
可选falseBoolean关闭对引用的值使用引号 (默认是 false).如果允许,选项 ‘csv.quote-character’ 必须被设置。
csv.quote-character
可选String用于围住字段值的引号字符 (默认).
csv.allow-comments
可选falseBoolean是否允许忽略注释行(默认不允许),注释行以 ‘#’ 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。
csv.ignore-parse-errors
可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
csv.array-element-delimiter
可选;String分隔数组和行元素的字符串(默认‘;’).
csv.escape-character
可选(none)String转义字符(默认关闭).
csv.null-literal
可选(none)String是否将 “null” 字符串转化为 null 值。

数据类型映射

目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。

下面的表格列出了flink数据和CSV数据的对应关系。

Flink SQL 类型CSV 类型
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
INTERVALnumber
ARRAYarray
ROWobject