INSERT Statement

INSERT statements are used to add rows to a table.

Run an INSERT statement

INSERT statements are specified with the sqlUpdate() method of the TableEnvironment or executed in SQL CLI. The method sqlUpdate() for INSERT statements is a lazy execution, they will be executed only when TableEnvironment.execute(jobName) is invoked.

The following examples show how to run an INSERT statement in TableEnvironment and in SQL CLI.

  1. EnvironmentSettings settings = EnvironmentSettings.newInstance()...
  2. TableEnvironment tEnv = TableEnvironment.create(settings);
  3. // register a source table named "Orders" and a sink table named "RubberOrders"
  4. tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
  5. tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
  6. // run a SQL update query on the registered source table and emit the result to registered sink table
  7. tEnv.sqlUpdate(
  8. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
  1. val settings = EnvironmentSettings.newInstance()...
  2. val tEnv = TableEnvironment.create(settings)
  3. // register a source table named "Orders" and a sink table named "RubberOrders"
  4. tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  5. tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  6. // run a SQL update query on the registered source table and emit the result to registered sink table
  7. tEnv.sqlUpdate(
  8. "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  1. settings = EnvironmentSettings.newInstance()...
  2. table_env = TableEnvironment.create(settings)
  3. # register a source table named "Orders" and a sink table named "RubberOrders"
  4. table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
  5. table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
  6. # run a SQL update query on the registered source table and emit the result to registered sink table
  7. table_env \
  8. .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
  1. Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
  2. [INFO] Table has been created.
  3. Flink SQL> CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...);
  4. Flink SQL> SHOW TABLES;
  5. Orders
  6. RubberOrders
  7. Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
  8. [INFO] Submitting SQL update statement to the cluster...
  9. [INFO] Table update statement has been successfully submitted to the cluster:

Insert from select queries

Query Results can be inserted into tables by using the insert clause.

Syntax

  1. INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name [PARTITION part_spec] select_statement
  2. part_spec:
  3. (part_col_name1=val1 [, part_col_name2=val2, ...])

OVERWRITE

INSERT OVERWRITE will overwrite any existing data in the table or partition. Otherwise, new data is appended.

PARTITION

PARTITION clause should contain static partition columns of this inserting.

Examples

  1. -- Creates a partitioned table
  2. CREATE TABLE country_page_view (user STRING, cnt INT, date STRING, country STRING)
  3. PARTITIONED BY (date, country)
  4. WITH (...)
  5. -- Appends rows into the static partition (date='2019-8-30', country='China')
  6. INSERT INTO country_page_view PARTITION (date='2019-8-30', country='China')
  7. SELECT user, cnt FROM page_view_source;
  8. -- Appends rows into partition (date, country), where date is static partition with value '2019-8-30',
  9. -- country is dynamic partition whose value is dynamic determined by each row.
  10. INSERT INTO country_page_view PARTITION (date='2019-8-30')
  11. SELECT user, cnt, country FROM page_view_source;
  12. -- Overwrites rows into static partition (date='2019-8-30', country='China')
  13. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30', country='China')
  14. SELECT user, cnt FROM page_view_source;
  15. -- Overwrites rows into partition (date, country), where date is static partition with value '2019-8-30',
  16. -- country is dynamic partition whose value is dynamic determined by each row.
  17. INSERT OVERWRITE country_page_view PARTITION (date='2019-8-30')
  18. SELECT user, cnt, country FROM page_view_source;

Insert values into tables

The INSERT…VALUES statement can be used to insert data into tables directly from SQL.

Syntax

  1. INSERT { INTO | OVERWRITE } [catalog_name.][db_name.]table_name VALUES values_row [, values_row ...]
  2. values_row:
  3. : (val1 [, val2, ...])

OVERWRITE

INSERT OVERWRITE will overwrite any existing data in the table. Otherwise, new data is appended.

Examples

  1. CREATE TABLE students (name STRING, age INT, gpa DECIMAL(3, 2)) WITH (...);
  2. INSERT INTO students
  3. VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);