Flink Node

Overview

Flink task type, used to execute Flink programs. For Flink nodes:

  1. When the program type is Java, Scala or Python, the worker submits the task using the Flink command flink run. See flink cli for more details.

  2. When the program type is SQL, the worker submits the task using sql-client.sh. See flink sql client for more details.
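
For illustration only, the commands the worker builds are roughly of the following shape; the class name, jar path, parallelism and script names below are placeholders, not the exact command line DolphinScheduler generates:

```shell
# Program type Java/Scala: submit the uploaded jar with the Flink CLI
flink run -p 2 -c com.example.MyFlinkJob my-flink-job.jar --input /tmp/in --output /tmp/out

# Program type SQL: run the script with the Flink SQL client
# (-i passes the optional initialization script, -f the SQL script file)
sql-client.sh -i init_script.sql -f flink_job.sql
```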

Create Task

  • Click Project Management -> Project Name -> Workflow Definition, and click the Create Workflow button to enter the DAG editing page.
  • Drag the Flink task node from the toolbar onto the canvas.

Task Parameters

  • Node name: The node name in a workflow definition must be unique.
  • Run flag: Indicates whether this node is scheduled normally. If the node does not need to execute, select prohibition execution.
  • Description: Describes the function of the node.
  • Task priority: When the number of worker threads is insufficient, tasks execute in order of priority from high to low; tasks with the same priority execute in first-in, first-out order.
  • Worker grouping: Assigns the task to a machine of the worker group. If Default is selected, a worker machine is chosen at random.
  • Environment Name: The environment in which the script runs.
  • Times of failed retry attempts: The number of times a failed task is resubmitted.
  • Failed retry interval: The interval (in minutes) before a failed task is resubmitted.
  • Delayed execution time: The time (in minutes) by which task execution is delayed.
  • Timeout alarm: Check the timeout alarm and timeout failure options. When the task runs longer than the configured timeout, an alarm email is sent and the task execution fails.
  • Program type: Supports four program types: Java, Scala, Python and SQL.
  • Class of main function: The full path of the main class, the entry point of the Flink program.
  • Main jar package: The jar package of the Flink program (uploaded via the Resource Center).
  • Deployment mode: Supports three deployment modes: cluster, local and application (Flink 1.11 and later; see also Run an application in Application Mode).
  • Initialization script: Script file used to initialize the session context.
  • Script: The SQL script file, developed by the user, that should be executed.
  • Flink version: Select the version according to the execution environment.
  • Task name: The Flink task name.
  • JobManager memory size: Sets the JobManager memory size, which can be adjusted according to the actual production environment.
  • Number of slots: Sets the number of slots, which can be adjusted according to the actual production environment.
  • TaskManager memory size: Sets the TaskManager memory size, which can be adjusted according to the actual production environment.
  • Number of TaskManager: Sets the number of TaskManagers, which can be adjusted according to the actual production environment.
  • Parallelism: Sets the degree of parallelism for executing the Flink task.
  • Main program parameters: Sets the input parameters of the Flink program; supports substitution of custom parameter variables.
  • Optional parameters: Supports parameters in the --jar, --files, --archives and --conf formats.
  • Resource: Specify resource files in the Resource field if the parameters refer to them.
  • Custom parameter: User-defined local parameters of the Flink task; ${variable} placeholders in the script are replaced with their values.
  • Predecessor task: Selecting a predecessor task for the current task sets it as upstream of the current task.
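
As a hedged illustration of custom parameter substitution (the parameter name dt and all values are made up): if a custom parameter dt = 20240101 is defined for the task, any ${dt} occurrence in the main program parameters or in the script is replaced before the task is submitted.

```shell
# Custom parameter defined in the task (hypothetical): dt = 20240101
# Main program parameters as entered in the form:
--input /data/${dt}/input --output /data/${dt}/output
# What the program actually receives after substitution:
--input /data/20240101/input --output /data/20240101/output
```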

Task Example

Execute the WordCount Program

This is a common introductory case in the big data ecosystem, often applied to computational frameworks such as MapReduce, Flink and Spark. Its main purpose is to count the number of identical words in the input text. (Flink releases ship with this example job.)

If you use the Flink task type in a production environment, you must configure the required environment first. The configuration file is bin/env/dolphinscheduler_env.sh.
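
A minimal sketch of the relevant lines, assuming Flink is installed under /opt/flink (the path is an assumption; adjust it to your environment):

```shell
# bin/env/dolphinscheduler_env.sh (excerpt)
# Make the `flink` and `sql-client.sh` commands available to the worker.
export FLINK_HOME=/opt/flink            # assumed install location
export PATH=$FLINK_HOME/bin:$PATH
```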

(screenshot: demo-flink-simple)

Upload the Main Package

When using the Flink task node, you need to upload the jar package to the Resource Center for execution; refer to the resource center.

After finishing the Resource Center configuration, upload the required target files directly by dragging and dropping.

(screenshot: resource_upload)

Configure the required content according to the parameter descriptions above.

(screenshot: demo-flink-simple)
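
For orientation, the submission for this example is roughly equivalent to running the Flink CLI by hand against the example jar; the class name and output path below are illustrative and may differ between Flink versions:

```shell
# Roughly equivalent manual submission of the bundled WordCount example
flink run -p 1 \
  -c org.apache.flink.streaming.examples.wordcount.WordCount \
  WordCount.jar --output /tmp/wordcount-result
```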

Execute the FlinkSQL Program

Configure the required content according to the parameter descriptions above.

(screenshot: demo-flink-sql-simple)
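
As a hedged example, a minimal script that could be used for the Script parameter is shown below; it relies only on the datagen and print connectors bundled with Flink, and the table names and row count are made up:

```sql
-- Generate a small bounded stream of random orders (datagen connector)
CREATE TABLE orders (
    order_id BIGINT,
    price    DOUBLE
) WITH (
    'connector' = 'datagen',
    'number-of-rows' = '10'
);

-- Print every row to the TaskManager log (print connector)
CREATE TABLE print_sink (
    order_id BIGINT,
    price    DOUBLE
) WITH (
    'connector' = 'print'
);

INSERT INTO print_sink SELECT order_id, price FROM orders;
```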

Note

  • Java and Scala are only used for identification; there is no difference between them. If you use Python to develop the Flink program, there is no main function class to configure, and the rest is the same.

  • When using SQL to execute Flink SQL tasks, currently only Flink 1.13 and above are supported.