Asynchronous Functions

OpenFaaS enables long-running tasks or function invocations to run in the background through the use of NATS Streaming. This decouples the HTTP transaction between the caller and the function.

  • The gateway acts as a “producer” and serializes the HTTP request to NATS Streaming.
  • The queue-worker acts as a subscriber: it deserializes the HTTP request and uses it to invoke the function directly.

The asynchronous workflow can have a longer, separate timeout than the synchronous timeout on the gateway.

Why might you want to use an asynchronous invocation?

You’re working with a partner’s webhook. They send you data, but if you don’t respond with an HTTP 200 OK within 1 second, they assume failure and retry the message. This works well if your function can complete in less than a second on every invocation, but if there’s any risk that it can’t, you need another solution. Instead, give your partner the URL of the asynchronous function: it will reply within several milliseconds whilst the data is still being processed.

Synchronous vs asynchronous invocation

  • Sync

    A synchronous invocation of a user requesting a PDF would be as follows, where connections are established between each component from the beginning to the end of the invocation.

    [Diagram: synchronous invocation]

If the whole process takes less than a few seconds, this may be the ideal approach, given that it’s simple to implement.

  • Async

    An asynchronous invocation of a user requesting a PDF would be as follows: an initial connection is formed to the gateway, which serializes the user’s request to a queue in NATS. At a later time, the queue-worker dequeues the request, deserializes it and sends it to the function, either directly or via the gateway, using a synchronous call.

    [Diagram: asynchronous invocation]

    This is beneficial if there are, for instance, 100 requests that each take 2 minutes to execute. The client / caller only needs to wait for a few milliseconds.

How it works

Any function can be invoked asynchronously by changing the route on the gateway from /function/<name> to /async-function/<name>. A 202 Accepted message will be issued in response to asynchronous calls.

Note: asynchronous invocations do not make sense with an HTTP GET verb. Since they are queued and deferred, there is nothing to GET. For this reason, an HTTP POST is required.
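The route change described above can be sketched in shell. This is a minimal illustration rather than an official client: the gateway address and the figlet function are taken from the examples on this page, and the helper names to_async_url and invoke_async are hypothetical.

```shell
#!/bin/sh
# Sketch only: the gateway address and the "figlet" function are assumptions.
CURL="${CURL:-curl}"

# Rewrite a synchronous function URL into its asynchronous equivalent.
to_async_url() {
  printf '%s\n' "$1" | sed 's#/function/#/async-function/#'
}

# Send body $2 to the asynchronous route for URL $1 and print only the HTTP
# status code. --data makes curl send a POST; a 202 means the request was
# accepted onto the queue.
invoke_async() {
  $CURL -s -o /dev/null -w '%{http_code}\n' --data "$2" "$(to_async_url "$1")"
}

to_async_url "http://127.0.0.1:8080/function/figlet"
# Usage (requires a running gateway):
#   invoke_async "http://127.0.0.1:8080/function/figlet" "Hi"
```

Keeping the URL rewrite in its own helper means existing synchronous callers can switch routes without changing anything else about the request.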

If you would like to receive a value from an asynchronous call, you should pass an HTTP header with the URL to be used for the callback.

    $ faas invoke figlet --async -H "X-Callback-Url=https://request.bin/mybin"

The callback request carries the same X-Call-Id that was issued for the initial request, so the two can be correlated.

You can use netcat to check the Call Id during invocation:

    $ curl http://127.0.0.1:8080/async-function/figlet \
        --data "Hi" \
        --header "X-Callback-Url: http://<your-ip>:8888"

The netcat listener, which must already be running in another terminal, receives the callback:

    $ nc -l 8888
    HTTP/1.1 200 OK
    Content-Length: 174
    Content-Type: application/x-www-form-urlencoded
    Date: Tue, 04 Dec 2018 09:24:55 GMT
    X-Call-Id: eb8283f5-1679-48e0-afec-194544b054aa
    X-Duration-Seconds: 0.002885
    X-Start-Time: 1543915495384346700

     _   _      _ _
    | | | | ___| | | ___
    | |_| |/ _ \ | |/ _ \
    |  _  |  __/ | | (_) |
    |_| |_|\___|_|_|\___/

Alternatively, the callback URL can point to another asynchronous or synchronous function, which will then be run with the result.
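As a rough sketch of chaining one function into another through the callback header, the helpers below build the header and issue the request. Everything named here is an assumption for illustration: the two gateway addresses, the store-result function, and the helper names callback_header and chain_async. The callback address must be reachable from the queue-worker, which is why it is kept separate from the address the caller uses.

```shell
#!/bin/sh
# Sketch only: both gateway addresses and the function names are assumptions.
CURL="${CURL:-curl}"
# Address the caller uses to reach the gateway.
GATEWAY_URL="${GATEWAY_URL:-http://127.0.0.1:8080}"
# Address of the gateway as seen from the queue-worker.
CALLBACK_GATEWAY="${CALLBACK_GATEWAY:-http://gateway.openfaas:8080}"

# Build the header that sends the result of one function to another.
# $1 = receiving function, $2 = "function" (default) or "async-function".
callback_header() {
  printf 'X-Callback-Url: %s/%s/%s\n' "$CALLBACK_GATEWAY" "${2:-function}" "$1"
}

# Invoke function $1 asynchronously with body $3; its result is posted to
# function $2.
chain_async() {
  $CURL -s "$GATEWAY_URL/async-function/$1" \
    --data "$3" \
    --header "$(callback_header "$2")"
}

callback_header store-result
# Usage (requires a running gateway and both functions deployed):
#   chain_async figlet store-result "Hi"
```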

Configuration & Limits

There are limits for asynchronous functions, which you should understand before using them:

  • Timeouts - the timeout for any asynchronous function must “agree” with all other timeouts within the system, including the gateway and the function.
  • Concurrency / parallelism - the number of function invocations processed at any one time.
  • Named queues - by default there is one queue, but additional queues can be added. Each named queue can have its own timeout and concurrency.
  • Payload size - the maximum size is configured to be 1MB. The limit is defined by NATS, but can be changed. Use a database or an S3 bucket to store large payloads, and pass an identifier to the function instead.
  • Retries - retries are available in OpenFaaS PRO with an exponential back-off.
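The payload limit lends itself to a simple pre-flight check. The sketch below assumes the 1MB limit corresponds to 1048576 bytes and uses hypothetical helper names (payload_fits, choose_body); the storage reference passed as the second argument stands in for whatever identifier your database or bucket returns.

```shell
#!/bin/sh
# Sketch only: 1048576 bytes is assumed for the 1MB limit.
MAX_PAYLOAD_BYTES="${MAX_PAYLOAD_BYTES:-1048576}"

# Succeed when the file at $1 is small enough to queue directly.
payload_fits() {
  # The arithmetic expansion strips any padding that wc adds to its output.
  size=$(( $(wc -c < "$1") ))
  [ "$size" -le "$MAX_PAYLOAD_BYTES" ]
}

# Print what to send as the request body: the file itself when it fits,
# otherwise the reference ($2) to where the payload was stored.
choose_body() {
  if payload_fits "$1"; then
    printf 'inline:%s\n' "$1"
  else
    printf 'reference:%s\n' "$2"
  fi
}

# Usage:
#   choose_body ./report.json "s3://my-bucket/report-123"
```

Checking the size before queuing avoids a rejected message and makes the switch to pass-by-reference explicit in the caller.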

Parallelism

By default there is one queue-worker replica deployed which is set up to run a single task of up to 30 seconds in duration.

You can increase the parallelism by scaling the queue-worker up - i.e. 5 replicas for 5 parallel tasks.

Alternatively, you can increase the parallelism by setting the queue-worker’s “max_inflight” option to a value greater than one. The queue-worker will then receive up to max_inflight messages concurrently and invoke their corresponding functions simultaneously. To restrict concurrency for certain functions, use multiple queues and assign those functions to separate queues. When scaling up the queue-worker, be aware that with n replicas you will get up to ‘n * max_inflight’ parallel function invocations.

You can tune the values for the number of tasks each queue worker may run in parallel as well as the maximum duration of any asynchronous task that worker processes. Edit the Kubernetes helm chart, YAML or Swarm docker-compose.yml files.
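The two tuning knobs above can be sketched with kubectl. This is an illustration under assumptions, not the recommended procedure: it assumes the openfaas namespace, a deployment named queue-worker, and that max_inflight is read from an environment variable of that name. The helper names are hypothetical.

```shell
#!/bin/sh
# Sketch only: namespace, deployment name and the max_inflight environment
# variable name are assumptions based on the text above.
KUBECTL="${KUBECTL:-kubectl}"
CORE_NS="${CORE_NS:-openfaas}"

# Upper bound on parallel invocations: replicas ($1) * max_inflight ($2).
max_parallel() {
  echo $(( $1 * $2 ))
}

# Scale the queue-worker deployment to $1 replicas.
scale_queue_worker() {
  $KUBECTL scale deploy/queue-worker -n "$CORE_NS" --replicas="$1"
}

# Set max_inflight to $1 on the queue-worker deployment.
set_max_inflight() {
  $KUBECTL set env deploy/queue-worker -n "$CORE_NS" max_inflight="$1"
}

# 5 replicas with max_inflight=4 allow up to 20 parallel invocations.
max_parallel 5 4
```

Changes made this way are likely to be overwritten the next time the helm chart or YAML is applied, so the same values should also be recorded there.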

The OpenFaaS workshop has more instructions on running tasks asynchronously.

Multiple queues

Asynchronous requests are processed by the queue-worker component using a single topic (faas-request). For most use-cases this will be sufficient, provided that most of your functions take a similar amount of time to execute. A problem may arise when you have a mixture of slow and fast running requests within the same queue: because the queue has FIFO (first in, first out) semantics, a single slow task can hold up all the other requests.
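The effect of FIFO ordering can be shown with a little arithmetic. The sketch below models a single worker that handles one task at a time; the durations are made-up figures and fifo_completion_times is a hypothetical helper.

```shell
#!/bin/sh
# Print when each queued task finishes if one worker handles them one at a
# time in FIFO order. Arguments are task durations in seconds, in arrival order.
fifo_completion_times() {
  elapsed=0
  for duration in "$@"; do
    elapsed=$(( elapsed + duration ))
    echo "$elapsed"
  done
}

# One 120-second task followed by two 1-second tasks.
fifo_completion_times 120 1 1
```

The two fast tasks finish at 121 and 122 seconds even though each needs only one second of work, which is the delay that a separate queue for slow functions avoids.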

To use multiple queues you need to do two things:

1) Annotate your functions with a queue (or topic) name using com.openfaas.queue

If your new queue is called slow-queue, you would run the following:

    faas-cli store deploy figlet --annotation com.openfaas.queue=slow-queue

2) Create a queue-worker for the new queue name

You now need to deploy a new queue-worker for the queue name, so that it can subscribe to messages and invoke functions without affecting the default queue.

On Kubernetes:

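    export CORE_NS=openfaas
    # Note: --export was removed in kubectl 1.18. On newer versions, omit the
    # flag and delete the status section and cluster-generated metadata
    # (such as resourceVersion and uid) from the file by hand.
    kubectl get -n $CORE_NS deploy/queue-worker -o yaml --export > slow-queue-queue-worker.yaml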

Now replace “queue-worker” with “slow-queue-worker” in app: queue-worker and name: queue-worker.

    # -i.bak edits the file in place and keeps a backup with a .bak suffix
    sed -i.bak -e 's/app: queue-worker/app: slow-queue-worker/g' slow-queue-queue-worker.yaml
    sed -i.bak -e 's/name: queue-worker/name: slow-queue-worker/g' slow-queue-queue-worker.yaml

Edit the faas_nats_channel environment variable, place slow-queue in the value field:

    - name: faas_nats_channel
      value: slow-queue
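The renaming and channel edits described above can be tried on a sample manifest before touching the exported file. The manifest in this sketch is a trimmed, hypothetical stand-in for the real Deployment, and retarget_worker is a hypothetical helper; it assumes the value line sits directly after the faas_nats_channel name line.

```shell
#!/bin/sh
# Sketch only: this manifest is a trimmed, hypothetical stand-in for the
# exported queue-worker Deployment.
sample_manifest() {
cat <<'EOF'
metadata:
  labels:
    app: queue-worker
  name: queue-worker
spec:
  template:
    spec:
      containers:
      - name: queue-worker
        env:
        - name: faas_nats_channel
          value: faas-request
EOF
}

# Rename the worker and point it at the queue named in $1.
# Reads a manifest on stdin and writes the edited manifest to stdout.
# Assumes "value:" is on the line directly after "name: faas_nats_channel".
retarget_worker() {
  sed -e "s/app: queue-worker/app: $1-worker/g" \
      -e "s/name: queue-worker/name: $1-worker/g" \
      -e "/name: faas_nats_channel/{n;s/value: .*/value: $1/;}"
}

sample_manifest | retarget_worker slow-queue
```

Running the edits as a filter makes it easy to inspect or diff the result before creating the new deployment.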

Deploy the new queue worker for the slow-queue queue:

    export CORE_NS=openfaas
    kubectl create -f slow-queue-queue-worker.yaml --namespace $CORE_NS

You can now invoke your function as per normal and watch the logs of the new queue worker:

    # -f streams the logs in the background while the function is invoked
    kubectl logs -f deploy/slow-queue-worker -n openfaas &
    curl http://127.0.0.1:8080/async-function/figlet -d "Ran on the slow-queue"

Verbose Output

The Queue Worker component enables asynchronous processing of function requests. The default verbosity level hides the message content, but this can be viewed by setting write_debug to true when deploying.

Callback request headers

The following additional request headers will be set when invoking the callback URL:

Header               Description
X-Call-Id            The original function call’s tracing UUID
X-Duration-Seconds   Time taken in seconds to execute the original function call
X-Function-Status    HTTP status code returned by the original function call
X-Function-Name      The name of the original function that was executed
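A callback receiver will usually want to read these headers. The sketch below extracts a header from a raw HTTP message on stdin, for example one saved from a netcat listener. The helper names header_value and callback_succeeded are hypothetical, and treating only status 200 as success is an assumption that you may want to relax.

```shell
#!/bin/sh
# Print the value of header $1 from a raw HTTP message read on stdin.
# Names are matched case-insensitively and a trailing CR is stripped.
header_value() {
  awk -v name="$1" '{
    if ($0 ~ /^\r?$/) exit                 # blank line: end of the headers
    i = index($0, ":")
    if (i > 0 && tolower(substr($0, 1, i - 1)) == tolower(name)) {
      v = substr($0, i + 1)
      sub(/^[ \t]+/, "", v)
      sub(/\r$/, "", v)
      print v
      exit
    }
  }'
}

# Succeed only when the original function call returned HTTP 200
# (an assumption: other 2xx codes are treated as failures here).
callback_succeeded() {
  [ "$(header_value X-Function-Status)" = "200" ]
}

# Hypothetical captured callback headers.
printf 'X-Call-Id: eb8283f5-1679-48e0-afec-194544b054aa\r\nX-Function-Status: 200\r\nX-Function-Name: figlet\r\n\r\n' |
  header_value x-call-id
```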