Tasks

Core.Task — Type

    Task(func)

Create a Task (i.e. coroutine) to execute the given function func (which must be callable with no arguments). The task exits when this function returns. When scheduled, the task runs in the “world age” that was current in the parent at the time the Task was constructed.

Examples

    julia> a() = sum(i for i in 1:1000);
    julia> b = Task(a);

In this example, b is a runnable Task that hasn’t started yet.

source

Base.@task — Macro

    @task

Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.

Examples

    julia> a1() = sum(i for i in 1:1000);
    julia> b = @task a1();
    julia> istaskstarted(b)
    false
    julia> schedule(b);
    julia> yield();
    julia> istaskdone(b)
    true

source

Base.@async — Macro

    @async

Wrap an expression in a Task and add it to the local machine’s scheduler queue.

Values can be interpolated into @async via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable’s value in the current task.

Julia 1.4

Interpolating values via $ is available as of Julia 1.4.
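
The difference between capturing a variable and interpolating its value can be seen in a minimal sketch like the following. The function name interpolation_demo and the variable names are hypothetical, chosen only for illustration.

```julia
# Hypothetical demo function; all names here are illustrative only.
function interpolation_demo()
    x = 1
    # Captures the variable x itself, so the task sees later assignments.
    t_shared = @async begin
        sleep(0.1)
        x
    end
    # `$x` copies the current value of x (1) into the task's closure.
    t_copied = @async begin
        sleep(0.1)
        $x
    end
    x = 2  # reassigned before either task wakes up from sleep
    return fetch(t_shared), fetch(t_copied)
end

interpolation_demo()  # expected to return (2, 1)
```

The first task observes the reassignment, while the interpolated task keeps the value that x had when the task was created.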

source

Base.asyncmap — Function

    asyncmap(f, c...; ntasks=0, batch_size=nothing)

Uses multiple concurrent tasks to map f over a collection (or multiple equal-length collections). For multiple collection arguments, f is applied elementwise.

ntasks specifies the number of tasks to run concurrently. If ntasks is unspecified, up to 100 tasks are used for concurrent mapping, depending on the length of the collections.

ntasks can also be specified as a zero-argument function. In this case, the function is called before processing every element, and a new task is started if the value it returns is greater than the current number of tasks.

If batch_size is specified, the collection is processed in batch mode. f must then accept a Vector of argument tuples and return a vector of results. The input vector will have a length of batch_size or less.

The following examples highlight execution in different tasks by returning the objectid of the tasks in which the mapping function is executed.

First, with ntasks undefined, each element is processed in a different task.

    julia> tskoid() = objectid(current_task());
    julia> asyncmap(x->tskoid(), 1:5)
    5-element Array{UInt64,1}:
     0x6e15e66c75c75853
     0x440f8819a1baa682
     0x9fb3eeadd0c83985
     0xebd3e35fe90d4050
     0x29efc93edce2b961
    julia> length(unique(asyncmap(x->tskoid(), 1:5)))
    5

With ntasks=2 all elements are processed in 2 tasks.

    julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
    5-element Array{UInt64,1}:
     0x027ab1680df7ae94
     0xa23d2f80cd7cf157
     0x027ab1680df7ae94
     0xa23d2f80cd7cf157
     0x027ab1680df7ae94
    julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
    2

With batch_size defined, the mapping function needs to be changed to accept an array of argument tuples and return an array of results. map is used in the modified mapping function to achieve this.

    julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
    batch_func (generic function with 1 method)
    julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
    5-element Array{String,1}:
     "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
     "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
     "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
     "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
     "args_tuple: (5,), element_val: 5, task: 9118321258196414413"

Note

Currently, all tasks in Julia are executed in a single OS thread co-operatively. Consequently, asyncmap is beneficial only when the mapping function involves I/O, such as disk access, network access, or remote worker invocation.
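
As a rough illustration of the note above, the sketch below uses sleep as a stand-in for an I/O wait. The helper slow_square is hypothetical and exists only for this example.

```julia
# Hypothetical stand-in for an I/O-bound call: sleep yields to other tasks,
# much as a network or disk wait would.
slow_square(x) = (sleep(0.2); x^2)

results = asyncmap(slow_square, 1:5)   # the five sleeps overlap
serial  = map(slow_square, 1:5)        # the five sleeps run back to back

results == serial  # same values; only the elapsed time differs
```

Wrapping each call in @elapsed is one way to compare the two timings on a given machine.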

source

Base.asyncmap! — Function

    asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Like asyncmap, but stores output in results rather than returning a collection.

source

Base.current_task — Function

    current_task()

Get the currently running Task.

source

Base.istaskdone — Function

    istaskdone(t::Task) -> Bool

Determine whether a task has exited.

Examples

    julia> a2() = sum(i for i in 1:1000);
    julia> b = Task(a2);
    julia> istaskdone(b)
    false
    julia> schedule(b);
    julia> yield();
    julia> istaskdone(b)
    true

source

Base.istaskstarted — Function

    istaskstarted(t::Task) -> Bool

Determine whether a task has started executing.

Examples

    julia> a3() = sum(i for i in 1:1000);
    julia> b = Task(a3);
    julia> istaskstarted(b)
    false

source

Base.istaskfailed — Function

    istaskfailed(t::Task) -> Bool

Determine whether a task has exited because an exception was thrown.

Examples

    julia> a4() = error("task failed");
    julia> b = Task(a4);
    julia> istaskfailed(b)
    false
    julia> schedule(b);
    julia> yield();
    julia> istaskfailed(b)
    true

Julia 1.3

This function requires at least Julia 1.3.

source

Base.task_local_storage — Method

    task_local_storage(key)

Look up the value of a key in the current task’s task-local storage.

source

Base.task_local_storage — Method

    task_local_storage(key, value)

Assign a value to a key in the current task’s task-local storage.

source

Base.task_local_storage — Method

    task_local_storage(body, key, value)

Call the function body with a modified task-local storage, in which value is assigned to key; the previous value of key, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.
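
The three methods can be combined as in the following minimal sketch. The key :depth and the variable names are hypothetical.

```julia
# Store and read a value in the current task's storage.
task_local_storage(:depth, 0)
outer_before = task_local_storage(:depth)     # 0

# Temporarily override :depth while the do-block runs.
inner = task_local_storage(:depth, 1) do
    task_local_storage(:depth)                # 1 inside the block
end

outer_after = task_local_storage(:depth)      # restored to 0

# A different task has its own storage, so the key is missing there
# and the lookup throws a KeyError.
other = @async begin
    try
        task_local_storage(:depth)
        false
    catch err
        err isa KeyError
    end
end
missing_elsewhere = fetch(other)              # true
```

The do-block form is what makes the dynamic-scoping emulation possible: code called inside the block sees the temporary value, and the previous value comes back even if the block throws.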

source

Scheduling

Base.yield — Function

    yield()

Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.

source

    yield(t::Task, arg = nothing)

A fast, unfair-scheduling version of schedule(t, arg); yield() which immediately yields to t before calling the scheduler.

source

Base.yieldto — Function

    yieldto(t::Task, arg = nothing)

Switch to the given task. The first time a task is switched to, the task’s function is called with no arguments. On subsequent switches, arg is returned from the task’s last call to yieldto. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged.

source

Base.sleep — Function

    sleep(seconds)

Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, that is, an input of 0.001.

source

Base.schedule — Function

    schedule(t::Task, [val]; error=false)

Add a Task to the scheduler’s queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.

If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.

Warning

It is incorrect to use schedule on an arbitrary Task that has already been started. See the section “Low-level synchronization using schedule and wait” for more information.

Examples

    julia> a5() = sum(i for i in 1:1000);
    julia> b = Task(a5);
    julia> istaskstarted(b)
    false
    julia> schedule(b);
    julia> yield();
    julia> istaskstarted(b)
    true
    julia> istaskdone(b)
    true

source

Synchronization

Base.errormonitor — Function

    errormonitor(t::Task)

Print an error log to stderr if task t fails.

source

Base.@sync — Macro

    @sync

Wait until all lexically-enclosed uses of @async, @spawn, @spawnat and @distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
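
A minimal sketch of both behaviours described above, waiting for completion and collecting failures, might look as follows. The variable names are hypothetical.

```julia
squares = zeros(Int, 3)

# @sync returns only after every enclosed @async task has finished.
@sync for i in 1:3
    @async begin
        sleep(0.1)
        squares[i] = i^2
    end
end
squares  # all three slots are filled by now

# Failures in enclosed tasks surface as a single CompositeException.
caught = try
    @sync begin
        @async error("first failure")
        @async error("second failure")
    end
catch err
    err
end
caught isa CompositeException  # true
```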

source

Base.wait — Function

    wait(r::Future)

Wait for a value to become available for the specified Future.

    wait(r::RemoteChannel, args...)

Wait for a value to become available on the specified RemoteChannel.

    wait([x])

Block the current task until some event occurs, depending on the type of the argument:

  • Channel: Wait for a value to be appended to the channel.
  • Condition: Wait for notify on a condition and return the val parameter passed to notify.
  • Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
  • Task: Wait for a Task to finish. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
  • RawFD: Wait for changes on a file descriptor (see the FileWatching package).

If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to schedule or yieldto.

Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.

source

Special note for Threads.Condition:

The caller must be holding the lock that owns a Threads.Condition before calling this method. The calling task will be blocked until some other task wakes it, usually by calling notify on the same Threads.Condition object. The lock will be atomically released when blocking (even if it was locked recursively), and will be reacquired before returning.

source

Base.fetch — Method

    fetch(t::Task)

Wait for a Task to finish, then return its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
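
The success and failure paths can be sketched as follows. The task bodies and variable names are hypothetical.

```julia
ok_task = @async sum(1:10)
value = fetch(ok_task)            # 55

bad_task = @async error("task failed")
failure = try
    fetch(bad_task)
catch err
    err
end

failure isa TaskFailedException   # true: the wrapper thrown by fetch
failure.task === bad_task         # true: it carries the failed task
```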

source

Base.timedwait — Function

    timedwait(callback::Function, timeout::Real; pollint::Real=0.1)

Wait until callback returns true or timeout seconds have passed, whichever is earlier. callback is polled every pollint seconds. The minimum value for timeout and pollint is 0.001, that is, 1 millisecond.

Returns :ok or :timed_out.
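
Both return values can be provoked with a small sketch such as this one. The flag and the chosen durations are hypothetical.

```julia
flag = Ref(false)

# Flip the flag after a short delay in a background task.
@async begin
    sleep(0.2)
    flag[] = true
end

# The callback becomes true well before the 5-second timeout.
status_ok = timedwait(() -> flag[], 5.0; pollint=0.05)        # :ok

# This callback never returns true, so the wait runs out of time.
status_timeout = timedwait(() -> false, 0.2; pollint=0.05)    # :timed_out
```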

source

Base.Condition — Type

    Condition()

Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel and Threads.Event types do this, and can be used for level-triggered events.

This object is NOT thread-safe. See Threads.Condition for a thread-safe version.
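
Edge triggering can be observed in a minimal sketch like the one below, which assumes the default cooperative scheduling of @async tasks on the current thread. The variable names are hypothetical.

```julia
cond = Condition()

# Nobody is waiting yet, so this notification is simply lost.
woken_early = notify(cond, :too_early)     # 0

waiter = @async wait(cond)
# Let the waiter run; once started it proceeds until it blocks in wait(cond).
while !istaskstarted(waiter)
    yield()
end

woken = notify(cond, :hello)               # 1
received = fetch(waiter)                   # :hello, the value passed to notify
```

The early notification is not remembered, which is why level-triggered use cases need extra state or one of the types mentioned above.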

source

Base.Threads.Condition — Type

    Threads.Condition([lock])

A thread-safe version of Base.Condition.

To call wait or notify on a Threads.Condition, you must first call lock on it. When wait is called, the lock is atomically released during blocking, and will be reacquired before wait returns. Therefore idiomatic use of a Threads.Condition c looks like the following:

    lock(c)
    try
        while !thing_we_are_waiting_for
            wait(c)
        end
    finally
        unlock(c)
    end

Julia 1.2

This functionality requires at least Julia 1.2.

source

Base.Event — Type

    Event([autoreset=false])

Create a level-triggered event source. Tasks that call wait on an Event are suspended and queued until notify is called on the Event. After notify is called, the Event remains in a signaled state and tasks will no longer block when waiting for it, until reset is called.

If autoreset is true, at most one task will be released from wait for each call to notify.

This provides an acquire & release memory ordering on notify/wait.

Julia 1.1

This functionality requires at least Julia 1.1.

Julia 1.8

The autoreset functionality and memory ordering guarantee require at least Julia 1.8.
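
The level-triggered behaviour can be sketched as follows, assuming a recent Julia version in which the type is reachable as Base.Event. The variable names are hypothetical.

```julia
ev = Base.Event()

waiter = @async begin
    wait(ev)
    :released
end
notify(ev)
first_result = fetch(waiter)          # :released

# The event stays signaled, so a later wait does not block.
late = @async begin
    wait(ev)
    :no_blocking
end
second_result = fetch(late)           # :no_blocking

# After reset, waiting blocks again until the next notify.
reset(ev)
blocked = @async wait(ev)
yield()
still_waiting = !istaskdone(blocked)  # true
notify(ev)                            # release it so no task is left hanging
wait(blocked)
```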

source

Base.notify — Function

    notify(condition, val=nothing; all=true, error=false)

Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.

Return the count of tasks woken up. Return 0 if no tasks are waiting on condition.

source

Base.reset — Method

    reset(::Event)

Reset an Event back into an un-set state. Then any future calls to wait will block until notify is called again.

source

Base.Semaphore — Type

    Semaphore(sem_size)

Create a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.

This provides an acquire & release memory ordering on acquire/release calls.

source

Base.acquire — Function

    acquire(s::Semaphore)

Wait for one of the sem_size permits to be available, blocking until one can be acquired.

source

    acquire(f, s::Semaphore)

Execute f after acquiring from Semaphore s, and release on completion or error.

For example, a do-block form that ensures only 2 calls of foo will be active at the same time:

    s = Base.Semaphore(2)
    @sync for _ in 1:100
        Threads.@spawn begin
            Base.acquire(s) do
                foo()
            end
        end
    end

Julia 1.8

This method requires at least Julia 1.8.

source

Base.release — Function

    release(s::Semaphore)

Return one permit to the pool, possibly allowing another task to acquire it and resume execution.
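
Explicit acquire and release calls can be paired as in this minimal sketch, which records how many tasks hold a permit at once. The counters active and peak are hypothetical, and plain Ref values are used only because all @async tasks here run on one thread.

```julia
sem = Base.Semaphore(2)
active = Ref(0)   # tasks currently holding a permit
peak = Ref(0)     # highest value of active observed so far

@sync for _ in 1:6
    @async begin
        Base.acquire(sem)
        try
            active[] += 1
            peak[] = max(peak[], active[])
            sleep(0.05)           # hypothetical stand-in for real work
            active[] -= 1
        finally
            Base.release(sem)     # every acquire is matched by a release
        end
    end
end

peak[]  # 2: never more than sem_size holders at a time
```

Putting release in a finally block keeps the permit count correct even if the work in between throws.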

source

Base.AbstractLock — Type

    AbstractLock

Abstract supertype describing types that implement the synchronization primitives: lock, trylock, unlock, and islocked.

source

Base.lock — Function

    lock(lock)

Acquire the lock when it becomes available. If the lock is already locked by a different task/thread, wait for it to become available.

Each lock must be matched by an unlock.

source

    lock(f::Function, lock)

Acquire the lock, execute f with the lock held, and release the lock when f returns. If the lock is already locked by a different task/thread, wait for it to become available.

When this function returns, the lock has been released, so the caller should not attempt to unlock it.
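
A minimal sketch of the do-block form, protecting a shared counter that is updated from tasks which may run on several threads. The names lk and counter are hypothetical.

```julia
lk = ReentrantLock()
counter = Ref(0)

@sync for _ in 1:100
    Threads.@spawn begin
        lock(lk) do
            counter[] += 1   # read-modify-write protected by the lock
        end
    end
end

counter[]  # 100
```

No explicit unlock appears because the do-block form releases the lock when the block returns.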

Julia 1.7

Using a Channel as the second argument requires Julia 1.7 or later.

source

Base.unlock — Function

    unlock(lock)

Releases ownership of the lock.

If this is a recursive lock which has been acquired before, decrement an internal counter and return immediately.

source

Base.trylock — Function

    trylock(lock) -> Success (Boolean)

Acquire the lock if it is available, and return true if successful. If the lock is already locked by a different task/thread, return false.

Each successful trylock must be matched by an unlock.

source

Base.islocked — Function

    islocked(lock) -> Status (Boolean)

Check whether the lock is held by any task/thread. This should not be used for synchronization (see instead trylock).

source

Base.ReentrantLock — Type

    ReentrantLock()

Creates a re-entrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required. Each lock must be matched with an unlock.

This provides an acquire/release memory ordering on lock/unlock calls.

Calling ‘lock’ will also inhibit running of finalizers on that thread until the corresponding ‘unlock’. The standard lock pattern illustrated below is naturally supported, but beware of inverting the try/lock order or omitting the try block entirely (e.g. attempting to return with the lock still held):

    lock(l)
    try
        <atomic work>
    finally
        unlock(l)
    end
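
Re-entrancy and the matching rule for unlock can be checked with a short sketch like this one. The variable names are hypothetical.

```julia
lk = ReentrantLock()

lock(lk)
lock(lk)                         # the same task may acquire the lock again

# A different task cannot take the lock while this task holds it.
held_elsewhere = fetch(@async trylock(lk))   # false

unlock(lk)
still_locked = islocked(lk)      # true: one unlock is still outstanding
unlock(lk)
now_free = !islocked(lk)         # true: every lock has been matched
```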

source

Channels

Base.Channel — Type

    Channel{T=Any}(size::Int=0)

Constructs a Channel with an internal buffer that can hold a maximum of size objects of type T. put! calls on a full channel block until an object is removed with take!.

Channel(0) constructs an unbuffered channel. put! blocks until a matching take! is called, and vice versa.

Other constructors:

  • Channel(): default constructor, equivalent to Channel{Any}(0)
  • Channel(Inf): equivalent to Channel{Any}(typemax(Int))
  • Channel(sz): equivalent to Channel{Any}(sz)

Julia 1.3

The default constructor Channel() and default size=0 were added in Julia 1.3.

source

Base.Channel — Method

    Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false)

Create a new task from func, bind it to a new channel of type T and size size, and schedule the task, all in a single call. The channel is automatically closed when the task terminates.

func must accept the bound channel as its only argument.

If you need a reference to the created task, pass a Ref{Task} object via the keyword argument taskref.

If spawn = true, the Task created for func may be scheduled on another thread in parallel, equivalent to creating a task via Threads.@spawn.

Return a Channel.

Examples

    julia> chnl = Channel() do ch
               foreach(i -> put!(ch, i), 1:4)
           end;
    julia> typeof(chnl)
    Channel{Any}
    julia> for i in chnl
               @show i
           end;
    i = 1
    i = 2
    i = 3
    i = 4

Referencing the created task:

    julia> taskref = Ref{Task}();
    julia> chnl = Channel(taskref=taskref) do ch
               println(take!(ch))
           end;
    julia> istaskdone(taskref[])
    false
    julia> put!(chnl, "Hello");
    Hello
    julia> istaskdone(taskref[])
    true

Julia 1.3

The spawn= parameter was added in Julia 1.3. This constructor was added in Julia 1.3. In earlier versions of Julia, Channel used keyword arguments to set size and T, but those constructors are deprecated.

    julia> chnl = Channel{Char}(1, spawn=true) do ch
               for c in "hello world"
                   put!(ch, c)
               end
           end
    Channel{Char}(1) (2 items available)
    julia> String(collect(chnl))
    "hello world"

source

Base.put! — Method

    put!(c::Channel, v)

Append an item v to the channel c. Blocks if the channel is full.

For unbuffered channels, blocks until a take! is performed by a different task.

Julia 1.1

v now gets converted to the channel’s type with convert as put! is called.

source

Base.take! — Method

    take!(c::Channel)

Remove and return a value from a Channel. Blocks until data is available.

For unbuffered channels, blocks until a put! is performed by a different task.

source

Base.isready — Method

    isready(c::Channel)

Determine whether a Channel has a value stored in it. Returns immediately; does not block.

For unbuffered channels, returns true if there are tasks waiting on a put!.

source

Base.fetch — Method

    fetch(c::Channel)

Wait for and get the first available item from the channel. Does not remove the item. fetch is unsupported on an unbuffered (0-size) channel.

source

Base.close — Method

    close(c::Channel[, excp::Exception])

Close a channel. An exception (optionally given by excp) is thrown by:

  • put! on a closed channel.
  • take! and fetch on an empty, closed channel.
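
The interplay of put!, isready, fetch, take! and close on a buffered channel can be sketched as follows. The values and variable names are hypothetical.

```julia
ch = Channel{Int}(2)     # buffer holds up to two Ints

put!(ch, 10)             # does not block: the buffer has room
put!(ch, 20)
ready = isready(ch)      # true
peeked = fetch(ch)       # 10, the item stays in the channel
first_item = take!(ch)   # 10, the item is removed

close(ch)
leftover = take!(ch)     # 20: buffered items remain readable after close

# The channel is now both empty and closed, so take! throws.
closed_error = try
    take!(ch)
catch err
    err
end
closed_error isa InvalidStateException  # true when no excp was given to close
```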

source

Base.bind — Method

    bind(chnl::Channel, task::Task)

Associate the lifetime of chnl with a task. Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.

The chnl object can be explicitly closed independent of task termination. Terminating tasks have no effect on already closed Channel objects.

When a channel is bound to multiple tasks, the first task to terminate will close the channel. When multiple channels are bound to the same task, termination of the task will close all of the bound channels.

Examples

    julia> c = Channel(0);
    julia> task = @async foreach(i->put!(c, i), 1:4);
    julia> bind(c,task);
    julia> for i in c
               @show i
           end;
    i = 1
    i = 2
    i = 3
    i = 4
    julia> isopen(c)
    false

    julia> c = Channel(0);
    julia> task = @async (put!(c, 1); error("foo"));
    julia> bind(c, task);
    julia> take!(c)
    1
    julia> put!(c, 1);
    ERROR: TaskFailedException
    Stacktrace:
    [...]
        nested task error: foo
    [...]

source

Low-level synchronization using schedule and wait

The easiest correct use of schedule is on a Task that is not started (scheduled) yet. However, it is possible to use schedule and wait as a very low-level building block for constructing synchronization interfaces. A crucial pre-condition of calling schedule(task) is that the caller must “own” the task; i.e., it must know that the call to wait in the given task is happening at the locations known to the code calling schedule(task). One strategy for ensuring this pre-condition is to use atomics, as demonstrated in the following example:

    @enum OWEState begin
        OWE_EMPTY
        OWE_WAITING
        OWE_NOTIFYING
    end

    mutable struct OneWayEvent
        @atomic state::OWEState
        task::Task
        OneWayEvent() = new(OWE_EMPTY)
    end

    function Base.notify(ev::OneWayEvent)
        state = @atomic ev.state
        while state !== OWE_NOTIFYING
            # Spin until we successfully update the state to OWE_NOTIFYING:
            state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
            if ok
                if state == OWE_WAITING
                    # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                    # already waiting or about to call `wait`. The notifier task must wake up
                    # the waiter task.
                    schedule(ev.task)
                else
                    @assert state == OWE_EMPTY
                    # Since we are assuming that there is only one notifier task (for
                    # simplicity), we know that the other possible case here is OWE_EMPTY.
                    # We do not need to do anything because we know that the waiter task has
                    # not called `wait(ev::OneWayEvent)` yet.
                end
                break
            end
        end
        return
    end

    function Base.wait(ev::OneWayEvent)
        ev.task = current_task()
        state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
        if ok
            # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
            # invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
            # `wait()` immediately. In particular, it MUST NOT invoke any function that may
            # yield to the scheduler at this point in code.
            wait()
        else
            @assert state == OWE_NOTIFYING
            # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
            # notifier task.
        end
        return
    end

    ev = OneWayEvent()
    @sync begin
        @async begin
            wait(ev)
            println("done")
        end
        println("notifying...")
        notify(ev)
    end

    # output
    notifying...
    done

OneWayEvent lets one task wait for another task’s notify. It is a limited communication interface since wait can only be used once from a single task (note the non-atomic assignment of ev.task).

In this example, notify(ev::OneWayEvent) is allowed to call schedule(ev.task) if and only if it modifies the state from OWE_WAITING to OWE_NOTIFYING. This lets us know that the task executing wait(ev::OneWayEvent) is now in the ok branch and that no other task can try to schedule(ev.task), since its @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.