JDBC Connector

该连接器可以向 JDBC 数据库写入数据。

添加下面的依赖以便使用该连接器(同时添加 JDBC 驱动):

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-jdbc</artifactId>
  4. <version>3.1.2-1.17</version>
  5. </dependency>

Copied to clipboard!

注意该连接器目前还 不是 二进制发行版的一部分,如何在集群中运行请参考 这里

已创建的 JDBC Sink 能够保证至少一次的语义。 更有效的精确执行一次可以通过 upsert 语句或幂等更新实现。

用法示例:

Java

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env
  3. .fromElements(...)
  4. .addSink(JdbcSink.sink(
  5. "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
  6. (ps, t) -> {
  7. ps.setInt(1, t.id);
  8. ps.setString(2, t.title);
  9. ps.setString(3, t.author);
  10. ps.setDouble(4, t.price);
  11. ps.setInt(5, t.qty);
  12. },
  13. new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  14. .withUrl(getDbMetadata().getUrl())
  15. .withDriverName(getDbMetadata().getDriverClass())
  16. .build()));
  17. env.execute();

Python

  1. env = StreamExecutionEnvironment.get_execution_environment()
  2. type_info = Types.ROW([Types.INT(), Types.STRING(), Types.STRING(), Types.INT()])
  3. env.from_collection(
  4. [(101, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
  5. (102, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
  6. (103, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
  7. (104, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
  8. ], type_info=type_info) \
  9. .add_sink(
  10. JdbcSink.sink(
  11. "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
  12. type_info,
  13. JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
  14. .with_url('jdbc:postgresql://dbhost:5432/postgresdb')
  15. .with_driver_name('org.postgresql.Driver')
  16. .with_user_name('someUser')
  17. .with_password('somePassword')
  18. .build()
  19. ))
  20. env.execute()

更多细节请查看 API documentation 。