Cluster Architecture

The Cluster architecture of ArangoDB is a CP master/master model with no single point of failure.

With “CP” in terms of the CAP theorem we mean that in the presence of a network partition, the database prefers internal consistency over availability. With “master/master” we mean that clients can send their requests to an arbitrary node, and experience the same view on the database regardless. “No single point of failure” means that the cluster can continue to serve requests, even if one machine fails completely.

In this way, ArangoDB has been designed as a distributed multi-model database. This section gives a short outline of the Cluster architecture and how the above features and capabilities are achieved.

Structure of an ArangoDB Cluster

An ArangoDB Cluster consists of a number of ArangoDB instances which talk to each other over the network. They play different roles, which will be explained in detail below.

The current configuration of the Cluster is held in the Agency, which is a highly-available resilient key/value store based on an odd number of ArangoDB instances running the Raft Consensus Protocol.
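The preference for an odd number of instances follows from how Raft reaches decisions: a change is committed once a majority of the members has accepted it. The following is a minimal sketch in plain JavaScript (the function names are made up for illustration and are not part of ArangoDB) that computes the majority size and the number of failures a group of a given size can tolerate.

```javascript
// Majority (quorum) needed to commit a change in a Raft group of `agents` members.
function quorum(agents) {
  return Math.floor(agents / 2) + 1;
}

// Number of members that may fail while a majority is still reachable.
function toleratedFailures(agents) {
  return agents - quorum(agents);
}

for (const n of [1, 3, 4, 5]) {
  console.log(`${n} agents: quorum ${quorum(n)}, tolerates ${toleratedFailures(n)} failure(s)`);
}
```

Under this model a group of 4 tolerates no more failures than a group of 3, so an even size adds machines without adding resilience.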

For the various instances in an ArangoDB Cluster there are three distinct roles:

  • Agents
  • Coordinators
  • DB-Servers.

In the following sections we will shed light on each of them.

ArangoDB Cluster

Agents

One or multiple Agents form the Agency in an ArangoDB Cluster. The Agency is the central place to store the configuration in a Cluster. It performs leader elections and provides other synchronization services for the whole Cluster. Without the Agency none of the other components can operate.

While generally invisible to the outside, the Agency is the heart of the Cluster. As such, fault tolerance is of course a must-have for the Agency. To achieve that, the Agents use the Raft Consensus Algorithm. The algorithm formally guarantees conflict-free configuration management within the ArangoDB Cluster.

At its core the Agency manages a big configuration tree. It supports transactional read and write operations on this tree, and other servers can subscribe to HTTP callbacks for all changes to the tree.
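To make the idea of a transactional configuration tree with change notifications more concrete, here is a toy in-memory model in plain JavaScript. It is only a sketch: the class, its methods and the paths used are hypothetical, the callbacks are ordinary functions rather than HTTP callbacks, and nothing here reflects the Agency's actual API or storage layout.

```javascript
// Toy in-memory stand-in for a configuration tree (all names are hypothetical).
class ConfigTree {
  constructor() {
    this.values = new Map(); // path -> value
    this.observers = [];     // { prefix, callback }
  }

  read(path) {
    return this.values.get(path);
  }

  // Apply all writes or none: the writes are only applied if every
  // precondition (expected current value) holds.
  transact(writes, preconditions = {}) {
    for (const [path, expected] of Object.entries(preconditions)) {
      if (this.values.get(path) !== expected) {
        return false; // precondition failed, nothing is changed
      }
    }
    for (const [path, value] of Object.entries(writes)) {
      this.values.set(path, value);
    }
    // Notify subscribers whose prefix matches a changed path.
    for (const { prefix, callback } of this.observers) {
      const changed = Object.keys(writes).filter((p) => p.startsWith(prefix));
      if (changed.length > 0) callback(changed);
    }
    return true;
  }

  subscribe(prefix, callback) {
    this.observers.push({ prefix, callback });
  }
}

const tree = new ConfigTree();
const notifications = [];
tree.subscribe("/Plan/", (paths) => notifications.push(paths));

tree.transact({ "/Plan/leader": "DBServer001" });
// Compare-and-swap: only succeeds if the leader is still DBServer001.
const swapped = tree.transact(
  { "/Plan/leader": "DBServer002" },
  { "/Plan/leader": "DBServer001" }
);
console.log(swapped, tree.read("/Plan/leader"), notifications.length);
```

The precondition check is what lets several servers race for the same entry safely: only the first compare-and-swap succeeds, and every subscriber learns about the outcome.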

Coordinators

Coordinators should be accessible from the outside. These are the ones the clients talk to. They will coordinate cluster tasks like executing queries and running Foxx services. They know where the data is stored and will optimize where to run user supplied queries or parts thereof. Coordinators are stateless and can thus easily be shut down and restarted as needed.

DB-Servers

DB-Servers are the ones where the data is actually hosted. They host shards of data, and with synchronous replication a DB-Server can be either leader or follower for a given shard. Document operations are first applied on the leader and then synchronously replicated to all followers.

Shards must not be accessed from the outside directly, but only indirectly through the Coordinators. DB-Servers may also execute queries in part or as a whole when asked by a Coordinator.

See Sharding below for more information.

Many sensible configurations

This architecture is very flexible and thus allows many configurations, which are suitable for different usage scenarios:

  • The default configuration is to run exactly one Coordinator and one DB-Server on each machine. This achieves the classical master/master setup, since there is a perfect symmetry between the different nodes, clients can equally well talk to any one of the Coordinators and all expose the same view to the data store. Agents can run on separate, less powerful machines.
  • One can deploy more Coordinators than DB-Servers. This is a sensible approach if one needs a lot of CPU power for the Foxx services, because they run on the Coordinators.
  • One can deploy more DB-Servers than Coordinators if more data capacity is needed and the query performance is the lesser bottleneck.
  • One can deploy a Coordinator on each machine where an application server (e.g. a node.js server) runs, and the Agents and DB-Servers on a separate set of machines elsewhere. This avoids a network hop between the application server and the database and thus decreases latency. Essentially, this moves some of the database distribution logic to the machine where the client runs.

As you can see, the Coordinator layer can be scaled and deployed independently from the DB-Server layer.

It is a best practice and a recommended approach to run Agent instances on different machines than DB-Server instances.

When deploying using the tool Starter this can be achieved by using the options --cluster.start-dbserver=false and --cluster.start-coordinator=false on the first three machines where the Starter is started, if the desired Agency size is 3, or on the first 5 machines if the desired Agency size is 5.

The different instances that form a Cluster are supposed to be run in the same Data Center (DC), with a reliable and high-speed network connection between all the machines participating in the Cluster.

Multi-datacenter Clusters, where the entire structure and content of a Cluster located in a specific DC is replicated to other Clusters located in different DCs, are possible as well. See Datacenter to datacenter replication (DC2DC) for further details.

Sharding

Using the roles outlined above an ArangoDB Cluster is able to distribute data in so called shards across multiple DB-Servers. Sharding allows you to use multiple machines to run a cluster of ArangoDB instances that together constitute a single database. This enables you to store much more data, since ArangoDB distributes the data automatically to the different servers. In many situations one can also reap a benefit in data throughput, again because the load can be distributed to multiple machines.

Cluster Sharding

From the outside this process is fully transparent: An application may talk to any Coordinator and it will automatically figure out where the data is currently stored (read-case) or is to be stored (write-case). The information about the shards is shared across all Coordinators using the Agency.

Shards are configured per collection so multiple shards of data form the collection as a whole. To determine in which shard the data is to be stored ArangoDB performs a hash across the values. By default this hash is being created from the document _key.
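As an illustration of hash-based shard selection, the sketch below maps documents to shards by hashing their shard key values. It is written in plain JavaScript and rests on assumptions: FNV-1a is used as a stand-in hash because the hash function ArangoDB actually uses is not described here, and the helper names are hypothetical.

```javascript
// Illustrative only: FNV-1a (32 bit) is a stand-in, not ArangoDB's hash function.
function fnv1a(str) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep the value an unsigned 32-bit integer
  }
  return hash;
}

// Map a document to one of `numberOfShards` shards using its shard key values.
function shardFor(doc, numberOfShards, shardKeys = ["_key"]) {
  const material = shardKeys.map((k) => String(doc[k])).join(":");
  return fnv1a(material) % numberOfShards;
}

// Count how 10,000 documents spread over 5 shards.
const counts = new Array(5).fill(0);
for (let i = 0; i < 10000; i++) {
  counts[shardFor({ _key: String(i) }, 5)]++;
}
console.log(counts);
```

The property that matters is determinism: the same key always lands in the same shard, so any Coordinator can compute the location of a document without asking the others.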

For further information, please refer to the Cluster Sharding section.

OneShard

The OneShard feature is only available in the Enterprise Edition, also available as managed service.

A OneShard deployment offers a practicable solution that enables significant performance improvements by massively reducing cluster-internal communication and allows running transactions with ACID guarantees on shard leaders.

By restricting collections to a single shard and placing them on one DB-Server node, whole queries can be pushed to and executed on that server. The Coordinator will only get back the final result. This setup is highly recommended for most graph use cases and join-heavy queries.

For graphs larger than what fits on a single DB-Server node, you can use the SmartGraphs feature to efficiently limit the network hops between Coordinator and DB-Servers.

Without the OneShard feature query processing works as follows in a cluster:

  • The Coordinator accepts and analyzes the query.
  • If collections are accessed then the Coordinator distributes the accesses to collections to different DB-Servers that hold parts (shards) of the collections in question.
  • This distributed access requires network-traffic from Coordinator to DB-Servers and back from DB-Servers to Coordinators and is therefore expensive.

Another cost factor is the memory and CPU time required on the Coordinator when it has to process several concurrent complex queries. In such situations Coordinators may become a bottleneck in query processing, because they need to send and receive data on several connections, build up results for collection accesses from the received parts followed by further processing.

OneShard vs. Sharded Cluster Setup

If the collections involved in a query have one shard and are guaranteed to be on the same DB-Server, then the OneShard optimization is applied to run the query on the responsible node like on a single server. However, because it is still a cluster setup, collections can be replicated synchronously to ensure resilience.

How to use the OneShard feature?

The OneShard feature is enabled by default. If you use ArangoDB Enterprise Edition and if the collections used in the query are eligible, then the optimizer rule cluster-one-shard is applied automatically. To be eligible, all collections in question have to have a single shard only and the distributeShardsLike property needs to be set to a common collection name (except the prototype collection) to ensure that their single shards are placed on the same DB-Server. There are multiple ways to achieve this:

  • If you want your entire cluster to be a OneShard deployment, use the startup option --cluster.force-one-shard. It sets the immutable sharding database property to "single" for all newly created databases, which in turn enforces the OneShard conditions for collections that will be created in them. The _graphs system collection will be used for distributeShardsLike.

  • For individual OneShard databases, set the sharding database property to "single" to enforce the OneShard conditions for collections that will be created in it. The _graphs system collection will be used for distributeShardsLike. For non-OneShard databases the value is either "" or "flexible".

  • For individual OneShard collections, set the numberOfShards collection property to 1 for the first collection which acts as sharding prototype for the others. Set the distributeShardsLike property to the name of the prototype collection for all other collections. You may also use an existing collection which does not have distributeShardsLike set itself for all your collections, such as the _graphs system collection.

The prototype collection does not only control the sharding, but also the replication factor for all collections which follow its example. If the _graphs system collection is used for distributeShardsLike, then the replication factor can be adjusted by changing the replicationFactor property of the _graphs collection (affecting this and all following collections) or via the startup option --cluster.system-replication-factor (affecting all system collections and all following collections).
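The eligibility conditions described above can be expressed as a small check. The following is a sketch in plain JavaScript that works on plain objects mimicking collection properties; the function name is hypothetical and the code models the rule as stated in this section rather than calling into ArangoDB.

```javascript
// Returns true if the given collections satisfy the OneShard conditions
// described above. `collections` is an array of plain objects that mimic
// collection properties (a model only, not an ArangoDB API).
function isOneShardEligible(collections) {
  // Every collection must have exactly one shard.
  if (!collections.every((c) => c.numberOfShards === 1)) return false;
  // All collections must follow the same prototype; a collection without
  // distributeShardsLike is treated as its own prototype.
  const prototypes = new Set(
    collections.map((c) => c.distributeShardsLike || c.name)
  );
  return prototypes.size === 1;
}

const viaGraphs = [
  { name: "example1", numberOfShards: 1, distributeShardsLike: "_graphs" },
  { name: "example2", numberOfShards: 1, distributeShardsLike: "_graphs" },
];
const viaPrototype = [
  { name: "proto", numberOfShards: 1 },
  { name: "other", numberOfShards: 1, distributeShardsLike: "proto" },
];
const mixed = [
  { name: "a", numberOfShards: 1, distributeShardsLike: "_graphs" },
  { name: "b", numberOfShards: 5 },
];
console.log(
  isOneShardEligible(viaGraphs),
  isOneShardEligible(viaPrototype),
  isOneShardEligible(mixed)
);
```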

Example

The easiest way to make use of the OneShard feature is to create a database with the extra option { sharding: "single" }, as done in the following example:

    arangosh> db._createDatabase("oneShardDB", { sharding: "single" } )
    arangosh> db._useDatabase("oneShardDB")
    arangosh@oneShardDB> db._properties()
    {
      "id" : "6010005",
      "name" : "oneShardDB",
      "isSystem" : false,
      "sharding" : "single",
      "replicationFactor" : 1,
      "writeConcern" : 1,
      "path" : ""
    }

Now we can go ahead and create a collection as usual:

    arangosh@oneShardDB> db._create("example")
    arangosh@oneShardDB> db.example.properties()
    {
      "isSmart" : false,
      "isSystem" : false,
      "waitForSync" : false,
      "shardKeys" : [
        "_key"
      ],
      "numberOfShards" : 1,
      "keyOptions" : {
        "allowUserKeys" : true,
        "type" : "traditional"
      },
      "replicationFactor" : 2,
      "minReplicationFactor" : 1,
      "writeConcern" : 1,
      "distributeShardsLike" : "_graphs",
      "shardingStrategy" : "hash",
      "cacheEnabled" : false
    }

As you can see the numberOfShards is set to 1 and distributeShardsLike is set to _graphs. These attributes have automatically been set because we specified the { "sharding": "single" } options object when creating the database. To do this manually one would create a collection in the following way:

    db._create("example2", { "numberOfShards": 1 , "distributeShardsLike": "_graphs" })

Here we used the _graphs collection again, but any other existing collection that has not been created with the distributeShardsLike option could have been used here.

Running Queries

For this arangosh example, we first insert a few documents into a collection, then create a query and explain it to inspect the execution plan.

    arangosh@oneShardDB> for (let i = 0; i < 10000; i++) { db.example.insert({ "value" : i }); }
    arangosh@oneShardDB> q = "FOR doc IN @@collection FILTER doc.value % 2 == 0 SORT doc.value ASC LIMIT 10 RETURN doc";
    arangosh@oneShardDB> db._explain(q, { "@collection" : "example" })
    Query String (88 chars, cacheable: true):
     FOR doc IN @@collection FILTER doc.value % 2 == 0 SORT doc.value ASC LIMIT 10 RETURN doc
    Execution plan:
     Id   NodeType                  Site    Est.   Comment
      1   SingletonNode             DBS        1   * ROOT
      2   EnumerateCollectionNode   DBS    10000   - FOR doc IN example /* full collection scan, 1 shard(s) */ FILTER ((doc.`value` % 2) == 0) /* early pruning */
      5   CalculationNode           DBS    10000   - LET #3 = doc.`value` /* attribute expression */ /* collections used: doc : example */
      6   SortNode                  DBS    10000   - SORT #3 ASC /* sorting strategy: constrained heap */
      7   LimitNode                 DBS       10   - LIMIT 0, 10
      9   RemoteNode                COOR      10   - REMOTE
     10   GatherNode                COOR      10   - GATHER
      8   ReturnNode                COOR      10   - RETURN doc
    Indexes used:
     none
    Optimization rules applied:
     Id   RuleName
      1   move-calculations-up
      2   move-filters-up
      3   move-calculations-up-2
      4   move-filters-up-2
      5   cluster-one-shard
      6   sort-limit
      7   move-filters-into-enumerate

As can be seen in the explain output, almost the complete query is executed on the DB-Server (DBS for nodes 1-7) and only 10 documents are transferred to the Coordinator. If we do the same with a collection that consists of several shards, we get a different result:

    arangosh> db._createDatabase("shardedDB")
    arangosh> db._useDatabase("shardedDB")
    arangosh@shardedDB> db._properties()
    {
      "id" : "6010017",
      "name" : "shardedDB",
      "isSystem" : false,
      "sharding" : "flexible",
      "replicationFactor" : 1,
      "writeConcern" : 1,
      "path" : ""
    }
    arangosh@shardedDB> db._create("example", { numberOfShards : 5})
    arangosh@shardedDB> for (let i = 0; i < 10000; i++) { db.example.insert({ "value" : i }); }
    arangosh@shardedDB> db._explain(q, { "@collection" : "example" })
    Query String (88 chars, cacheable: true):
     FOR doc IN @@collection FILTER doc.value % 2 == 0 SORT doc.value ASC LIMIT 10 RETURN doc
    Execution plan:
     Id   NodeType                  Site    Est.   Comment
      1   SingletonNode             DBS        1   * ROOT
      2   EnumerateCollectionNode   DBS    10000   - FOR doc IN example /* full collection scan, 5 shard(s) */ FILTER ((doc.`value` % 2) == 0) /* early pruning */
      5   CalculationNode           DBS    10000   - LET #3 = doc.`value` /* attribute expression */ /* collections used: doc : example */
      6   SortNode                  DBS    10000   - SORT #3 ASC /* sorting strategy: constrained heap */
     11   RemoteNode                COOR   10000   - REMOTE
     12   GatherNode                COOR   10000   - GATHER #3 ASC /* parallel, sort mode: heap */
      7   LimitNode                 COOR      10   - LIMIT 0, 10
      8   ReturnNode                COOR      10   - RETURN doc
    Indexes used:
     none
    Optimization rules applied:
     Id   RuleName
      1   move-calculations-up
      2   move-filters-up
      3   move-calculations-up-2
      4   move-filters-up-2
      5   scatter-in-cluster
      6   distribute-filtercalc-to-cluster
      7   distribute-sort-to-cluster
      8   remove-unnecessary-remote-scatter
      9   sort-limit
     10   move-filters-into-enumerate
     11   parallelize-gather

Whether the OneShard feature is active can be checked by inspecting the explain output. If the list of rules contains cluster-one-shard then the feature is active for the given query.

Without the OneShard feature all documents potentially have to be sent to the Coordinator for further processing. With this simple query this is actually not true, because some other optimizations are performed that reduce the number of documents. But still, a considerable number of documents has to be transferred from DB-Server to Coordinator only to apply a LIMIT of 10 documents there. The estimate for the RemoteNode is 10,000 in this example, whereas it is 10 in the OneShard case.
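The check for the cluster-one-shard rule can also be done programmatically. The sketch below is plain JavaScript operating on an object of the assumed shape { plan: { rules: [...] } }; in arangosh such an object could presumably be obtained from a statement's explain() call, but that shape and the helper name are assumptions made for this illustration. The two sample objects are hand-written, abbreviated from the rule lists shown above.

```javascript
// `explainResult` is assumed to have the shape { plan: { rules: [...] } }.
function usesOneShard(explainResult) {
  const rules = (explainResult.plan && explainResult.plan.rules) || [];
  return rules.includes("cluster-one-shard");
}

// Hand-written samples, abbreviated from the rule lists in the explain outputs above.
const oneShardPlan = {
  plan: { rules: ["move-calculations-up", "cluster-one-shard", "sort-limit"] },
};
const shardedPlan = {
  plan: { rules: ["scatter-in-cluster", "distribute-sort-to-cluster", "parallelize-gather"] },
};

console.log(usesOneShard(oneShardPlan), usesOneShard(shardedPlan));
```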

ACID Transactions on Leader Shards

ArangoDB’s transactional guarantees are tunable. For transactions to be ACID on the leader shards in a cluster, a few things need to be considered:

  • The AQL query or Stream Transaction must be eligible for the OneShard optimization, so that it is executed on a single DB-Server node.
  • To ensure durability, enable waitForSync on query level to wait until data modifications have been written to disk.
  • The collection option writeConcern: 2 makes sure that a transaction is only successful if at least one follower replica is in sync with the leader.
  • The RocksDB engine supports intermediate commits for larger document operations, potentially breaking the atomicity of transactions. To prevent this for individual queries you can increase intermediateCommitSize (default 512 MB) and intermediateCommitCount accordingly as query options.

Synchronous replication

In an ArangoDB Cluster, the replication among the data stored by the DB-Servers is synchronous.

Synchronous replication works on a per-shard basis. Using the option replicationFactor, one configures for each collection how many copies of each shard are kept in the Cluster.

If a collection has a replication factor of 1, its data is not replicated to other DB-Servers. This exposes you to a risk of data loss if the machine running the DB-Server with the only copy of the data fails permanently.

The replication factor has to be set to a value equal to or higher than 2 to achieve minimal data redundancy via synchronous replication.

A replication factor of 2 or higher has to be set explicitly when the collection is created, or can be set later at run time if you forgot to set it at creation time.

When using a Cluster, please make sure all the collections that are important (and should not be lost in any case) have a replication factor equal to or higher than 2.

At any given time, one of the copies is declared to be the leader and all other replicas are followers. Internally, write operations for this shard are always sent to the DB-Server which happens to hold the leader copy, which in turn replicates the changes to all followers before the operation is considered to be done and reported back to the Coordinator. Read operations are likewise all served by the DB-Server holding the leader copy, which makes it possible to provide snapshot semantics for complex transactions.

Using synchronous replication alone will guarantee consistency and high availability at the cost of reduced performance: write requests will have a higher latency (due to every write request having to be executed on the followers) and read requests will not scale out as only the leader is being asked.

In a Cluster, synchronous replication will be managed by the Coordinators for the client. The data will always be stored on the DB-Servers.
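The write path described above can be sketched as a toy model in plain JavaScript. The class and the server names are hypothetical, and the model ignores failures and networking; it only shows the ordering: the leader applies the write, every follower applies it, and only then is the write acknowledged.

```javascript
// Toy model of one shard with a leader copy and follower copies.
// Server names and the class itself are hypothetical.
class ReplicatedShard {
  constructor(leader, followers) {
    this.leader = leader;
    this.followers = followers;
    this.copies = new Map([leader, ...followers].map((s) => [s, new Map()]));
    this.log = [];
  }

  // A write is acknowledged only after the leader and every follower applied it.
  insert(key, doc) {
    this.copies.get(this.leader).set(key, doc);
    this.log.push(`leader ${this.leader} applied ${key}`);
    for (const f of this.followers) {
      this.copies.get(f).set(key, doc);
      this.log.push(`follower ${f} applied ${key}`);
    }
    this.log.push(`ack ${key}`);
    return { _key: key };
  }

  // Reads are served by the leader copy only.
  document(key) {
    return this.copies.get(this.leader).get(key);
  }
}

const shard = new ReplicatedShard("DBServer001", ["DBServer002"]);
shard.insert("7987", { foo: "bar" });
console.log(shard.log.join("\n"));
```

Because the acknowledgement comes last, a client that has seen a successful write can rely on every follower holding the data, which is what allows a follower to take over as leader later without losing acknowledged writes.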

The following example will give you an idea of how synchronous operation has been implemented in ArangoDB Cluster:

  • Connect to a Coordinator via arangosh
  • Create a collection

127.0.0.1:8530@_system> db._create("test", {"replicationFactor": 2})

  • The Coordinator will figure out a leader and one follower and create one shard (as this is the default)
  • Insert data

127.0.0.1:8530@_system> db.test.insert({"foo": "bar"})

  • The Coordinator will write the data to the leader, which in turn will replicate it to the follower.
  • Only when both were successful the result is reported to be successful:

    {
      "_id" : "test/7987",
      "_key" : "7987",
      "_rev" : "7987"
    }

Obviously, synchronous replication comes at the cost of an increased latency for write operations, simply because there is one more network hop within the Cluster for every request. Therefore the user can set the replicationFactor to 1, which means that only one copy of each shard is kept, thereby switching off synchronous replication. This is a suitable setting for less important or easily recoverable data for which low latency write operations matter.

Automatic failover

Failure of a follower

If a DB-Server that holds a follower copy of a shard fails, then the leader can no longer synchronize its changes to that follower. After a short timeout (3 seconds), the leader gives up on the follower and declares it to be out of sync.

One of the following two cases can happen:

a) If another DB-Server (that does not hold a replica for this shard already) is available in the Cluster, a new follower will automatically be created on this other DB-Server (so the replication factor constraint is satisfied again).

b) If no other DB-Server (that does not hold a replica for this shard already) is available, the service continues with one follower less than the number prescribed by the replication factor.

If the old DB-Server with the follower copy comes back, one of the following two cases can happen:

a) If previously we were in case a), the DB-Server recognizes that there is a new follower that was elected in the meantime, so it will no longer be a follower for that shard.

b) If previously we were in case b), the DB-Server automatically resynchronizes its data with the leader. The replication factor constraint is now satisfied again and order is restored.

Failure of a leader

If a DB-Server that holds a leader copy of a shard fails, then the leader can no longer serve any requests. It will no longer send a heartbeat to the Agency. Therefore, a supervision process running in the Raft leader of the Agency can take the necessary action (after 15 seconds of missing heartbeats), namely to promote one of the DB-Servers that hold in-sync replicas of the shard to leader for that shard. This involves a reconfiguration in the Agency and leads to the fact that Coordinators now contact a different DB-Server for requests to this shard. Service resumes. The other surviving replicas automatically resynchronize their data with the new leader.

In addition to the above, one of the following two cases can happen:

a) If another DB-Server (that does not hold a replica for this shard already) is available in the Cluster, a new follower will automatically be created on this other DB-Server (so the replication factor constraint is satisfied again).

b) If no other DB-Server (that does not hold a replica for this shard already) is available, the service continues with one follower less than the number prescribed by the replication factor.

When the DB-Server with the original leader copy comes back, it recognizes that a new leader was elected in the meantime, and one of the following two cases can happen:

a) If previously we were in case a), since also a new follower was created and the replication factor constraint is satisfied, the DB-Server will no longer be a follower for that shard.

b) If previously we were in case b), the DB-Server notices that it now holds a follower replica of that shard and it resynchronizes its data with the new leader. The replication factor constraint is now satisfied again, and order is restored.
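The promotion step performed by the supervision can be sketched as follows. This is a simplified model in plain JavaScript with hypothetical names and data shapes: heartbeats are plain timestamps in milliseconds, the 15 second limit is the one mentioned above, and the creation of a replacement follower is left out.

```javascript
const LEADER_TIMEOUT_MS = 15000; // 15 seconds of missing heartbeats, as stated above

// A server counts as alive if its last heartbeat is recent enough.
function isAlive(server, lastHeartbeat, now) {
  const last = lastHeartbeat.get(server);
  return last !== undefined && now - last < LEADER_TIMEOUT_MS;
}

// `shard`: { leader, followers: [{ server, inSync }] } (hypothetical shape)
// `lastHeartbeat`: Map of server name -> timestamp (ms) of the last heartbeat
// Returns the unchanged shard, or a new description with a promoted leader.
function supervise(shard, lastHeartbeat, now) {
  if (isAlive(shard.leader, lastHeartbeat, now)) {
    return shard; // leader is still considered healthy
  }
  // Only an in-sync follower that is itself alive may be promoted.
  const candidate = shard.followers.find(
    (f) => f.inSync && isAlive(f.server, lastHeartbeat, now)
  );
  if (!candidate) {
    return shard; // nothing can be promoted safely
  }
  return {
    leader: candidate.server,
    // The surviving replicas resynchronize with the new leader.
    followers: shard.followers
      .filter((f) => f.server !== candidate.server)
      .map((f) => ({ server: f.server, inSync: false })),
  };
}

const shard = {
  leader: "DBServer001",
  followers: [
    { server: "DBServer002", inSync: true },
    { server: "DBServer003", inSync: true },
  ],
};
const heartbeats = new Map([
  ["DBServer001", 0],
  ["DBServer002", 19000],
  ["DBServer003", 19500],
]);
console.log(supervise(shard, heartbeats, 20000));
```

Restricting promotion to in-sync followers is the important design point: a follower that was declared out of sync may be missing acknowledged writes and must not become leader.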

The following example will give you an idea of how failover has been implemented in ArangoDB Cluster:

  • The leader of a shard (let’s name it DBServer001) is going down.
  • A Coordinator is asked to return a document:

127.0.0.1:8530@_system> db.test.document("100069")

  • The Coordinator determines which server is responsible for this document and finds DBServer001
  • The Coordinator tries to contact DBServer001 and times out because it is not reachable.
  • After a short while the supervision (running in parallel on the Agency) will see that heartbeats from DBServer001 are not coming in
  • The supervision promotes one of the followers (say DBServer002), that is in sync, to be leader and makes DBServer001 a follower.
  • As the Coordinator continues trying to fetch the document it will see that the leader changed to DBServer002
  • The Coordinator tries to contact the new leader (DBServer002) and returns the result:

    {
      "_key" : "100069",
      "_id" : "test/100069",
      "_rev" : "513",
      "foo" : "bar"
    }

  • After a while the supervision declares DBServer001 to be completely dead.
  • A new follower is determined from the pool of DB-Servers.
  • The new follower syncs its data from the leader and order is restored.

Please note that there may still be timeouts. Depending on when exactly the request has been done (in regard to the supervision) and depending on the time needed to reconfigure the Cluster, the Coordinator might fail with a timeout error.

Shard movement and resynchronization

All shard data synchronizations are done in an incremental way, such that resynchronizations are quick. This technology makes it possible to move shards (follower and leader ones) between DB-Servers without service interruptions. Therefore, an ArangoDB Cluster can move all the data on a specific DB-Server to other DB-Servers and then shut down that server in a controlled way. This makes it possible to scale down an ArangoDB Cluster without service interruption, loss of fault tolerance or data loss. Furthermore, one can re-balance the distribution of the shards, either manually or automatically.

All these operations can be triggered via a REST/JSON API or via the graphical web UI. All fail-over operations are completely handled within the ArangoDB Cluster.
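To illustrate what emptying a DB-Server before a controlled shutdown involves, the sketch below plans the necessary shard moves. It is plain JavaScript with hypothetical names and a simple least-loaded heuristic; it is not ArangoDB's actual planning algorithm and does not call the REST API.

```javascript
// `placement`: Map of shard name -> array of servers holding a copy (leader first).
// Returns a list of planned moves that empty `server`. Hypothetical planning
// logic for illustration only.
function planDrain(placement, servers, server) {
  // Count how many shard copies each server currently holds.
  const load = new Map(servers.map((s) => [s, 0]));
  for (const holders of placement.values()) {
    for (const s of holders) load.set(s, load.get(s) + 1);
  }
  const moves = [];
  for (const [shard, holders] of placement) {
    if (!holders.includes(server)) continue;
    // Candidates: other servers that do not already hold a copy of this shard.
    const candidates = servers.filter((s) => s !== server && !holders.includes(s));
    if (candidates.length === 0) {
      throw new Error(`no target available for shard ${shard}`);
    }
    // Pick the least loaded candidate to keep the distribution balanced.
    candidates.sort((a, b) => load.get(a) - load.get(b));
    const target = candidates[0];
    load.set(target, load.get(target) + 1);
    load.set(server, load.get(server) - 1);
    moves.push({ shard, from: server, to: target });
  }
  return moves;
}

const placement = new Map([
  ["s1", ["DBServer001", "DBServer002"]],
  ["s2", ["DBServer002", "DBServer003"]],
  ["s3", ["DBServer001", "DBServer003"]],
]);
const servers = ["DBServer001", "DBServer002", "DBServer003", "DBServer004"];
console.log(planDrain(placement, servers, "DBServer001"));
```

A target is never a server that already holds a copy of the same shard, since two replicas on one machine would not add any fault tolerance.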

Microservices and zero administration

The design and capabilities of ArangoDB are geared towards usage in modern microservice architectures of applications. With the Foxx services it is very easy to deploy a data-centric microservice within an ArangoDB Cluster.

In addition, one can deploy multiple instances of ArangoDB within the same project. One part of the project might need a scalable document store, another might need a graph database, and yet another might need the full power of a multi-model database actually mixing the various data models. There are enormous efficiency benefits to be reaped by being able to use a single technology for various roles in a project.

To simplify the life of the devops in such a scenario we try as much as possible to use a zero administration approach for ArangoDB. A running ArangoDB Cluster is resilient against failures and essentially repairs itself in case of temporary failures.

Deployment

An ArangoDB Cluster can be deployed in several ways, e.g. by manually starting all the needed instances, by using the tool Starter, in Docker and in Kubernetes.

See the Cluster Deployment chapter for instructions.

Cluster ID

Every ArangoDB instance in a Cluster is assigned a unique ID during its startup. Using its ID a node is identifiable throughout the Cluster. All cluster operations will communicate via this ID.