JDBC Connector

This connector provides a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC driver):

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
  </dependency>

Note that the streaming connectors are currently NOT part of the binary distribution. See the documentation on linking with them for cluster execution.

The created JDBC sink provides an at-least-once guarantee. Effectively exactly-once behavior can be achieved by using upsert statements or idempotent updates.
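To illustrate why upserts help, the following is a minimal, Flink-free sketch rather than connector code. Under at-least-once delivery, records may be written again after a failure and restart. With a plain insert each replayed record adds another row, while an upsert keyed on the primary key leaves the table in the same final state. The class UpsertReplayDemo, its helper methods, and the sample titles are hypothetical names introduced for illustration. The UPSERT_SQL string assumes PostgreSQL syntax and that id is the primary key of books; other databases use different upsert syntax.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; it is not part of Flink or the JDBC connector.
class UpsertReplayDemo {

    // Assumption: PostgreSQL upsert syntax, with `id` as the primary key of `books`.
    static final String UPSERT_SQL =
        "insert into books (id, title, author, price, qty) values (?,?,?,?,?) "
        + "on conflict (id) do update set title = excluded.title, "
        + "author = excluded.author, price = excluded.price, qty = excluded.qty";

    // Minimal stand-in for a record written by the sink.
    static final class Book {
        final int id;
        final String title;

        Book(int id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    // Plain insert into a table without a unique key: every write adds a row.
    static List<Book> plainInsert(List<Book> writes) {
        return new ArrayList<>(writes);
    }

    // Upsert keyed on id: the last write for each id wins, so replays are harmless.
    static Map<Integer, Book> upsert(List<Book> writes) {
        Map<Integer, Book> table = new LinkedHashMap<>();
        for (Book b : writes) {
            table.put(b.id, b);
        }
        return table;
    }

    // Three records, then a simulated restart that replays the last two.
    static List<Book> writesWithReplay() {
        List<Book> writes = new ArrayList<>();
        writes.add(new Book(1, "A"));
        writes.add(new Book(2, "B"));
        writes.add(new Book(3, "C"));
        writes.add(new Book(2, "B")); // replayed after restart
        writes.add(new Book(3, "C")); // replayed after restart
        return writes;
    }

    public static void main(String[] args) {
        List<Book> writes = writesWithReplay();
        System.out.println("plain insert rows: " + plainInsert(writes).size()); // 5
        System.out.println("upsert rows: " + upsert(writes).size());            // 3
    }
}
```

In a real job, a statement such as UPSERT_SQL would take the place of the plain insert statement passed to the sink; the parameter order (id, title, author, price, qty) stays the same.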

Example usage:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env
      .fromElements(...)
      .addSink(JdbcSink.sink(
          "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
          (ps, t) -> {
              ps.setInt(1, t.id);
              ps.setString(2, t.title);
              ps.setString(3, t.author);
              ps.setDouble(4, t.price);
              ps.setInt(5, t.qty);
          },
          new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withUrl(getDbMetadata().getUrl())
              .withDriverName(getDbMetadata().getDriverClass())
              .build()));
  env.execute();

Please refer to the API documentation for more details.