Loki’s Architecture

This document expands on the information detailed in the Loki Overview.

Multi Tenancy

All data - both in memory and in long-term storage - is partitioned by a tenant ID, pulled from the X-Scope-OrgID HTTP header in the request when Loki is running in multi-tenant mode. When Loki is not in multi-tenant mode, the header is ignored and the tenant ID is set to “fake”, which will appear in the index and in stored chunks.
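
The tenant resolution rule above can be sketched as follows. This is a minimal illustration, not Loki's code: the function name resolve_tenant_id is hypothetical, and rejecting a request that lacks the header in multi-tenant mode is an assumption that the text does not state.

```python
from typing import Mapping

# Tenant ID the text says is used when multi-tenancy is disabled.
DEFAULT_TENANT = "fake"


def resolve_tenant_id(headers: Mapping[str, str], multi_tenant: bool) -> str:
    """Return the tenant ID for a request (hypothetical helper)."""
    if not multi_tenant:
        # The header is ignored entirely in single-tenant mode.
        return DEFAULT_TENANT
    # HTTP header names are case-insensitive, so compare in lower case.
    for name, value in headers.items():
        if name.lower() == "x-scope-orgid" and value:
            return value
    # Assumption: a missing header is treated as an error in this sketch.
    raise ValueError("missing X-Scope-OrgID header in multi-tenant mode")
```

With multi-tenancy disabled, every request maps to the same “fake” tenant regardless of what the client sends.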

Modes of Operation

modes_diagram

Loki has a set of components (defined below in Components) which are internally referred to as modules. Each component spawns a gRPC server for internal traffic and an HTTP/1 server for external API requests. All components come with an HTTP/1 server, but most only expose readiness, health, and metrics endpoints.

Which component Loki runs is determined by either the -target flag at the command line or the target: <string> section in Loki’s config file. When the value of target is all, Loki will run all of its components in a single process. This is referred to as “single process”, “single binary”, or monolithic mode. Monolithic mode is the default deployment of Loki when Loki is installed using Helm.

When target is not set to all (for example, when it is set to querier, ingester, or distributor), Loki is said to be running in “horizontally scalable”, or microservices, mode.

Loki’s components, such as the ingesters and distributors, communicate with one another over gRPC using the gRPC listen port defined in the Loki config. This is still true in monolithic mode: although the components run in the same process, they connect to each other over the local network for inter-component communication.

Single process mode is ideally suited for local development, small workloads, and evaluation purposes. Monolithic mode can be scaled out to multiple processes, with the following limitations:

  1. Local index and local storage cannot currently be used when running monolithic mode with more than one replica, as each replica must be able to access the same storage backend, and local storage is not safe for concurrent access.
  2. Individual components cannot be scaled independently, so it is not possible to have more read components than write components.

Components

Distributor

The distributor service is responsible for handling incoming streams from clients. It’s the first stop in the write path for log data. Once the distributor receives a set of streams, each stream is validated for correctness and to ensure that it is within the configured tenant (or global) limits. Valid streams are then split into batches and sent to multiple ingesters in parallel.

Hashing

Distributors use consistent hashing in conjunction with a configurable replication factor to determine which instances of the ingester service should receive a given stream.

A stream is a set of logs associated with a tenant and a unique labelset. The stream is hashed using both the tenant ID and the labelset, and the hash is then used to find the ingesters to send the stream to.

A hash ring stored in Consul is used to achieve consistent hashing; all ingesters register themselves into the hash ring with a set of tokens they own. Each token is a random unsigned 32-bit number. Along with a set of tokens, ingesters register their state into the hash ring. Ingesters in the JOINING and ACTIVE states may receive write requests, while ACTIVE and LEAVING ingesters may receive read requests. When doing a hash lookup, distributors only use tokens for ingesters that are in the appropriate state for the request.

To do the hash lookup, distributors find the smallest appropriate token whose value is larger than the hash of the stream. When the replication factor is larger than 1, the next subsequent tokens (clockwise in the ring) that belong to different ingesters will also be included in the result.

The effect of this hash setup is that each token that an ingester owns is responsible for a range of hashes. If there are three tokens with values 0, 25, and 50, then a hash of 3 would be given to the ingester that owns the token 25; the ingester owning token 25 is responsible for the hash range of 1-25.
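
The lookup described above can be sketched in a few lines. This is an illustration under stated assumptions, not Loki's implementation: the ring is modelled as a plain dictionary from token to ingester name, the function name lookup is hypothetical, the comparison is strict ("larger than", as the prose states), and filtering by ingester state is left out for brevity.

```python
from bisect import bisect_right
from typing import Dict, List


def lookup(ring: Dict[int, str], stream_hash: int, replication_factor: int) -> List[str]:
    """Return the ingesters that should receive a stream (hypothetical helper).

    ring maps each token to the name of the ingester that owns it.
    """
    tokens = sorted(ring)
    if not tokens:
        return []
    # Index of the smallest token strictly larger than the hash; the modulo
    # below wraps around to the first token when the hash exceeds them all.
    start = bisect_right(tokens, stream_hash)
    owners: List[str] = []
    # Walk clockwise, collecting tokens that belong to different ingesters.
    for i in range(len(tokens)):
        owner = ring[tokens[(start + i) % len(tokens)]]
        if owner not in owners:
            owners.append(owner)
        if len(owners) == replication_factor:
            break
    return owners
```

With the three tokens from the example (0, 25, and 50, each owned by a different ingester), a hash of 3 resolves to the owner of token 25, and a replication factor of 2 adds the owner of token 50.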

Quorum consistency

Since all distributors share access to the same hash ring, write requests can be sent to any distributor.

To ensure consistent query results, Loki uses Dynamo-style quorum consistency on reads and writes. This means that the distributor waits for a positive response from at least one half plus one of the ingesters it sends the sample to before responding to the client that initiated the send.
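
As a worked example of the "one half plus one" rule, the sketch below computes the number of acknowledgements a write needs. The function names are hypothetical, and integer division is assumed for "one half".

```python
def quorum(replication_factor: int) -> int:
    """Minimum number of positive ingester responses: one half plus one."""
    return replication_factor // 2 + 1


def write_succeeded(acks: int, replication_factor: int) -> bool:
    """True when enough ingesters acknowledged the write."""
    return acks >= quorum(replication_factor)
```

With a replication factor of 3, two acknowledgements are enough, so a write can tolerate one failed ingester.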

Ingester

The ingester service is responsible for writing log data to long-term storage backends (DynamoDB, S3, Cassandra, etc.) on the write path and returning log data for in-memory queries on the read path.

Ingesters contain a lifecycler which manages the lifecycle of an ingester in the hash ring. Each ingester has a state of either PENDING, JOINING, ACTIVE, LEAVING, or UNHEALTHY:

  1. PENDING is an ingester’s state when it is waiting for a handoff from another ingester that is LEAVING.

  2. JOINING is an ingester’s state when it is currently inserting its tokens into the ring and initializing itself. It may receive write requests for tokens it owns.

  3. ACTIVE is an ingester’s state when it is fully initialized. It may receive both write and read requests for tokens it owns.

  4. LEAVING is an ingester’s state when it is shutting down. It may receive read requests for data it still has in memory.

  5. UNHEALTHY is an ingester’s state when it has failed to heartbeat to Consul. UNHEALTHY is set by the distributor when it periodically checks the ring.
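
The states and the requests each may serve can be summarized in a small lookup. This is a sketch only: the State enum and the accepts helper are hypothetical names, and the sets simply restate which states the text says may receive writes and reads.

```python
from enum import Enum


class State(Enum):
    PENDING = "PENDING"
    JOINING = "JOINING"
    ACTIVE = "ACTIVE"
    LEAVING = "LEAVING"
    UNHEALTHY = "UNHEALTHY"


# States that may receive each kind of request, as described in the text.
WRITABLE = {State.JOINING, State.ACTIVE}
READABLE = {State.ACTIVE, State.LEAVING}


def accepts(state: State, operation: str) -> bool:
    """Return True if an ingester in this state may serve the operation."""
    if operation == "write":
        return state in WRITABLE
    if operation == "read":
        return state in READABLE
    raise ValueError(f"unknown operation: {operation}")
```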

Each log stream that an ingester receives is built up into a set of many “chunks” in memory and flushed to the storage backend at a configurable interval.

Chunks are compressed and marked as read-only when:

  1. The current chunk has reached capacity (a configurable value).
  2. Too much time has passed without the current chunk being updated.
  3. A flush occurs.

Whenever a chunk is compressed and marked as read-only, a writable chunk takes its place.
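
The three conditions above can be sketched as a small state machine. This is a simplified model, not Loki's ingester: the class name Stream is hypothetical, capacity is counted in entries rather than bytes, the idle check runs on append instead of on a periodic sweep, and compression is omitted.

```python
from typing import List, Optional, Tuple


class Stream:
    """Holds one writable head chunk plus closed, read-only chunks."""

    def __init__(self, max_entries: int, max_idle_seconds: float) -> None:
        self.max_entries = max_entries
        self.max_idle_seconds = max_idle_seconds
        self.head: List[str] = []                # current writable chunk
        self.closed: List[Tuple[str, ...]] = []  # read-only chunks awaiting flush
        self.last_append: Optional[float] = None

    def _cut(self) -> None:
        # Freeze the head chunk (a tuple stands in for "read-only") and
        # replace it with a fresh writable chunk.
        if self.head:
            self.closed.append(tuple(self.head))
            self.head = []

    def append(self, now: float, line: str) -> None:
        # Condition 2: too much time has passed without an update.
        if self.last_append is not None and now - self.last_append > self.max_idle_seconds:
            self._cut()
        self.head.append(line)
        self.last_append = now
        # Condition 1: the current chunk has reached capacity.
        if len(self.head) >= self.max_entries:
            self._cut()

    def flush(self) -> List[Tuple[str, ...]]:
        # Condition 3: a flush closes the head chunk and hands everything over.
        self._cut()
        flushed, self.closed = self.closed, []
        return flushed
```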

If an ingester process crashes or exits abruptly, all the data that has not yet been flushed will be lost. Loki is usually configured to keep multiple replicas (usually 3) of each log to mitigate this risk.

When a flush occurs to a persistent storage provider, the chunk is hashed based on its tenant, labels, and contents. This means that multiple ingesters with the same copy of data will not write the same data to the backing store twice, but if any write failed to one of the replicas, multiple differing chunk objects will be created in the backing store. See Querier for how data is deduplicated.
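
The idea of deriving a chunk's identity from its tenant, labels, and contents can be illustrated as follows. This is a sketch only: the function name chunk_key is hypothetical, and SHA-256 with NUL separators is an assumption chosen for illustration, not the hash or key layout Loki uses.

```python
import hashlib
from typing import Dict


def chunk_key(tenant: str, labels: Dict[str, str], contents: bytes) -> str:
    """Derive a storage key from tenant, labels, and chunk contents."""
    h = hashlib.sha256()  # assumption: any stable hash would do here
    h.update(tenant.encode("utf-8"))
    h.update(b"\x00")
    # Sort label names so that the key does not depend on insertion order.
    for name in sorted(labels):
        h.update(name.encode("utf-8"))
        h.update(b"\x00")
        h.update(labels[name].encode("utf-8"))
        h.update(b"\x00")
    h.update(contents)
    return h.hexdigest()
```

Two replicas holding byte-identical chunks produce the same key and so overwrite one object, while a replica that missed a write produces different contents and therefore a different key.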

Handoff

By default, when an ingester is shutting down and tries to leave the hash ring, it will wait to see if a new ingester tries to enter before flushing and will try to initiate a handoff. The handoff will transfer all of the tokens and in-memory chunks owned by the leaving ingester to the new ingester.

Before joining the hash ring, ingesters will wait in PENDING state for a handoff to occur. After a configurable timeout, ingesters in the PENDING state that have not received a transfer will join the ring normally, inserting a new set of tokens.

This process is used to avoid flushing all chunks when shutting down, which is a slow process.

Querier

The querier service handles queries using the LogQL query language, fetching logs both from the ingesters and long-term storage.

Queriers query all ingesters for in-memory data before falling back to running the same query against the backend store. Because of the replication factor, it is possible that the querier may receive duplicate data. To resolve this, the querier internally deduplicates data that has the same nanosecond timestamp, label set, and log message.
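
The deduplication rule can be sketched as a filter keyed on the three fields named above. This is a minimal illustration, not the querier's implementation: the function name deduplicate and the tuple layout of an entry are assumptions.

```python
from typing import Dict, Iterable, List, Tuple

# An entry is (nanosecond timestamp, label set, log message).
Entry = Tuple[int, Dict[str, str], str]


def deduplicate(entries: Iterable[Entry]) -> List[Entry]:
    """Drop entries whose timestamp, label set, and message all match."""
    seen = set()
    unique: List[Entry] = []
    for ts, labels, line in entries:
        # Dictionaries are not hashable, so use a sorted tuple of label pairs.
        key = (ts, tuple(sorted(labels.items())), line)
        if key not in seen:
            seen.add(key)
            unique.append((ts, labels, line))
    return unique
```

Entries that share a timestamp but differ in message or label set are kept, since all three fields must match for two entries to count as duplicates.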

Chunk Format

    -------------------------------------------------------------------
    |                                |                                |
    | MagicNumber(4b)                | version(1b)                    |
    |                                |                                |
    -------------------------------------------------------------------
    | block-1 bytes                  | checksum (4b)                  |
    -------------------------------------------------------------------
    | block-2 bytes                  | checksum (4b)                  |
    -------------------------------------------------------------------
    | block-n bytes                  | checksum (4b)                  |
    -------------------------------------------------------------------
    | #blocks (uvarint)                                               |
    -------------------------------------------------------------------
    | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
    -------------------------------------------------------------------
    | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
    -------------------------------------------------------------------
    | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
    -------------------------------------------------------------------
    | #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
    -------------------------------------------------------------------
    | checksum(from #blocks)                                          |
    -------------------------------------------------------------------
    | #blocks section byte offset                                     |
    -------------------------------------------------------------------

mint and maxt describe the minimum and maximum Unix nanosecond timestamp, respectively.

Block Format

A block consists of a series of entries, each of which is an individual log line.

Note that the bytes of a block are stored compressed using Gzip. The following is their form when uncompressed:

    -------------------------------------------------------------------
    | ts (varint)         | len (uvarint)       | log-1 bytes         |
    -------------------------------------------------------------------
    | ts (varint)         | len (uvarint)       | log-2 bytes         |
    -------------------------------------------------------------------
    | ts (varint)         | len (uvarint)       | log-3 bytes         |
    -------------------------------------------------------------------
    | ts (varint)         | len (uvarint)       | log-n bytes         |
    -------------------------------------------------------------------

ts is the Unix nanosecond timestamp of the logs, while len is the length in bytes of the log entry.
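
The block layout can be sketched as an encoder and decoder pair. This is an illustration under assumptions: varint and uvarint are taken to follow the conventions of Go's encoding/binary package (base-128 groups, with zigzag encoding for signed values), which the text does not state explicitly, and all function names are hypothetical.

```python
import gzip
from typing import List, Tuple


def put_uvarint(n: int) -> bytes:
    """Encode a non-negative integer in base-128, low-order groups first."""
    out = bytearray()
    while n >= 0x80:
        out.append((n & 0x7F) | 0x80)  # set the continuation bit
        n >>= 7
    out.append(n)
    return bytes(out)


def put_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer using zigzag, then uvarint."""
    return put_uvarint((n << 1) ^ (n >> 63))


def read_uvarint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a uvarint at pos; return (value, position after it)."""
    value, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if byte < 0x80:
            return value, pos
        shift += 7


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a zigzag varint at pos; return (value, position after it)."""
    u, pos = read_uvarint(buf, pos)
    return (u >> 1) ^ -(u & 1), pos


def encode_block(entries: List[Tuple[int, bytes]]) -> bytes:
    """Serialize (ts, line) pairs as ts | len | bytes and gzip the result."""
    raw = b"".join(put_varint(ts) + put_uvarint(len(line)) + line for ts, line in entries)
    return gzip.compress(raw)


def decode_block(data: bytes) -> List[Tuple[int, bytes]]:
    """Inverse of encode_block."""
    raw = gzip.decompress(data)
    entries: List[Tuple[int, bytes]] = []
    pos = 0
    while pos < len(raw):
        ts, pos = read_varint(raw, pos)
        length, pos = read_uvarint(raw, pos)
        entries.append((ts, raw[pos:pos + length]))
        pos += length
    return entries
```

Because each entry carries its own length, a reader can skip a log line without scanning for a delimiter.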

Chunk Store

The chunk store is Loki’s long-term data store, designed to support interactive querying and sustained writing without the need for background maintenance tasks. It consists of an index for the chunks and a key-value store for the chunk data itself.

Unlike the other core components of Loki, the chunk store is not a separate service, job, or process, but rather a library embedded in the two services that need to access Loki data: the ingester and querier.

The chunk store relies on a unified interface to the “NoSQL” stores (DynamoDB, Bigtable, and Cassandra) that can be used to back the chunk store index. This interface assumes that the index is a collection of entries keyed by:

  • A hash key. This is required for all reads and writes.
  • A range key. This is required for writes and can be omitted for reads, which can be queried by prefix or range.

The interface works somewhat differently across the supported databases:

  • DynamoDB supports range and hash keys natively. Index entries are thus modelled directly as DynamoDB entries, with the hash key as the distribution key and the range as the DynamoDB range key.
  • For Bigtable and Cassandra, index entries are modelled as individual column values. The hash key becomes the row key and the range key becomes the column key.
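
The hash key and range key contract can be illustrated with an in-memory stand-in. This sketch is not Loki's interface: the class name InMemoryIndex and its method names are hypothetical, and only prefix queries are shown.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class InMemoryIndex:
    """Toy index: hash key selects a row, range key selects a column."""

    def __init__(self) -> None:
        self._rows: Dict[str, Dict[str, bytes]] = defaultdict(dict)

    def write(self, hash_key: str, range_key: str, value: bytes) -> None:
        # Both keys are required for writes.
        self._rows[hash_key][range_key] = value

    def query(self, hash_key: str, range_prefix: str = "") -> List[Tuple[str, bytes]]:
        # The hash key is required for reads; the range key may be omitted,
        # in which case every entry under the hash key is returned.
        row = self._rows.get(hash_key, {})
        return sorted(
            (range_key, value)
            for range_key, value in row.items()
            if range_key.startswith(range_prefix)
        )
```

This mirrors the Bigtable and Cassandra layout described above, where the hash key plays the role of the row key and the range key that of the column key.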

A set of schemas is used to map the matchers and label sets used on reads and writes to the chunk store into appropriate operations on the index. Schemas have been added as Loki has evolved, mainly in an attempt to better load balance writes and improve query performance.

The current schema recommendation is the v10 schema.

Read Path

To summarize, the read path works as follows:

  1. The querier receives an HTTP/1 request for data.
  2. The querier passes the query to all ingesters for in-memory data.
  3. The ingesters receive the read request and return data matching the query, if any.
  4. The querier lazily loads data from the backing store and runs the query against it if no ingesters returned data.
  5. The querier iterates over all received data and deduplicates, returning a final set of data over the HTTP/1 connection.
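
The steps above can be sketched as a single function. This is a simplified model, not the querier's code: ingesters and the backing store are represented as plain callables, entries are hashable tuples, and the name run_query is hypothetical.

```python
from typing import Callable, Iterable, List, Tuple

# An entry is (nanosecond timestamp, label set rendered as a string, log line).
Entry = Tuple[int, str, str]
Source = Callable[[str], Iterable[Entry]]


def run_query(query: str, ingesters: List[Source], store: Source) -> List[Entry]:
    """Query every ingester, fall back to the store, then deduplicate."""
    entries: List[Entry] = []
    # Steps 2 and 3: ask all ingesters for matching in-memory data.
    for ingester in ingesters:
        entries.extend(ingester(query))
    # Step 4: the store is only consulted if no ingester returned data.
    if not entries:
        entries.extend(store(query))
    # Step 5: drop duplicates created by replication, ordered by timestamp.
    return sorted(set(entries))
```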

Write Path

chunk_diagram

To summarize, the write path works as follows:

  1. The distributor receives an HTTP/1 request to store data for streams.
  2. Each stream is hashed using the hash ring.
  3. The distributor sends each stream to the appropriate ingesters and their replicas (based on the configured replication factor).
  4. Each ingester will create a chunk or append to an existing chunk for the stream’s data. A chunk is unique per tenant and per labelset.
  5. The distributor responds with a success code over the HTTP/1 connection.