Mesos Setup

Background

The Mesos implementation consists of two components: the application master and the worker. The workers are simple TaskManagers which are parameterized by the environment set up by the application master. The most sophisticated component of the Mesos implementation is the application master, which currently hosts the following components:

Mesos Scheduler

The scheduler is responsible for registering the framework with Mesos, requesting resources, and launching worker nodes. The scheduler needs to continuously report back to Mesos to ensure the framework is in a healthy state. To verify the health of the cluster, the scheduler monitors the spawned workers, marks them as failed, and restarts them if necessary.

Flink’s Mesos scheduler itself is currently not highly available. However, it persists all necessary information about its state (e.g. configuration, list of workers) in ZooKeeper. In the presence of a failure, it relies on an external system to bring up a new scheduler. The new scheduler then registers with Mesos again and goes through the reconciliation phase, in which it receives a list of running worker nodes. It matches these against the information recovered from ZooKeeper and makes sure to bring the cluster back to the state it was in before the failure.
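The reconciliation phase can be pictured as a comparison of two sets: the workers recovered from ZooKeeper and the tasks that Mesos reports as running. The Java sketch below illustrates that comparison under simplifying assumptions. It is not Flink's scheduler code; the class `ReconciliationSketch`, the `Plan` type, the string worker IDs and the policy of releasing unknown tasks are all hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

public class ReconciliationSketch {

    /** Outcome of comparing recovered state with what Mesos reports (hypothetical type). */
    static final class Plan {
        final Set<String> adopt = new TreeSet<>();    // known and still running: keep them
        final Set<String> relaunch = new TreeSet<>(); // known but no longer running: restart them
        final Set<String> release = new TreeSet<>();  // running but unknown: assumed to be released

        @Override
        public String toString() {
            return "adopt=" + adopt + " relaunch=" + relaunch + " release=" + release;
        }
    }

    /**
     * @param recovered worker IDs persisted in ZooKeeper before the failure
     * @param running   worker IDs that Mesos reports as running after re-registration
     */
    static Plan reconcile(Set<String> recovered, Set<String> running) {
        Plan plan = new Plan();
        for (String id : recovered) {
            if (running.contains(id)) {
                plan.adopt.add(id);
            } else {
                plan.relaunch.add(id);
            }
        }
        for (String id : running) {
            if (!recovered.contains(id)) {
                plan.release.add(id);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Plan plan = reconcile(Set.of("tm-1", "tm-2", "tm-3"), Set.of("tm-1", "tm-3", "tm-9"));
        // prints "adopt=[tm-1, tm-3] relaunch=[tm-2] release=[tm-9]"
        System.out.println(plan);
    }
}
```

Keeping the three outcomes separate makes the restart decision explicit: only workers that were known before the failure but are no longer running need to be relaunched.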

Artifact Server

The artifact server is responsible for providing resources to the worker nodes. The resources can be anything from the Flink binaries to shared secrets or configuration files. For instance, in non-containerized environments, the artifact server will provide the Flink binaries. Which files are served depends on the configuration overlay used.

The Dispatcher and the web interface provide a central point for monitoring, job submission, and other client interaction with the cluster (see FLIP-6).

Startup script and configuration overlays

The startup script provides a way to configure and start the application master. All further configuration is then inherited by the worker nodes. This is achieved using configuration overlays, which infer configuration from environment variables and config files that are shipped to the worker nodes.
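To make the overlay idea more concrete, here is a minimal Java sketch in which each overlay inspects the local environment and adds files to ship and environment variables to set for the workers. The `Overlay` interface, the `WorkerSpec` type, both overlay classes and the sandbox path `hadoop-conf` are hypothetical illustrations, not Flink's actual overlay API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OverlaySketch {

    /** What a worker needs: files to ship and environment to set (hypothetical type). */
    static final class WorkerSpec {
        final List<String> artifacts = new ArrayList<>();
        final Map<String, String> env = new LinkedHashMap<>();
    }

    /** Hypothetical overlay contract: inspect the local environment, amend the worker spec. */
    interface Overlay {
        void configure(Map<String, String> localEnv, WorkerSpec spec);
    }

    /** Ships flink-conf.yaml from FLINK_CONF_DIR, if that variable is set. */
    static final class FlinkConfOverlay implements Overlay {
        @Override
        public void configure(Map<String, String> localEnv, WorkerSpec spec) {
            String dir = localEnv.get("FLINK_CONF_DIR");
            if (dir != null) {
                spec.artifacts.add(dir + "/flink-conf.yaml");
            }
        }
    }

    /** Ships Hadoop config files and points the worker at its copy (illustrative layout). */
    static final class HadoopConfOverlay implements Overlay {
        @Override
        public void configure(Map<String, String> localEnv, WorkerSpec spec) {
            String dir = localEnv.get("HADOOP_CONF_DIR");
            if (dir != null) {
                spec.artifacts.add(dir + "/core-site.xml");
                spec.artifacts.add(dir + "/hdfs-site.xml");
                spec.env.put("HADOOP_CONF_DIR", "hadoop-conf"); // hypothetical sandbox-relative path
            }
        }
    }

    /** Applies every overlay in order and returns the accumulated worker spec. */
    static WorkerSpec build(Map<String, String> localEnv, List<Overlay> overlays) {
        WorkerSpec spec = new WorkerSpec();
        for (Overlay overlay : overlays) {
            overlay.configure(localEnv, spec);
        }
        return spec;
    }

    public static void main(String[] args) {
        WorkerSpec spec = build(System.getenv(),
                List.of(new FlinkConfOverlay(), new HadoopConfOverlay()));
        System.out.println("ship: " + spec.artifacts);
        System.out.println("env:  " + spec.env);
    }
}
```

Composing small overlays this way keeps each concern (Flink configuration, Hadoop configuration, secrets) independent; the collected files are the kind of resources the artifact server would then serve to the workers.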

DC/OS

This section refers to DC/OS, which is a Mesos distribution with a sophisticated application management layer. It comes pre-installed with Marathon, a service to supervise applications and maintain their state in case of failures.

If you don’t have a running DC/OS cluster, please follow the instructions on how to install DC/OS on the official website.

Once you have a DC/OS cluster, you may install Flink through the DC/OS Universe. In the search prompt, just search for Flink. Alternatively, you can use the DC/OS CLI:

  dcos package install flink

Further information can be found in the DC/OS examples documentation.

Mesos without DC/OS

You can also run Mesos without DC/OS.

Installing Mesos

Please follow the instructions on how to set up Mesos on the official website.

After installation, you have to configure the set of master and agent nodes by creating the files MESOS_HOME/etc/mesos/masters and MESOS_HOME/etc/mesos/slaves. Each line of these files contains a single hostname on which the respective component will be started (assuming SSH access to these nodes).

Next, you have to create MESOS_HOME/etc/mesos/mesos-master-env.sh or use the template found in the same directory. In this file, you have to define

  export MESOS_work_dir=WORK_DIRECTORY

and it is recommended to uncomment

  export MESOS_log_dir=LOGGING_DIRECTORY

In order to configure the Mesos agents, you have to create MESOS_HOME/etc/mesos/mesos-agent-env.sh or use the template found in the same directory. You have to configure

  export MESOS_master=MASTER_HOSTNAME:MASTER_PORT

and uncomment

  export MESOS_log_dir=LOGGING_DIRECTORY
  export MESOS_work_dir=WORK_DIRECTORY

Mesos Library

In order to run Java applications with Mesos, you have to export MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.so on Linux. Under Mac OS X, you have to export MESOS_NATIVE_JAVA_LIBRARY=MESOS_HOME/lib/libmesos.dylib.
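If the same launcher is used on both platforms, the library path can be derived from the operating system name. The following Java sketch shows one hypothetical way to do that; the class and method names are illustrative, and it assumes MESOS_HOME is set in the environment.

```java
import java.io.File;
import java.util.Locale;

public class MesosNativeLibrary {

    /** Picks the libmesos file for an OS name as reported by the "os.name" system property. */
    static String libraryPath(String mesosHome, String osName) {
        String os = osName.toLowerCase(Locale.ROOT);
        String fileName;
        if (os.startsWith("linux")) {
            fileName = "libmesos.so";
        } else if (os.startsWith("mac")) {
            fileName = "libmesos.dylib";
        } else {
            throw new IllegalArgumentException("Unsupported OS for this sketch: " + osName);
        }
        return mesosHome + "/lib/" + fileName;
    }

    public static void main(String[] args) {
        String mesosHome = System.getenv("MESOS_HOME");
        if (mesosHome == null) {
            System.err.println("MESOS_HOME is not set");
            System.exit(1);
        }
        String path = libraryPath(mesosHome, System.getProperty("os.name"));
        if (!new File(path).isFile()) {
            System.err.println("Warning: " + path + " does not exist");
        }
        // Print a line that can be pasted into a shell profile
        System.out.println("export MESOS_NATIVE_JAVA_LIBRARY=" + path);
    }
}
```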

Deploying Mesos

In order to start your Mesos cluster, use the deployment script MESOS_HOME/sbin/mesos-start-cluster.sh. In order to stop your Mesos cluster, use the deployment script MESOS_HOME/sbin/mesos-stop-cluster.sh. More information about the deployment scripts can be found in the Mesos documentation.

Installing Marathon

Optionally, you may also install Marathon, which enables you to run Flink in high availability (HA) mode.

Pre-installing Flink vs Docker/Mesos containers

You may install Flink on all of your Mesos Master and Agent nodes. You can also pull the binaries from the Flink web site during deployment and apply your custom configuration before launching the application master. A more convenient and easier-to-maintain approach is to use Docker containers to manage the Flink binaries and configuration.

This is controlled via the following configuration entries:

  mesos.resourcemanager.tasks.container.type: mesos or docker

If set to ‘docker’, specify the image name:

  mesos.resourcemanager.tasks.container.image.name: image_name
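The rule above, that an image name is needed only for the docker container type, can be checked before the application master is started. The Java sketch below is a hypothetical pre-flight check, not part of Flink; it assumes that a missing container type means mesos.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ContainerConfigCheck {

    static final String TYPE = "mesos.resourcemanager.tasks.container.type";
    static final String IMAGE = "mesos.resourcemanager.tasks.container.image.name";

    /** Returns a list of problems; an empty list means the container settings look consistent. */
    static List<String> check(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        // Assumption: treat a missing container type as "mesos"
        String type = conf.getOrDefault(TYPE, "mesos");
        if ("docker".equals(type)) {
            String image = conf.get(IMAGE);
            if (image == null || image.trim().isEmpty()) {
                problems.add(IMAGE + " must be set when " + TYPE + " is 'docker'");
            }
        } else if (!"mesos".equals(type)) {
            problems.add(TYPE + " must be 'mesos' or 'docker', got '" + type + "'");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(TYPE, "docker"); // image name left out on purpose
        for (String problem : check(conf)) {
            System.out.println("config problem: " + problem);
        }
    }
}
```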

Flink session cluster on Mesos

A Flink session cluster is executed as a long-running Mesos Deployment. Note that you can run multiple Flink jobs on a session cluster. Each job needs to be submitted to the cluster after the cluster has been deployed.

In the /bin directory of the Flink distribution, you find two startup scripts which manage the Flink processes in a Mesos cluster:

  • mesos-appmaster.sh: This starts the Mesos application master, which will register the Mesos scheduler. It is also responsible for starting up the worker nodes.

  • mesos-taskmanager.sh: The entry point for the Mesos worker processes. You don’t need to explicitly execute this script. It is automatically launched by the Mesos worker node to bring up a new TaskManager.

In order to run the mesos-appmaster.sh script you have to define mesos.master in the flink-conf.yaml or pass it via -Dmesos.master=… to the Java process.
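The lookup of mesos.master can be sketched in Java as follows: read flat `key: value` lines from flink-conf.yaml, apply any `-Dkey=value` arguments on top, and fail early if the key is still missing. This is a simplified, hypothetical helper; it assumes that dynamic properties override the file (the usual Flink behaviour), that the file lives in FLINK_CONF_DIR (or ./conf), and it does not implement full YAML parsing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MesosMasterResolver {

    /** Parses flat "key: value" lines; blank lines and lines starting with '#' are skipped. */
    static Map<String, String> parseConf(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int colon = trimmed.indexOf(':'); // first colon separates key and value
            if (colon > 0) {
                conf.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
            }
        }
        return conf;
    }

    /** Applies -Dkey=value arguments on top of the file configuration (assumed to take precedence). */
    static Map<String, String> applyDynamicProperties(Map<String, String> conf, String[] args) {
        Map<String, String> merged = new HashMap<>(conf);
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("-D") && eq > 2) {
                merged.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return merged;
    }

    static String requireMesosMaster(Map<String, String> conf) {
        String master = conf.get("mesos.master");
        if (master == null || master.isEmpty()) {
            throw new IllegalStateException(
                    "mesos.master must be set in flink-conf.yaml or via -Dmesos.master=...");
        }
        return master;
    }

    public static void main(String[] args) throws IOException {
        Path confFile = Paths.get(System.getenv().getOrDefault("FLINK_CONF_DIR", "conf"), "flink-conf.yaml");
        List<String> lines = Files.exists(confFile) ? Files.readAllLines(confFile) : List.of();
        Map<String, String> conf = applyDynamicProperties(parseConf(lines), args);
        System.out.println("mesos.master = " + requireMesosMaster(conf));
    }
}
```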

Executing mesos-appmaster.sh creates a job manager on the machine where you executed the script. In contrast, the task managers are run as Mesos tasks in the Mesos cluster.

Flink job cluster on Mesos

A Flink job cluster is a dedicated cluster which runs a single job. There is no extra job submission needed.

In the /bin directory of the Flink distribution, you find one startup script which manages the Flink processes in a Mesos cluster:

  • mesos-appmaster-job.sh: This starts the Mesos application master, which will register the Mesos scheduler, retrieve the job graph, and then launch the task managers accordingly. In order to run the mesos-appmaster-job.sh script, you have to define mesos.master and internal.jobgraph-path in the flink-conf.yaml or pass them via -Dmesos.master=… -Dinternal.jobgraph-path=… to the Java process.

The job graph file may be generated like this:

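  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.ObjectOutputStream;
  import org.apache.flink.runtime.jobgraph.JobGraph;

  // env is your program's StreamExecutionEnvironment; run this in a method that may throw IOException
  final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
  final String jobGraphFilename = "job.graph";
  File jobGraphFile = new File(jobGraphFilename);
  // Write the job graph to disk using standard Java serialization
  try (FileOutputStream output = new FileOutputStream(jobGraphFile);
       ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
      obOutput.writeObject(jobGraph);
  }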
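Since the file is written with standard Java serialization, it can be read back with an ObjectInputStream as a quick sanity check before the job cluster is started. So that the sketch runs without Flink on the classpath, the hypothetical `Payload` class stands in for `JobGraph`; with Flink available, you would cast the result to `JobGraph` instead.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JobGraphFileCheck {

    /** Hypothetical stand-in for org.apache.flink.runtime.jobgraph.JobGraph. */
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;

        Payload(String name) {
            this.name = name;
        }
    }

    /** Serializes the object the same way as the job graph snippet above. */
    static void write(Object graph, File file) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(file))) {
            out.writeObject(graph);
        }
    }

    /** Reads the object back; a corrupt or truncated file fails here. */
    static Object read(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(file))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        File file = File.createTempFile("job", ".graph");
        file.deleteOnExit();
        write(new Payload("my-job"), file);
        Payload restored = (Payload) read(file);
        System.out.println("restored job graph stand-in: " + restored.name);
    }
}
```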

Note: Make sure that all Mesos processes have the user code jar on the classpath. There are two ways:

  • One way is putting them in the lib/ directory, which will result in the user code jar being loaded by the system classloader.
  • The other way is creating a usrlib/ directory in the parent directory of lib/ and putting the user code jar in the usrlib/ directory. After launching a job cluster via bin/mesos-appmaster-job.sh …, the user code jar will be loaded by the user code classloader.

General configuration

It is possible to completely parameterize a Mesos application through Java properties passed to the Mesos application master. This also allows you to specify general Flink configuration parameters. For example:

  bin/mesos-appmaster.sh \
    -Dmesos.master=master.foobar.org:5050 \
    -Djobmanager.heap.size=1024m \
    -Djobmanager.rpc.port=6123 \
    -Drest.port=8081 \
    -Dmesos.resourcemanager.tasks.mem=4096 \
    -Dtaskmanager.memory.process.size=3500m \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dparallelism.default=10
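When the application master is launched from another JVM program, for example a deployment tool, the same properties can be assembled into an argument list and handed to ProcessBuilder. The sketch below assumes that FLINK_HOME points at the Flink distribution; the class `AppMasterCommand` is hypothetical, and the program only prints the command instead of starting it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AppMasterCommand {

    /** Turns ordered configuration entries into mesos-appmaster.sh -Dkey=value arguments. */
    static List<String> command(String flinkHome, Map<String, String> conf) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkHome + "/bin/mesos-appmaster.sh");
        for (Map.Entry<String, String> e : conf.entrySet()) {
            cmd.add("-D" + e.getKey() + "=" + e.getValue());
        }
        return cmd;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the flags in insertion order
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("mesos.master", "master.foobar.org:5050");
        conf.put("mesos.resourcemanager.tasks.mem", "4096");
        conf.put("taskmanager.memory.process.size", "3500m");
        conf.put("taskmanager.numberOfTaskSlots", "2");
        conf.put("parallelism.default", "10");

        ProcessBuilder pb = new ProcessBuilder(command(System.getenv().getOrDefault("FLINK_HOME", "."), conf));
        pb.inheritIO();
        System.out.println(String.join(" ", pb.command()));
        // pb.start() would actually launch the application master
    }
}
```

Passing every -D flag as a separate list element avoids shell quoting problems, since ProcessBuilder does not go through a shell.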

High Availability

You will need to run a service like Marathon or Apache Aurora which takes care of restarting the Flink master process in case of node or process failures. In addition, ZooKeeper needs to be configured as described in the High Availability section of the Flink docs.
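Before handing the launch over to Marathon or Aurora, it can help to check that the ZooKeeper settings are present. The Java sketch below assumes that the relevant keys are `high-availability`, `high-availability.zookeeper.quorum` and `high-availability.storageDir`; the High Availability section remains the authoritative list. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HaPreflight {

    /** Returns configuration keys that appear to be missing for ZooKeeper-based HA. */
    static List<String> missingHaKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        if (!"zookeeper".equalsIgnoreCase(conf.get("high-availability"))) {
            missing.add("high-availability (expected: zookeeper)");
            return missing; // the remaining keys only matter once HA mode is enabled
        }
        for (String key : List.of("high-availability.zookeeper.quorum", "high-availability.storageDir")) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(
                "high-availability", "zookeeper",
                "high-availability.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
        // prints "missing: [high-availability.storageDir]"
        System.out.println("missing: " + missingHaKeys(conf));
    }
}
```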

Marathon

Marathon needs to be set up to launch the bin/mesos-appmaster.sh script. In particular, the launch command should also set any configuration parameters for the Flink cluster.

Here is an example configuration for Marathon:

  {
    "id": "flink",
    "cmd": "$FLINK_HOME/bin/mesos-appmaster.sh -Djobmanager.heap.size=1024m -Djobmanager.rpc.port=6123 -Drest.port=8081 -Dmesos.resourcemanager.tasks.mem=1024 -Dtaskmanager.memory.process.size=1024m -Dtaskmanager.numberOfTaskSlots=2 -Dparallelism.default=2 -Dmesos.resourcemanager.tasks.cpus=1",
    "cpus": 1.0,
    "mem": 1024
  }

When running Flink with Marathon, the whole Flink cluster including the job manager will be run as Mesos tasks in the Mesos cluster.

Configuration parameters

For a list of Mesos-specific configuration options, refer to the Mesos section of the configuration documentation.