Writing Real-Time Data to MatrixOne Using Flink

Overview

Apache Flink is a powerful framework and distributed processing engine focusing on stateful computation. It is suitable for processing both unbounded and bounded data streams efficiently. Flink can operate efficiently in various common cluster environments and performs calculations at memory speed. It supports processing data of any scale.

Scenarios

  • Event-Driven Applications

    Event-driven applications typically have states and extract data from one or more event streams. They trigger computations, update states, or perform other external actions based on incoming events. Typical event-driven applications include anti-fraud systems, anomaly detection, rule-based alert systems, and business process monitoring.

  • Data Analytics Applications

    The primary goal of data analytics tasks is to extract valuable information and metrics from raw data. Flink supports streaming and batch analytics applications, making it suitable for various scenarios such as telecom network quality monitoring, product updates, and experiment evaluation analysis in mobile applications, real-time ad-hoc analysis in the consumer technology space, and large-scale graph analysis.

  • Data Pipeline Applications

    Extract, transform, load (ETL) is a standard method for transferring data between different storage systems. Data pipelines and ETL jobs are similar in that they can perform data transformation and enrichment and move data from one storage system to another. The difference is that data pipelines run in a continuous streaming mode rather than being triggered periodically. Typical data pipeline applications include real-time query index building in e-commerce and continuous ETL.

This document presents two examples: the first uses the Flink computing engine to migrate data from MySQL to MatrixOne, and the second uses Flink to write streaming data from Kafka to MatrixOne.

Before you start

Hardware Environment

The hardware requirements for this practice are as follows:

| Server Name | Server IP | Installed Software | Operating System |
| ----------- | -------------- | ------------------ | ---------------- |
| node1 | 192.168.146.10 | MatrixOne | Debian 11.1 x86 |
| node2 | 192.168.146.12 | Kafka | CentOS 7.9 |
| node3 | 192.168.146.11 | IDEA, MySQL | Windows 10 |

Software Environment

This practice requires the installation and deployment of the following software environments (versions follow the examples in this document):

  • MatrixOne
  • IntelliJ IDEA and JDK 1.8
  • Apache Flink 1.17.0
  • Apache Kafka 3.5.0
  • MySQL (server and client), with mysql-connector-java 8.0.33

Example 1: Migrating Data from MySQL to MatrixOne

Step 1: Initialize the Project

  1. Start IDEA, click File > New > Project, select Spring Initializr, and fill in the following configuration parameters:

    • Name: matrixone-flink-demo
    • Location: ~\Desktop
    • Language: Java
    • Type: Maven
    • Group: com.example
    • Artifact: matrixone-flink-demo
    • Package name: com.matrixone.flink.demo
    • JDK: 1.8


  2. Add project dependencies and edit the content of pom.xml in the project root directory as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.matrixone.flink</groupId>
        <artifactId>matrixone-flink-demo</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <scala.binary.version>2.12</scala.binary.version>
            <java.version>1.8</java.version>
            <flink.version>1.17.0</flink.version>
            <scope.mode>compile</scope.mode>
        </properties>

        <dependencies>

            <!-- Flink dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-hive_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>

            <!-- JDBC related dependencies -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>1.15.4</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.33</version>
            </dependency>

            <!-- Kafka related dependencies -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.13</artifactId>
                <version>3.5.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>3.0.0-1.17</version>
            </dependency>

            <!-- JSON -->
            <dependency>
                <groupId>com.alibaba.fastjson2</groupId>
                <artifactId>fastjson2</artifactId>
                <version>2.0.34</version>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

Step 2: Read MatrixOne Data

After connecting to MatrixOne using the MySQL client, create the necessary database and data tables for the demonstration.

  1. Create a database and table, and import data in MatrixOne:

    CREATE DATABASE test;
    USE test;
    CREATE TABLE `person` (`id` INT DEFAULT NULL, `name` VARCHAR(255) DEFAULT NULL, `birthday` DATE DEFAULT NULL);
    INSERT INTO test.person (id, name, birthday) VALUES (1, 'zhangsan', '2023-07-09'), (2, 'lisi', '2023-07-08'), (3, 'wangwu', '2023-07-12');
  2. In IDEA, create the MoRead.java class to read MatrixOne data using Flink:

    package com.matrixone.flink.demo;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.MapOperator;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.JdbcInputFormat;
    import org.apache.flink.types.Row;

    import java.text.SimpleDateFormat;

    /**
     * @author MatrixOne
     * @description
     */
    public class MoRead {

        private static String srcHost = "192.168.146.10";
        private static Integer srcPort = 6001;
        private static String srcUserName = "root";
        private static String srcPassword = "111";
        private static String srcDataBase = "test";

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
            // Set parallelism
            environment.setParallelism(1);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

            // Set query field type
            RowTypeInfo rowTypeInfo = new RowTypeInfo(
                    new BasicTypeInfo[]{
                            BasicTypeInfo.INT_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.DATE_TYPE_INFO
                    },
                    new String[]{
                            "id",
                            "name",
                            "birthday"
                    }
            );

            DataSource<Row> dataSource = environment.createInput(JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("com.mysql.cj.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase)
                    .setUsername(srcUserName)
                    .setPassword(srcPassword)
                    .setQuery("select * from person")
                    .setRowTypeInfo(rowTypeInfo)
                    .finish());

            // Convert the "Wed Jul 12 00:00:00 CST 2023" date format to "2023-07-12"
            MapOperator<Row, Row> mapOperator = dataSource.map((MapFunction<Row, Row>) row -> {
                row.setField("birthday", sdf.format(row.getField("birthday")));
                return row;
            });

            mapOperator.print();
        }
    }
  3. Run MoRead.main() in IDEA. The console prints the three rows inserted above, with the birthday field reformatted as yyyy-MM-dd. (If the job cannot connect, the plain JDBC check below may help isolate the problem.)
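
If the Flink job fails to connect or returns no rows, it can help to query MatrixOne with plain JDBC outside of Flink first. The following is a minimal sketch that reuses the connection details above (192.168.146.10:6001, user root, password 111) and the mysql-connector-java driver from the pom; the class name MoConnectionCheck is only illustrative:

    package com.matrixone.flink.demo;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class MoConnectionCheck {
        public static void main(String[] args) throws Exception {
            // Same MatrixOne endpoint and credentials used by MoRead
            String url = "jdbc:mysql://192.168.146.10:6001/test";
            try (Connection conn = DriverManager.getConnection(url, "root", "111");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT id, name, birthday FROM person")) {
                while (rs.next()) {
                    // Print each row as "id, name, birthday"
                    System.out.println(rs.getInt("id") + ", " + rs.getString("name") + ", " + rs.getDate("birthday"));
                }
            }
        }
    }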

Step 3: Write MySQL Data to MatrixOne

Now, you can begin migrating MySQL data to MatrixOne using Flink.

  1. Prepare MySQL data: On node3, use the MySQL client to connect to the local MySQL instance. Create the necessary database, tables, and insert data:

    mysql -h127.0.0.1 -P3306 -uroot -proot
    mysql> CREATE DATABASE motest;
    mysql> USE motest;
    mysql> CREATE TABLE `person` (`id` int DEFAULT NULL, `name` varchar(255) DEFAULT NULL, `birthday` date DEFAULT NULL);
    mysql> INSERT INTO motest.person (id, name, birthday) VALUES(2, 'lisi', '2023-07-09'),(3, 'wangwu', '2023-07-13'),(4, 'zhaoliu', '2023-08-08');
  2. Clear MatrixOne table data:

    On node3, use the MySQL client to connect to the MatrixOne instance on node1. Since this example continues to use the test database from the previous MatrixOne data reading example, you need to clear the data from the person table first.

    -- On node3, use the MySQL client to connect to MatrixOne on node1
    mysql -h192.168.146.10 -P6001 -uroot -p111
    mysql> TRUNCATE TABLE test.person;
  3. Write code in IDEA:

    Create the Person.java and Mysql2Mo.java classes, which use Flink to read the MySQL data and write it to MatrixOne. Refer to the following example code; Person.java comes first, followed by Mysql2Mo.java:

    package com.matrixone.flink.demo.entity;

    import java.util.Date;

    public class Person {

        private int id;
        private String name;
        private Date birthday;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Date getBirthday() {
            return birthday;
        }

        public void setBirthday(Date birthday) {
            this.birthday = birthday;
        }
    }
    package com.matrixone.flink.demo;

    import com.matrixone.flink.demo.entity.Person;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.*;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    import java.sql.Date;

    /**
     * @author MatrixOne
     * @description
     */
    public class Mysql2Mo {

        private static String srcHost = "127.0.0.1";
        private static Integer srcPort = 3306;
        private static String srcUserName = "root";
        private static String srcPassword = "root";
        private static String srcDataBase = "motest";

        private static String destHost = "192.168.146.10";
        private static Integer destPort = 6001;
        private static String destUserName = "root";
        private static String destPassword = "111";
        private static String destDataBase = "test";
        private static String destTable = "person";

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set parallelism
            environment.setParallelism(1);

            // Set query field type
            RowTypeInfo rowTypeInfo = new RowTypeInfo(
                    new BasicTypeInfo[]{
                            BasicTypeInfo.INT_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.DATE_TYPE_INFO
                    },
                    new String[]{
                            "id",
                            "name",
                            "birthday"
                    }
            );

            // Add source
            DataStreamSource<Row> dataSource = environment.createInput(JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("com.mysql.cj.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase)
                    .setUsername(srcUserName)
                    .setPassword(srcPassword)
                    .setQuery("select * from person")
                    .setRowTypeInfo(rowTypeInfo)
                    .finish());

            // Run ETL: convert each Row into a Person object
            SingleOutputStreamOperator<Person> mapOperator = dataSource.map((MapFunction<Row, Person>) row -> {
                Person person = new Person();
                person.setId((Integer) row.getField("id"));
                person.setName((String) row.getField("name"));
                person.setBirthday((java.util.Date) row.getField("birthday"));
                return person;
            });

            // Set MatrixOne sink information
            mapOperator.addSink(
                    JdbcSink.sink(
                            "insert into " + destTable + " values(?,?,?)",
                            (ps, t) -> {
                                ps.setInt(1, t.getId());
                                ps.setString(2, t.getName());
                                ps.setDate(3, new Date(t.getBirthday().getTime()));
                            },
                            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                    .withDriverName("com.mysql.cj.jdbc.Driver")
                                    .withUrl("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase)
                                    .withUsername(destUserName)
                                    .withPassword(destPassword)
                                    .build()
                    )
            );

            environment.execute();
        }
    }

Step 4: View the Execution Results

Execute the following SQL in MatrixOne to view the execution results:

    mysql> select * from test.person;
    +------+---------+------------+
    | id   | name    | birthday   |
    +------+---------+------------+
    |    2 | lisi    | 2023-07-09 |
    |    3 | wangwu  | 2023-07-13 |
    |    4 | zhaoliu | 2023-08-08 |
    +------+---------+------------+
    3 rows in set (0.01 sec)

Example 2: Importing Kafka Data to MatrixOne

Step 1: Start the Kafka Service

Kafka cluster coordination and metadata management can be handled by either KRaft or ZooKeeper. Here we use Kafka 3.5.0, which removes the need for a standalone ZooKeeper deployment by relying on Kafka's built-in KRaft mode for metadata management. Configure the settings as shown below; the configuration file is located in the Kafka root directory at config/kraft/server.properties.

The configuration file is as follows:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    # This configuration file is intended for use in KRaft mode, where
    # Apache ZooKeeper is not present. See config/kraft/README.md for details.
    #

    ############################# Server Basics #############################

    # The role of this server. Setting this puts us in KRaft mode
    process.roles=broker,controller

    # The node id associated with this instance's roles
    node.id=1

    # The connect string for the controller quorum
    controller.quorum.voters=1@192.168.146.12:9093

    ############################# Socket Server Settings #############################

    # The address the socket server listens on.
    # Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
    # If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
    # with PLAINTEXT listener name, and port 9092.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092,CONTROLLER://:9093
    listeners=PLAINTEXT://192.168.146.12:9092,CONTROLLER://192.168.146.12:9093

    # Name of listener used for communication between brokers.
    inter.broker.listener.name=PLAINTEXT

    # Listener name, hostname and port the broker will advertise to clients.
    # If not set, it uses the value for "listeners".
    #advertised.listeners=PLAINTEXT://localhost:9092

    # A comma-separated list of the names of the listeners used by the controller.
    # If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
    # This is required if running in KRaft mode.
    controller.listener.names=CONTROLLER

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    log.dirs=/home/software/kafka_2.13-3.5.0/kraft-combined-logs

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings #############################

    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=72

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

After the configuration is completed, execute the following commands to start the Kafka service:

    # Generate a cluster ID
    $ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

    # Format the log directories (KRaft storage)
    $ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

    # Start the Kafka service
    $ bin/kafka-server-start.sh config/kraft/server.properties

Step 2: Create a Kafka Topic

To enable Flink to read data from Kafka and write it to MatrixOne, we first need to create a Kafka topic named "matrixone". In the command below, the --bootstrap-server parameter specifies the Kafka listener address, 192.168.146.12:9092:

    $ bin/kafka-topics.sh --create --topic matrixone --bootstrap-server 192.168.146.12:9092

Step 3: Write Kafka Data to MatrixOne

After connecting to MatrixOne with the MySQL client, perform the following steps to create the target table and write the Flink job:

  1. Create the target table in the test database in MatrixOne:

    USE test;
    CREATE TABLE `users` (
        `id` INT DEFAULT NULL,
        `name` VARCHAR(255) DEFAULT NULL,
        `age` INT DEFAULT NULL
    );
  2. Write code in the IDEA integrated development environment:

    In IDEA, create two classes: User.java and Kafka2Mo.java. These classes read from Kafka and write data to the MatrixOne database using Flink.

    package com.matrixone.flink.demo.entity;

    public class User {

        private int id;
        private String name;
        private int age;

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
    package com.matrixone.flink.demo;

    import com.alibaba.fastjson2.JSON;
    import com.matrixone.flink.demo.entity.User;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    import java.nio.charset.StandardCharsets;

    /**
     * @author MatrixOne
     * @desc
     */
    public class Kafka2Mo {

        private static String srcServer = "192.168.146.12:9092";
        private static String srcTopic = "matrixone";
        private static String consumerGroup = "matrixone_group";

        private static String destHost = "192.168.146.10";
        private static Integer destPort = 6001;
        private static String destUserName = "root";
        private static String destPassword = "111";
        private static String destDataBase = "test";
        private static String destTable = "users";

        public static void main(String[] args) throws Exception {
            // Initialize environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set parallelism
            env.setParallelism(1);

            // Set Kafka source information
            KafkaSource<User> source = KafkaSource.<User>builder()
                    // Kafka service
                    .setBootstrapServers(srcServer)
                    // Message topic
                    .setTopics(srcTopic)
                    // Consumer group
                    .setGroupId(consumerGroup)
                    // Offsets: when no committed offset is available, start consuming from the latest offset
                    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                    // Custom deserialization of the message content
                    .setValueOnlyDeserializer(new AbstractDeserializationSchema<User>() {
                        @Override
                        public User deserialize(byte[] message) {
                            return JSON.parseObject(new String(message, StandardCharsets.UTF_8), User.class);
                        }
                    })
                    .build();

            DataStreamSource<User> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_matrixone");
            //kafkaSource.print();

            // Set MatrixOne sink information
            kafkaSource.addSink(JdbcSink.sink(
                    "insert into users (id,name,age) values(?,?,?)",
                    (JdbcStatementBuilder<User>) (preparedStatement, user) -> {
                        preparedStatement.setInt(1, user.getId());
                        preparedStatement.setString(2, user.getName());
                        preparedStatement.setInt(3, user.getAge());
                    },
                    JdbcExecutionOptions.builder()
                            // Default value is 5000
                            .withBatchSize(1000)
                            // Default value is 0
                            .withBatchIntervalMs(200)
                            // Maximum retry attempts
                            .withMaxRetries(5)
                            .build(),
                    JdbcConnectorOptions.builder()
                            .setDBUrl("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase)
                            .setUsername(destUserName)
                            .setPassword(destPassword)
                            .setDriverName("com.mysql.cj.jdbc.Driver")
                            .setTableName(destTable)
                            .build()
            ));

            env.execute();
        }
    }
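
Before running the job, you can sanity-check that the JSON format planned for Step 4 maps onto the User class the way the custom deserializer expects. The following is a minimal standalone sketch, assuming only the fastjson2 dependency from the pom; the class name DeserializationCheck is illustrative:

    package com.matrixone.flink.demo;

    import com.alibaba.fastjson2.JSON;
    import com.matrixone.flink.demo.entity.User;

    public class DeserializationCheck {
        public static void main(String[] args) {
            // Same message shape that will be produced to the "matrixone" topic in Step 4
            String message = "{\"id\": 10, \"name\": \"xiaowang\", \"age\": 22}";
            User user = JSON.parseObject(message, User.class);
            // Expected output: 10, xiaowang, 22
            System.out.println(user.getId() + ", " + user.getName() + ", " + user.getAge());
        }
    }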

After writing the code, you can run the Flink task by selecting the Kafka2Mo.java file in IDEA and running Kafka2Mo.main().

Step 4: Generate Data

You can add data to Kafka's "matrixone" topic using the command-line producer tool provided by Kafka. In the following command, the --topic parameter specifies the target topic and the --bootstrap-server parameter specifies the listening address of the Kafka service.

    bin/kafka-console-producer.sh --topic matrixone --bootstrap-server 192.168.146.12:9092

After executing the above command, the console waits for message input. Enter the message values directly, one message per line (separated by newline characters), as follows. An optional Java producer sketch follows the sample messages.

    {"id": 10, "name": "xiaowang", "age": 22}
    {"id": 20, "name": "xiaozhang", "age": 24}
    {"id": 30, "name": "xiaogao", "age": 18}
    {"id": 40, "name": "xiaowu", "age": 20}
    {"id": 50, "name": "xiaoli", "age": 42}

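If you prefer to generate the test data programmatically instead of typing it into the console producer, a minimal Java producer sketch could look like the following, assuming the Kafka client classes available through the kafka_2.13 dependency in the pom; the class name KafkaDataGenerator is illustrative:

    package com.matrixone.flink.demo;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class KafkaDataGenerator {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka broker address used throughout this guide
            props.put("bootstrap.servers", "192.168.146.12:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Same sample messages as the console producer example above
            String[] messages = {
                    "{\"id\": 10, \"name\": \"xiaowang\", \"age\": 22}",
                    "{\"id\": 20, \"name\": \"xiaozhang\", \"age\": 24}",
                    "{\"id\": 30, \"name\": \"xiaogao\", \"age\": 18}",
                    "{\"id\": 40, \"name\": \"xiaowu\", \"age\": 20}",
                    "{\"id\": 50, \"name\": \"xiaoli\", \"age\": 42}"
            };

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (String message : messages) {
                    // Send each JSON document to the "matrixone" topic consumed by Kafka2Mo
                    producer.send(new ProducerRecord<>("matrixone", message));
                }
                producer.flush();
            }
        }
    }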

Step 5: View the Execution Results

Execute the following SQL in MatrixOne to view the results:

    mysql> select * from test.users;
    +------+-----------+------+
    | id   | name      | age  |
    +------+-----------+------+
    |   10 | xiaowang  |   22 |
    |   20 | xiaozhang |   24 |
    |   30 | xiaogao   |   18 |
    |   40 | xiaowu    |   20 |
    |   50 | xiaoli    |   42 |
    +------+-----------+------+
    5 rows in set (0.01 sec)