Job Management

Job is the fundamental object of a high performance workload; this document provides the definition of Job in Volcano.

The definition of Job follows Kubernetes's style, e.g. Status, Spec; the following sections describe only the major functions of Job.

Multiple Pod Template

As most high performance workloads include different types of tasks, e.g. TensorFlow (ps/worker) or Spark (driver/executor), Job introduces taskSpecs to support multiple pod templates, defined as follows. The Policies are described in the Error Handling section.

```go
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
	...

	// Tasks specifies the task specification of Job
	// +optional
	Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
}

// TaskSpec specifies the task specification of Job
type TaskSpec struct {
	// Name specifies the name of task
	Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`

	// Replicas specifies the replicas of this TaskSpec in Job
	Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`

	// Specifies the pod that will be created for this TaskSpec
	// when executing a Job
	Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`

	// Specifies the lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
```

JobController will create Pods based on the templates and replicas in spec.tasks; the controlled OwnerReference of each Pod will be set to the Job. The following is an example YAML with multiple pod templates.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  tasks:
  - name: "ps"
    replicas: 2
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: worker
          image: worker-img
```
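The expansion from task specs to pods can be sketched as follows. This is a minimal illustration, not the controller's actual code; the `"<job>-<task>-<index>"` naming scheme and the `expandPodNames` helper are assumptions made for the example.

```go
package main

import "fmt"

// TaskSpec mirrors the two fields of the Job API used here.
type TaskSpec struct {
	Name     string
	Replicas int32
}

// expandPodNames derives one pod name per replica for each task.
// The naming scheme is an assumption for illustration only.
func expandPodNames(job string, tasks []TaskSpec) []string {
	var names []string
	for _, t := range tasks {
		for i := int32(0); i < t.Replicas; i++ {
			names = append(names, fmt.Sprintf("%s-%s-%d", job, t.Name, i))
		}
	}
	return names
}

func main() {
	// The example job above: 2 "ps" replicas and 5 "worker" replicas.
	pods := expandPodNames("tf-job", []TaskSpec{{"ps", 2}, {"worker", 5}})
	fmt.Println(len(pods)) // 7 pods in total
}
```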

Job Input/Output

Most high performance workloads handle data, which is considered the input/output of a Job. The following types are introduced for a Job's input/output.

```go
type VolumeSpec struct {
	MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`

	// VolumeClaimName defines the name of the PVC
	// +optional
	VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`

	// VolumeClaim defines the PVC used by the VolumeSpec.
	// +optional
	VolumeClaim *PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,3,opt,name=claim"`
}

type JobSpec struct {
	...

	// The volumes to mount on the Job
	// +optional
	Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,1,opt,name=volumes"`
}
```

The Volumes of a Job can be nil, which means the user will manage data themselves. If VolumeSpec.volumeClaim is nil and VolumeSpec.volumeClaimName is nil or does not match an existing PersistentVolumeClaim, an emptyDir volume will be used for each Task/Pod.
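The fallback rule above can be sketched as a small decision function. This is an illustrative sketch only; the `volumeSource` helper, its return values, and the `existingPVCs` lookup are assumptions standing in for the controller's real logic.

```go
package main

import "fmt"

// VolumeSpec mirrors the relevant fields of the Job API; HasVolumeClaim
// stands in for a non-nil VolumeSpec.VolumeClaim.
type VolumeSpec struct {
	MountPath       string
	VolumeClaimName string
	HasVolumeClaim  bool
}

// volumeSource applies the fallback rule described above: use the claim
// template if present, else an existing PVC by name, else an emptyDir.
// existingPVCs stands in for a lookup against the cluster.
func volumeSource(v VolumeSpec, existingPVCs map[string]bool) string {
	switch {
	case v.HasVolumeClaim:
		return "pvc-from-template"
	case v.VolumeClaimName != "" && existingPVCs[v.VolumeClaimName]:
		return "pvc:" + v.VolumeClaimName
	default:
		return "emptyDir"
	}
}

func main() {
	pvcs := map[string]bool{"shared-data": true}
	// The named claim does not exist, so each Task/Pod falls back to emptyDir.
	fmt.Println(volumeSource(VolumeSpec{MountPath: "/data", VolumeClaimName: "missing"}, pvcs))
}
```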

Conditions and Phases

The following phases are introduced to give a simple, high-level summary of where the Job is in its lifecycle; the conditions array and the reason and message fields contain more detail about the Job's status.

```go
type JobPhase string

const (
	// Pending is the phase that job is pending in the queue, waiting for scheduling decision
	Pending JobPhase = "Pending"
	// Aborting is the phase that job is aborted, waiting for releasing pods
	Aborting JobPhase = "Aborting"
	// Aborted is the phase that job is aborted by user or error handling
	Aborted JobPhase = "Aborted"
	// Running is the phase that minimal available tasks of Job are running
	Running JobPhase = "Running"
	// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
	Restarting JobPhase = "Restarting"
	// Completed is the phase that all tasks of Job are completed successfully
	Completed JobPhase = "Completed"
	// Terminating is the phase that the Job is terminated, waiting for releasing pods
	Terminating JobPhase = "Terminating"
	// Terminated is the phase that the job finished unexpectedly, e.g. because of events
	Terminated JobPhase = "Terminated"
)

// JobState contains details for the current state of the job.
type JobState struct {
	// The phase of Job.
	// +optional
	Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`

	// Unique, one-word, CamelCase reason for the phase's last transition.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`

	// Human-readable message indicating details about last transition.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}

// JobStatus represents the current state of a Job
type JobStatus struct {
	// Current state of Job.
	State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
	...
}
```

The following table shows the available transitions between different phases. A phase cannot transition to the target phase if the cell is empty.

| From \ To  | Pending | Aborted | Running | Completed | Terminated |
|------------|---------|---------|---------|-----------|------------|
| Pending    |         | *       | *       |           |            |
| Aborted    | *       |         |         |           |            |
| Running    |         | *       |         | *         | *          |
| Completed  |         |         |         |           |            |
| Terminated |         |         |         |           |            |

Restarting, Aborting and Terminating are temporary states to avoid race conditions; e.g. there will be several PodEvictedEvents because of a TerminateJobAction, and these should not be handled again.
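The phase machine can be sketched as a lookup of allowed edges. The exact edge set below is an assumption derived from the phase and action definitions in this document (e.g. only an aborted job can be resumed back to Pending, while Completed and Terminated are final); the `canTransition` helper is illustrative, not the controller's real code.

```go
package main

import "fmt"

// validTransitions lists the assumed stable-phase edges; temporary states
// (Restarting, Aborting, Terminating) are omitted for brevity.
var validTransitions = map[string][]string{
	"Pending": {"Running", "Aborted"},
	"Running": {"Completed", "Aborted", "Terminated"},
	"Aborted": {"Pending"},
}

// canTransition reports whether the phase machine allows from -> to.
func canTransition(from, to string) bool {
	for _, t := range validTransitions[from] {
		if t == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition("Aborted", "Pending"))    // an aborted job can be resumed
	fmt.Println(canTransition("Terminated", "Pending")) // a terminated job cannot
}
```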

Error Handling

After a Job is created in the system, there will be several events related to the Job, e.g. Pod succeeded, Pod failed; and some events are critical to the Job, e.g. a failed Pod of an MPIJob. So LifecyclePolicy is introduced to handle different events based on the user's configuration.

```go
// Event is the type of Event related to the Job
type Event string

const (
	// AllEvents means all events
	AllEvents Event = "*"
	// PodFailedEvent is triggered if Pod was failed
	PodFailedEvent Event = "PodFailed"
	// PodEvictedEvent is triggered if Pod was deleted
	PodEvictedEvent Event = "PodEvicted"
	// JobUnknownEvent is triggered by events that can lead the job to 'Unknown',
	// e.g. a task is unschedulable: part of the pods can't be scheduled
	// while some are already running, in the gang-scheduling case.
	JobUnknownEvent Event = "Unknown"
	// OutOfSyncEvent is triggered if Pod/Job were updated
	OutOfSyncEvent Event = "OutOfSync"
	// CommandIssuedEvent is triggered if a command is raised by user
	CommandIssuedEvent Event = "CommandIssued"
	// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task succeeded
	TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the type of event handling
type Action string

const (
	// AbortJobAction if this action is set, the whole job will be aborted:
	// all Pods of the Job will be evicted, and no Pod will be recreated
	AbortJobAction Action = "AbortJob"
	// RestartJobAction if this action is set, the whole job will be restarted
	RestartJobAction Action = "RestartJob"
	// TerminateJobAction if this action is set, the whole job will be terminated
	// and can not be resumed: all Pods of the Job will be evicted, and no Pod will be recreated
	TerminateJobAction Action = "TerminateJob"
	// CompleteJobAction if this action is set, the unfinished pods will be killed and the job completed
	CompleteJobAction Action = "CompleteJob"
	// ResumeJobAction is the action to resume an aborted job.
	ResumeJobAction Action = "ResumeJob"
	// SyncJobAction is the action to sync Job/Pod status.
	SyncJobAction Action = "SyncJob"
)

// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
	Event   Event            `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
	Action  Action           `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
	Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
```

Both JobSpec and TaskSpec include a lifecycle policy: the policies in JobSpec are the defaults, used when no policies are set in TaskSpec; the policies in TaskSpec overwrite the defaults.

```go
// JobSpec describes how the job execution will look like and when it will actually run
type JobSpec struct {
	...

	// Specifies the default lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`

	// Tasks specifies the task specification of Job
	// +optional
	Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,6,opt,name=tasks"`
}

// TaskSpec specifies the task specification of Job
type TaskSpec struct {
	...

	// Specifies the lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
```
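The override rule can be sketched as a simple lookup: check task-level policies first (including the `"*"` catch-all), then fall back to the job-level defaults. The `actionFor` helper and the empty-string "no match" convention are assumptions for illustration, not the controller's actual resolution code.

```go
package main

import "fmt"

// LifecyclePolicy mirrors the API type above (Timeout omitted for brevity).
type LifecyclePolicy struct {
	Event  string
	Action string
}

// actionFor resolves the action for an event: task-level policies win
// over the job-level defaults; "*" matches any event.
func actionFor(event string, jobPolicies, taskPolicies []LifecyclePolicy) string {
	for _, p := range taskPolicies {
		if p.Event == event || p.Event == "*" {
			return p.Action
		}
	}
	for _, p := range jobPolicies {
		if p.Event == event || p.Event == "*" {
			return p.Action
		}
	}
	return "" // no policy matched
}

func main() {
	jobDefaults := []LifecyclePolicy{{Event: "*", Action: "AbortJob"}}
	driver := []LifecyclePolicy{{Event: "PodFailed", Action: "RestartJob"}}

	fmt.Println(actionFor("PodFailed", jobDefaults, driver))  // task policy wins: RestartJob
	fmt.Println(actionFor("PodEvicted", jobDefaults, driver)) // job default: AbortJob
}
```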

The following examples demonstrate the usage of LifecyclePolicy for job and task.

For the training job of a machine learning framework, the whole job should be restarted if any task fails or is evicted. To simplify the configuration, a job-level LifecyclePolicy is set as follows. As no LifecyclePolicy is set for any task, all tasks will use the policies in spec.policies.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # On any event, restart the whole job.
  policies:
  - event: "*"
    action: RestartJob
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: worker
          image: worker-img
  ...
```

Some big data frameworks (e.g. Spark) may have different requirements. Taking Spark as an example, the whole job should be restarted if the 'driver' task fails, while only the failed pod should be restarted if an 'executor' task fails. An OnFailure restartPolicy is set for the executor, and RestartJob is set for the driver in spec.tasks.policies as follows.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  tasks:
  - name: "driver"
    replicas: 1
    policies:
    - event: "*"
      action: RestartJob
    template:
      spec:
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
        restartPolicy: OnFailure
```

Features Interaction

Admission Controller

The following validations must be included to ensure the expected behaviours:

  • spec.minAvailable <= sum(spec.tasks.replicas)
  • no duplicated name in the spec.tasks array
  • no duplicated event handler in the LifecyclePolicy array, for both job policies and task policies
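The first two checks can be sketched as follows. This is an illustrative sketch, not the actual admission webhook; the `validateJob` function shape and error messages are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// TaskSpec mirrors the fields relevant to validation.
type TaskSpec struct {
	Name     string
	Replicas int32
}

// validateJob applies two of the admission checks listed above:
// unique task names, and minAvailable <= sum of replicas.
func validateJob(minAvailable int32, tasks []TaskSpec) error {
	var total int32
	seen := map[string]bool{}
	for _, t := range tasks {
		if seen[t.Name] {
			return errors.New("duplicated task name: " + t.Name)
		}
		seen[t.Name] = true
		total += t.Replicas
	}
	if minAvailable > total {
		return fmt.Errorf("minAvailable (%d) exceeds total replicas (%d)", minAvailable, total)
	}
	return nil
}

func main() {
	tasks := []TaskSpec{{"ps", 1}, {"worker", 5}}
	fmt.Println(validateJob(6, tasks)) // accepted: 6 <= 1+5
	fmt.Println(validateJob(7, tasks)) // rejected: 7 > 1+5
}
```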

CoScheduling

CoScheduling (or gang scheduling) is required by most high performance workloads, e.g. TF training jobs or MPI jobs. spec.minAvailable is used to identify how many pods will be scheduled together; its default value is the sum of spec.tasks.replicas. The admission controller webhook will check spec.minAvailable against the sum of spec.tasks.replicas: the job creation will be rejected if spec.minAvailable > sum(spec.tasks.replicas). If spec.minAvailable < sum(spec.tasks.replicas), the pods of spec.tasks will be created randomly; refer to the Task Priority within Job section on how to create tasks in order.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # minAvailable to run job
  minAvailable: 6
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: "ps"
          image: "ps-img"
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: "worker"
          image: "worker-img"
```

Task Priority within Job

In addition to multiple pod templates, the priority of each task may be different. The PriorityClass of the PodTemplate is reused to define the priority of a task within a job. Here is an example of running a Spark job: 1 driver with 5 executors, where the driver's priority is master-pri, which is higher than that of normal pods; as spec.minAvailable is 3, the scheduler will make sure one driver with 2 executors is scheduled when there are not enough resources.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClassName: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
```

NOTE: although the scheduler will make sure that high priority pods within a job are scheduled first, there is still a race condition between different kubelets such that a low priority pod may be launched earlier; job/task dependencies will be introduced later to handle this kind of race condition.

Resource sharing between Jobs

By default, spec.minAvailable is set to the sum of spec.tasks.replicas; if it is set to a smaller value, the pods beyond spec.minAvailable will share resources between jobs.

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClassName: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
```

Plugins for Job

As many AI frameworks, e.g. TensorFlow, MPI and MXNet, need to set environment variables, let pods communicate with each other, and sign in via SSH without a password, Job plugins are provided to let users better focus on their core business. There are currently three plugins; every plugin has parameters, and defaults are used if none are provided.

  • env: set VK_TASK_INDEX in each container, an index that gives an identity to the container.
  • svc: create a Service and *.host entries to enable pods to communicate with each other.
  • ssh: sign in via SSH without a password, e.g. to use the mpirun or mpiexec command.
```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: mpi-job
spec:
  minAvailable: 2
  schedulerName: scheduler
  policies:
  - event: PodEvicted
    action: RestartJob
  plugins:
    ssh: []
    env: []
    svc: []
  tasks:
  - replicas: 1
    name: mpimaster
    template:
      spec:
        containers:
        - image: mpi-image
          name: mpimaster
  - replicas: 2
    name: mpiworker
    template:
      spec:
        containers:
        - image: mpi-image
          name: mpiworker
```