CSV Format

Format: Serialization Schema Format: Deserialization Schema

CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。

依赖

In order to use the CSV format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependencySQL Client
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-csv</artifactId>
  4. <version>1.16.0</version>
  5. </dependency>
Copied to clipboard!
Built-in

如何创建使用 CSV 格式的表

以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。

  1. CREATE TABLE user_behavior (
  2. user_id BIGINT,
  3. item_id BIGINT,
  4. category_id BIGINT,
  5. behavior STRING,
  6. ts TIMESTAMP(3)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'user_behavior',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'csv',
  13. 'csv.ignore-parse-errors' = 'true',
  14. 'csv.allow-comments' = 'true'
  15. )

Format 参数

参数是否必选默认值类型描述
format
必选(none)String指定要使用的格式,这里应该是 ‘csv’
csv.field-delimiter
可选,String字段分隔符 (默认‘,’),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 ‘\t’ 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 ‘csv.field-delimiter’ = U&’\0001’ 代表 0x01 字符。
csv.disable-quote-character
可选falseBoolean是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 ‘csv.quote-character’ 不能设置。
csv.quote-character
可选String用于围住字段值的引号字符 (默认).
csv.allow-comments
可选falseBoolean是否允许忽略注释行(默认不允许),注释行以 ‘#’ 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。
csv.ignore-parse-errors
可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
csv.array-element-delimiter
可选;String分隔数组和行元素的字符串(默认‘;’).
csv.escape-character
可选(none)String转义字符(默认关闭).
csv.null-literal
可选(none)String是否将 “null” 字符串转化为 null 值。
csv.write-bigdecimal-in-scientific-notation
可选trueBoolean设置将 Bigdecimal 类型的数据表示为科学计数法(默认为true,即需要转为科学计数法),例如一个BigDecimal的值为100000,设置true,结果为 ‘1E+5’;设置为false,结果为 100000。注意:只有当值不等于0且是10的倍数才会转为科学计数法

数据类型映射

目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。

下面的表格列出了flink数据和CSV数据的对应关系。

Flink SQL 类型CSV 类型
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
INTERVALnumber
ARRAYarray
ROWobject