JDBC Connector

This connector provides a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC driver):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.13.0</version>
</dependency>
```


Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.

JdbcSink.sink

The JDBC sink provides an at-least-once guarantee. Effectively, though, exactly-once semantics can be achieved by crafting upsert SQL statements or idempotent SQL updates. Configuration goes as follows (see also the JdbcSink javadoc).

```java
JdbcSink.sink(
        sqlDmlStatement,       // mandatory
        jdbcStatementBuilder,  // mandatory
        jdbcExecutionOptions,  // optional
        jdbcConnectionOptions  // mandatory
);
```

SQL DML statement and JDBC statement builder

The sink builds one JDBC prepared statement from a user-provided SQL string, e.g.:

```sql
INSERT INTO some_table (field1, field2) VALUES (?, ?)
```

It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:

```java
(preparedStatement, someRecord) -> { /* update the preparedStatement with values from someRecord */ }
```
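As noted above, effectively-exactly-once delivery relies on the statement being idempotent, so that replays after a failure leave the database unchanged. On PostgreSQL, for example, the insert could be written as an upsert (the `ON CONFLICT` clause is PostgreSQL syntax; other databases use `MERGE` or `ON DUPLICATE KEY UPDATE`, and the table and columns here are illustrative):

```sql
-- Re-executing this statement with the same id leaves the same end state.
INSERT INTO some_table (id, field1) VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET field1 = EXCLUDED.field1;
```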

JDBC execution options

The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also the JdbcExecutionOptions javadoc):

```java
JdbcExecutionOptions.builder()
        .withBatchIntervalMs(200)  // optional: default = 0, meaning no time-based execution is done
        .withBatchSize(1000)       // optional: default = 5000 values
        .withMaxRetries(5)         // optional: default = 3
        .build();
```

A JDBC batch is executed as soon as one of the following conditions is true:

  • the configured batch interval time has elapsed
  • the maximum batch size is reached
  • a Flink checkpoint has started
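The three conditions above can be sketched as a single predicate. This is an illustrative simplification, not Flink's internal implementation; the class and method names are hypothetical:

```java
// Simplified sketch of the batch-flush decision described above.
public class BatchFlushSketch {

    /**
     * Returns true when a pending JDBC batch should be executed,
     * per the three conditions listed above.
     */
    public static boolean shouldFlush(int batchedRecords,
                                      int maxBatchSize,
                                      long msSinceLastFlush,
                                      long batchIntervalMs,
                                      boolean checkpointStarted) {
        // Interval of 0 means no time-based flushing (the default).
        boolean intervalElapsed =
                batchIntervalMs > 0 && msSinceLastFlush >= batchIntervalMs;
        boolean batchFull = batchedRecords >= maxBatchSize;
        return intervalElapsed || batchFull || checkpointStarted;
    }
}
```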

JDBC connection parameters

The connection to the database is configured with a JdbcConnectionOptions instance. Please see the JdbcConnectionOptions javadoc for details.
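A typical configuration looks like the following sketch (the URL, driver class, and credentials are placeholders to be replaced with your own):

```java
// Illustrative values; supply your own JDBC URL, driver, and credentials.
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
        .withDriverName("org.postgresql.Driver")
        .withUsername("someUser")
        .withPassword("somePassword")
        .build();
```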

Full example

```java
public class JdbcSinkExample {

    static class Book {
        public Book(Long id, String title, String authors, Integer year) {
            this.id = id;
            this.title = title;
            this.authors = authors;
            this.year = year;
        }

        final Long id;
        final String title;
        final String authors;
        final Integer year;
    }

    public static void main(String[] args) throws Exception {
        var env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
        ).addSink(
                JdbcSink.sink(
                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
                        (statement, book) -> {
                            statement.setLong(1, book.id);
                            statement.setString(2, book.title);
                            statement.setString(3, book.authors);
                            statement.setInt(4, book.year);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(1000)
                                .withBatchIntervalMs(200)
                                .withMaxRetries(5)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
                                .withDriverName("org.postgresql.Driver")
                                .withUsername("someUser")
                                .withPassword("somePassword")
                                .build()
                ));
        env.execute();
    }
}
```

JdbcSink.exactlyOnceSink

Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA standard.

Attention: In 1.13, the Flink JDBC sink does not support exactly-once mode with MySQL or other databases that do not support multiple XA transactions per connection. Support will be improved in FLINK-22239.

To use it, create a sink with the exactlyOnceSink() method as above and additionally provide the exactly-once options and an XA DataSource supplier:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements(...)
        .addSink(JdbcSink.exactlyOnceSink(
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t.id);
                    ps.setString(2, t.title);
                    ps.setString(3, t.author);
                    ps.setDouble(4, t.price);
                    ps.setInt(5, t.qty);
                },
                JdbcExecutionOptions.builder().build(),
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // create a driver-specific XA DataSource
                    EmbeddedXADataSource ds = new EmbeddedXADataSource();
                    ds.setDatabaseName("my_db");
                    return ds;
                }));
env.execute();
```

Please refer to the JdbcXaSinkFunction documentation for more details.