DELETE Statements

DELETE statement is used to perform row-level deletion on the target table according to the filter if provided.

Attention Currently, DELETE statement only supports in batch mode, and it requires the target table connector implements the SupportsRowLevelDelete interface to support the row-level delete. An exception will be thrown if trying to DELETE the table which has not implements the related interface. Currently, there is no existing connector maintained by flink has supported DELETE yet.

Run a DELETE statement

Java

DELETE statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() will submit a Flink job immediately, and return a TableResult instance which associates the submitted job.

The following examples show how to run a single DELETE statement in TableEnvironment.

Scala

DELETE statements can be executed with the executeSql() method of the TableEnvironment. The executeSql() will submit a Flink job immediately, and return a TableResult instance which associates the submitted job.

The following examples show how to run a single DELETE statement in TableEnvironment.

Python

DELETE statements can be executed with the execute_sql() method of the TableEnvironment. The executeSql() will submit a Flink job immediately, and return a TableResult instance which associates the submitted job.

The following examples show how to run a single DELETE statement in TableEnvironment.

SQL CLI

DELETE statements can be executed in SQL CLI.

The following examples show how to run a DELETE statement in SQL CLI.

Java

  1. EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
  2. TableEnvironment tEnv = TableEnvironment.create(settings);
  3. // register a table named "Orders"
  4. tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  5. // insert values
  6. tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").await();
  7. tEnv.executeSql("SELECT * FROM Orders").print();
  8. // +--------------------------------+--------------------------------+-------------+
  9. // | user | product | amount |
  10. // +--------------------------------+--------------------------------+-------------+
  11. // | Lili | Apple | 1 |
  12. // | Jessica | Banana | 2 |
  13. // | Mr.White | Chicken | 3 |
  14. // +--------------------------------+--------------------------------+-------------+
  15. // 3 rows in set
  16. // delete by filter
  17. tEnv.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").await();
  18. tEnv.executeSql("SELECT * FROM Orders").print();
  19. // +--------------------------------+--------------------------------+-------------+
  20. // | user | product | amount |
  21. // +--------------------------------+--------------------------------+-------------+
  22. // | Jessica | Banana | 2 |
  23. // | Mr.White | Chicken | 3 |
  24. // +--------------------------------+--------------------------------+-------------+
  25. // 2 rows in set
  26. // delete entire table
  27. tEnv.executeSql("DELETE FROM Orders").await();
  28. tEnv.executeSql("SELECT * FROM Orders").print();
  29. // Empty set

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val settings = EnvironmentSettings.newInstance().inBatchMode().build()
  3. val tEnv = StreamTableEnvironment.create(env, settings)
  4. // register a table named "Orders"
  5. tEnv.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  6. // insert values
  7. tEnv.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").await();
  8. tEnv.executeSql("SELECT * FROM Orders").print();
  9. // +--------------------------------+--------------------------------+-------------+
  10. // | user | product | amount |
  11. // +--------------------------------+--------------------------------+-------------+
  12. // | Lili | Apple | 1 |
  13. // | Jessica | Banana | 2 |
  14. // | Mr.White | Chicken | 3 |
  15. // +--------------------------------+--------------------------------+-------------+
  16. // 3 rows in set
  17. // delete by filter
  18. tEnv.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").await();
  19. tEnv.executeSql("SELECT * FROM Orders").print();
  20. // +--------------------------------+--------------------------------+-------------+
  21. // | user | product | amount |
  22. // +--------------------------------+--------------------------------+-------------+
  23. // | Jessica | Banana | 2 |
  24. // | Mr.White | Chicken | 3 |
  25. // +--------------------------------+--------------------------------+-------------+
  26. // 2 rows in set
  27. // delete entire table
  28. tEnv.executeSql("DELETE FROM Orders").await();
  29. tEnv.executeSql("SELECT * FROM Orders").print();
  30. // Empty set

Python

  1. env_settings = EnvironmentSettings.in_batch_mode()
  2. table_env = TableEnvironment.create(env_settings)
  3. # register a table named "Orders"
  4. table_env.executeSql("CREATE TABLE Orders (`user` STRING, product STRING, amount INT) WITH (...)");
  5. # insert values
  6. table_env.executeSql("INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 2), ('Mr.White', 'Chicken', 3)").wait();
  7. table_env.executeSql("SELECT * FROM Orders").print();
  8. # +--------------------------------+--------------------------------+-------------+
  9. # | user | product | amount |
  10. # +--------------------------------+--------------------------------+-------------+
  11. # | Lili | Apple | 1 |
  12. # | Jessica | Banana | 2 |
  13. # | Mr.White | Chicken | 3 |
  14. # +--------------------------------+--------------------------------+-------------+
  15. # 3 rows in set
  16. # delete by filter
  17. table_env.executeSql("DELETE FROM Orders WHERE `user` = 'Lili'").wait();
  18. table_env.executeSql("SELECT * FROM Orders").print();
  19. # +--------------------------------+--------------------------------+-------------+
  20. # | user | product | amount |
  21. # +--------------------------------+--------------------------------+-------------+
  22. # | Jessica | Banana | 2 |
  23. # | Mr.White | Chicken | 3 |
  24. # +--------------------------------+--------------------------------+-------------+
  25. # 2 rows in set
  26. # delete entire table
  27. table_env.executeSql("DELETE FROM Orders").wait();
  28. table_env.executeSql("SELECT * FROM Orders").print();
  29. # Empty set

SQL CLI

  1. Flink SQL> SET 'execution.runtime-mode' = 'batch';
  2. [INFO] Session property has been set.
  3. Flink SQL> CREATE TABLE Orders (`user` STRING, product STRING, amount INT) with (...);
  4. [INFO] Execute statement succeed.
  5. Flink SQL> INSERT INTO Orders VALUES ('Lili', 'Apple', 1), ('Jessica', 'Banana', 1), ('Mr.White', 'Chicken', 3);
  6. [INFO] Submitting SQL update statement to the cluster...
  7. [INFO] SQL update statement has been successfully submitted to the cluster:
  8. Job ID: bd2c46a7b2769d5c559abd73ecde82e9
  9. Flink SQL> SELECT * FROM Orders;
  10. user product amount
  11. Lili Apple 1
  12. Jessica Banana 2
  13. Mr.White Chicken 3
  14. Flink SQL> DELETE FROM Orders WHERE `user` = 'Lili';
  15. user product amount
  16. Jessica Banana 2
  17. Mr.White Chicken 3

DELETE ROWS

  1. DELETE FROM [catalog_name.][db_name.]table_name [ WHERE condition ]