CSV Format

Format: Serialization Schema · Deserialization Schema

The CSV format allows reading and writing CSV data based on a CSV schema. Currently, the CSV schema is derived from the table schema.

Dependencies

To set up the CSV format, the following table provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.

| Maven dependency | SQL Client JAR |
|------------------|----------------|
| `flink-csv`      | Built-in       |
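
For projects built with Maven, the dependency declaration looks roughly like the following sketch. The version placeholder is an assumption; replace it with the Flink version your project uses:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>${flink.version}</version> <!-- use your project's Flink version -->
</dependency>
```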

How to create a table with CSV format

Here is an example of creating a table using the Kafka connector and the CSV format.

```sql
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true'
)
```
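
A record in the `user_behavior` topic matching this schema could look like the following CSV line (all values are hypothetical):

```
543462,1715,1464116,pv,2017-11-26 01:00:00
```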

Format Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| `format` | required | (none) | String | Specify what format to use; here it should be `'csv'`. |
| `csv.field-delimiter` | optional | `,` | String | Field delimiter character (`,` by default). |
| `csv.line-delimiter` | optional | `\n` | String | Line delimiter (`\n` by default). Note that `\n` and `\r` are invisible special characters; you have to use Unicode escape sequences to specify them in plain SQL. E.g. `'csv.line-delimiter' = U&'\000D'` specifies carriage return (`\r`) as the line delimiter, and `'csv.line-delimiter' = U&'\000A'` specifies line feed (`\n`). |
| `csv.disable-quote-character` | optional | false | Boolean | Disable the quote character for enclosing field values (false by default). If true, the option `csv.quote-character` can not be set. |
| `csv.quote-character` | optional | `"` | String | Quote character for enclosing field values (`"` by default). |
| `csv.allow-comments` | optional | false | Boolean | Ignore comment lines that start with `#` (disabled by default). If enabled, make sure to also ignore parse errors to allow empty rows. |
| `csv.ignore-parse-errors` | optional | false | Boolean | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
| `csv.array-element-delimiter` | optional | `;` | String | Array element delimiter string for separating array and row element values (`;` by default). |
| `csv.escape-character` | optional | (none) | String | Escape character for escaping values (disabled by default). |
| `csv.null-literal` | optional | (none) | String | Null literal string that is interpreted as a null value (disabled by default). |
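
As an example of combining several of these options, the following sketch declares a table that reads semicolon-delimited data and interprets the literal `n/a` as NULL. The connector, path, and option values are illustrative, not part of this reference:

```sql
CREATE TABLE csv_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/input.csv',          -- hypothetical path
  'format' = 'csv',
  'csv.field-delimiter' = ';',        -- split fields on ';' instead of ','
  'csv.null-literal' = 'n/a',         -- treat 'n/a' as NULL
  'csv.ignore-parse-errors' = 'true'  -- skip malformed rows instead of failing
)
```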

Data Type Mapping

Currently, the CSV schema is always derived from the table schema. Explicitly defining a CSV schema is not supported yet.

The Flink CSV format uses the jackson databind API to parse and generate CSV strings.

The following table lists the type mapping from Flink type to CSV type.

| Flink SQL type | CSV type |
|----------------|----------|
| `CHAR` / `VARCHAR` / `STRING` | `string` |
| `BOOLEAN` | `boolean` |
| `BINARY` / `VARBINARY` | `string` with encoding: base64 |
| `DECIMAL` | `number` |
| `TINYINT` | `number` |
| `SMALLINT` | `number` |
| `INT` | `number` |
| `BIGINT` | `number` |
| `FLOAT` | `number` |
| `DOUBLE` | `number` |
| `DATE` | `string` with format: date |
| `TIME` | `string` with format: time |
| `TIMESTAMP` | `string` with format: date-time |
| `INTERVAL` | `number` |
| `ARRAY` | `array` |
| `ROW` | `object` |
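
The mapping can be illustrated with a small Python sketch. This is illustrative only: Flink itself serializes via jackson, not Python, and the schema and values below are made up for the example:

```python
import base64
import datetime

# A row matching a hypothetical schema:
# (id BIGINT, name STRING, payload VARBINARY, ts TIMESTAMP(3))
row = (42, "alice", b"\x00\x01", datetime.datetime(2023, 1, 2, 3, 4, 5))

def to_csv_field(value):
    """Serialize one field following the type mapping table above."""
    if isinstance(value, bytes):
        # BINARY / VARBINARY -> string with base64 encoding
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, datetime.datetime):
        # TIMESTAMP -> string with date-time format
        return value.isoformat(sep=" ")
    # numeric and string types map directly to their textual form
    return str(value)

line = ",".join(to_csv_field(v) for v in row)
print(line)  # 42,alice,AAE=,2023-01-02 03:04:05
```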