Mirroring

Agent mirrors can be used interchangeably when processing a search query. The Manticore instance (there can be more than one) hosting the distributed table where the mirrored agents are defined keeps track of mirror status (alive or dead) and response times, and performs automatic failover and load balancing based on that.

Agent mirrors

  1. agent = node1|node2|node3:9312:shard2

The above example declares that ‘node1:9312’, ‘node2:9312’, and ‘node3:9312’ all have a table called shard2 and can be used as interchangeable mirrors. If any of those servers goes down, the queries will be distributed between the remaining two. When it comes back up, the master will detect that and begin routing queries to all three nodes again.

A mirror may also include an individual table list, as follows:

  1. agent = node1:9312:node1shard2|node2:9312:node2shard2

This works essentially the same as the previous example, but different table names will be used when querying different servers: node1shard2 when querying node1:9312, and node2shard2 when querying node2:9312.

By default, all queries are routed to the best of the mirrors. The best one is picked based on recent statistics, as controlled by the ha_period_karma config directive. The master stores a number of metrics (total query count, error count, response time, etc.) recently observed for every agent. It groups those by time spans, and karma is that time span length. The best agent mirror is then determined dynamically based on the last 2 such time spans. The specific algorithm used to pick a mirror can be configured with the ha_strategy directive.

The karma period is in seconds and defaults to 60 seconds. The master stores up to 15 karma spans with per-agent statistics for instrumentation purposes (see the SHOW AGENT STATUS statement). However, only the last 2 spans out of those are ever used for HA/LB logic.

When there are no queries, the master sends a regular ping command every ha_ping_interval milliseconds in order to have some statistics and at least check whether the remote host is still alive. ha_ping_interval defaults to 1000 msec. Setting it to 0 disables pings, and statistics will only be accumulated based on actual queries.
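
Putting these directives together, a minimal searchd sketch might look as follows (the values are illustrative assumptions, not recommendations):

  1. searchd {
  2. ha_period_karma = 60
  3. ha_ping_interval = 1000
  4. ha_strategy = noerrors
  5. ...
  6. }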

Example:

  1. # sharding table over 4 servers total
  2. # in just 2 shards but with 2 failover mirrors for each shard
  3. # node1, node2 carry shard1 as local
  4. # node3, node4 carry shard2 as local
  5. # config on node1, node2
  6. agent = node3:9312|node4:9312:shard2
  7. # config on node3, node4
  8. agent = node1:9312|node2:9312:shard1
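
In the configuration file, agent lines like the ones above live inside a distributed table definition. A rough sketch for node1/node2 under the sharding scheme above (the table name dist_shards is an assumption):

  1. table dist_shards {
  2. type = distributed
  3. local = shard1
  4. agent = node3:9312|node4:9312:shard2
  5. }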

Load balancing

Load balancing is turned on by default for any distributed table using mirroring. By default, queries are distributed randomly among the mirrors. You can use ha_strategy to change this behaviour.

ha_strategy

  1. ha_strategy = {random|nodeads|noerrors|roundrobin}

Agent mirror selection strategy for load balancing. Optional, default is random.

The strategy used for mirror selection, or in other words, for choosing a specific agent mirror in a distributed table. Essentially, this directive controls how exactly the master does the load balancing between the configured mirror agent nodes. The following strategies are implemented:

Simple random balancing

The default balancing mode. Simple linear random distribution among the mirrors; that is, an equal selection probability is assigned to every mirror. Kind of similar to round-robin (RR), but unlike RR, it does not impose a strict selection order.

Example

  1. ha_strategy = random

Adaptive randomized balancing

The default simple random strategy does not take mirror status, error rate, and, most importantly, actual response latencies into account. So, to accommodate heterogeneous clusters and/or temporary spikes in agent node load, there is a group of balancing strategies that dynamically adjust the probabilities based on the actual query latencies observed by the master.

The adaptive strategies based on latency-weighted probabilities basically work as follows:

  • latency stats are accumulated in blocks of ha_period_karma seconds;
  • once per karma period latency-weighted probabilities get recomputed;
  • once per request (including ping requests), the “dead or alive” flag is adjusted.

Currently, we begin with equal probabilities (or percentages, for brevity) and, on every step, scale them by the inverse of the latencies observed during the last “karma” period, and then renormalize them. For example, if during the first 60 seconds after master startup 4 mirrors had latencies of 10, 5, 30, and 3 msec/query respectively, the first adjustment step would go as follows:

  • initial percentages: 0.25, 0.25, 0.25, 0.25;
  • observed latencies: 10 ms, 5 ms, 30 ms, 3 ms;
  • inverse latencies: 0.1, 0.2, 0.0333, 0.333;
  • scaled percentages: 0.025, 0.05, 0.008333, 0.0833;
  • renormalized percentages: 0.15, 0.30, 0.05, 0.50.

Meaning that the 1st mirror would have a 15% chance of being chosen during the next karma period, the 2nd one a 30% chance, the 3rd one (slowest at 30 ms) only a 5% chance, and the 4th and the fastest one (at 3 ms) a 50% chance. Then, after that period, the second adjustment step would update those chances again, and so on.

The rationale here is that once the observed latencies stabilize, the latency-weighted probabilities stabilize as well. So all these adjustment iterations are supposed to converge at a point where the average latencies are (roughly) equal across all mirrors.

nodeads

Latency-weighted probabilities, but dead mirrors are excluded from selection. A “dead” mirror is defined as a mirror that has produced multiple hard errors (e.g. network failure, no answer, etc.) in a row.

Example

  1. ha_strategy = nodeads

noerrors

Latency-weighted probabilities, but mirrors with a worse error/success ratio are excluded from selection.

Example

  1. ha_strategy = noerrors

Round-robin balancing

Simple round-robin selection, that is, selecting the 1st mirror in the list, then the 2nd one, then the 3rd one, etc, and then repeating the process once the last mirror in the list is reached. Unlike with the randomized strategies, RR imposes a strict querying order (1, 2, 3, …, N-1, N, 1, 2, 3, … and so on) and guarantees that no two subsequent queries will be sent to the same mirror.

Example

  1. ha_strategy = roundrobin

Instance-wide options

ha_period_karma

  1. ha_period_karma = 2m

ha_period_karma defines the agent mirror statistics window size, in seconds (or with a time suffix). Optional, default is 60.

For a distributed table with agent mirrors in it, the server tracks several different per-mirror counters. These counters are then used for failover and balancing. (The server picks the best mirror to use based on the counters.) Counters are accumulated in blocks of ha_period_karma seconds.

After beginning a new block, the master may still use the accumulated values from the previous one, until the new one is half full. Thus, any previous history stops affecting the mirror choice after at most 1.5 times ha_period_karma seconds.

Despite that at most 2 blocks are used for mirror selection, up to 15 last blocks are actually stored, for instrumentation purposes. They can be inspected using the SHOW AGENT STATUS statement.
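
For instance, the stored per-agent blocks can be inspected like this (the exact set of counters returned depends on the version):

  1. SHOW AGENT STATUS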

ha_ping_interval

  1. ha_ping_interval = 3s

ha_ping_interval defines the interval between agent mirror pings, in milliseconds (or with a time suffix). Optional, default is 1000.

For a distributed table with agent mirrors in it, the server sends all mirrors a ping command during idle periods. This is to track the current agent status (alive or dead, network roundtrip, etc.). The interval between such pings is defined by this directive.

To disable pings, set ha_ping_interval to 0.
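
For example:

  1. ha_ping_interval = 0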

Setting up replication

A write transaction (any result of INSERT, REPLACE, DELETE, TRUNCATE, UPDATE, COMMIT) can be replicated to other cluster nodes before the transaction is fully applied on the current node. Currently, replication is supported for percolate and rt tables on Linux and macOS. Manticore Search packages for Windows do not provide replication support.

Manticore’s replication is based on the Galera library and features the following:

  • true multi-master - read and write to any node at any time
  • virtually synchronous replication - no slave lag, no data is lost after a node crash
  • hot standby - no downtime during failover (since there is no failover)
  • tightly coupled - all the nodes hold the same state. No diverged data between nodes allowed
  • automatic node provisioning - no need to manually back up the database and restore it on a new node
  • easy to use and deploy
  • detection and automatic eviction of unreliable nodes
  • certification based replication

To use replication in Manticore Search:

  • the data_dir option should be set in the “searchd” section of the configuration file. Replication is not supported in plain mode
  • there should be either:
    • a listen directive specified (without specifying a protocol) containing an IP address accessible by other nodes
    • or node_address with an accessible IP address
  • optionally you can set unique values for server_id on each cluster node. If no value is set, the node will try to use the MAC address (or a random number if that fails) to generate the server_id.

If there is no replication listen directive set, Manticore will use the first two free ports in a range of 200 ports after the default protocol listening port for each created cluster. To set replication ports manually, the listen directive (of replication type) port range should be defined, and the address/port range pairs should not intersect between different nodes on the same server. As a rule of thumb, the port range should specify no less than two ports per cluster.
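
To sum up the requirements, a minimal searchd sketch might look like the following (the address, port range and server_id values are illustrative assumptions):

  1. searchd {
  2. data_dir = /var/lib/manticore
  3. listen = 192.168.1.101:9312
  4. listen = 192.168.1.101:9360-9370:replication
  5. server_id = 1
  6. ...
  7. }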

Replication cluster

A replication cluster is a set of nodes among which a write transaction gets replicated. Replication is configured on a per-table basis, meaning that one table can be assigned to only one cluster. There is no restriction on how many tables a cluster can have. All transactions such as INSERT, REPLACE, DELETE, TRUNCATE in any percolate or real-time table belonging to a cluster are replicated to all the other nodes in the cluster. Replication is multi-master, so writes to any particular node or to multiple nodes simultaneously work equally well.

In most cases you create a cluster with CREATE CLUSTER <cluster name> and join a cluster with JOIN CLUSTER <cluster name> AT 'host:port', but in rare cases you may want to fine-tune the behaviour of CREATE/JOIN CLUSTER. The options are:

name

Specifies the cluster name. Should be unique.

path

Data directory for write-set cache replication and for incoming tables from other nodes. Should be unique among the other clusters on the node. Default is data_dir. Should be specified as a path to an existing directory, relative to data_dir.

nodes

A comma-separated list of address:port pairs for all the nodes in the cluster. The node’s API interface should be used for this option. It can contain the current node’s address too. This list is used to join a node to the cluster and to rejoin it after restart.

options

Other options that are passed directly to the Galera replication plugin, as described in the Galera Documentation Parameters.
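
As a sketch, a CREATE CLUSTER statement that sets the path and nodes options explicitly might look like this (the directory name and node addresses are assumptions):

  1. CREATE CLUSTER posts 'posts_cluster' as path, '192.168.1.101:9312,192.168.1.102:9312' as nodes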

Write statements

For the SQL interface, all write statements such as INSERT, REPLACE, DELETE, TRUNCATE, UPDATE that change the content of a cluster’s table should use the cluster_name:index_name expression in place of the table name, to make sure the change is propagated to all replicas in the cluster. An error will be triggered otherwise.

All write statements sent to a cluster’s table via the HTTP interface should set the cluster property along with the table name. An error will be triggered otherwise.

The auto ID generated for a table in a cluster will be valid as long as server_id is not misconfigured.

  • SQL
  • JSON
  • PHP
  • Python
  • Javascript
  • Java


  1. INSERT INTO posts:weekly_index VALUES ( 'iphone case' )
  2. TRUNCATE RTINDEX click_query:weekly_index
  3. UPDATE posts:rt_tags SET tags=(101, 302, 304) WHERE MATCH ('use') AND id IN (1,101,201)
  4. DELETE FROM clicks:rt WHERE MATCH ('dumy') AND gid>206
  1. POST /insert -d '
  2. {
  3. "cluster":"posts",
  4. "index":"weekly_index",
  5. "doc":
  6. {
  7. "title" : "iphone case",
  8. "price" : 19.85
  9. }
  10. }'
  11. POST /delete -d '
  12. {
  13. "cluster":"posts",
  14. "index": "weekly_index",
  15. "id":1
  16. }'
  1. $index->addDocuments([
  2. 1, ['title' => 'iphone case', 'price' => 19.85]
  3. ]);
  4. $index->deleteDocument(1);
  1. indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}})
  2. indexApi.delete({"cluster":"posts","index":"weekly_index","id":1})
  1. res = await indexApi.insert({"cluster":"posts","index":"weekly_index","doc":{"title":"iphone case","price":19.85}});
  2. res = await indexApi.delete({"cluster":"posts","index":"weekly_index","id":1});
  1. InsertDocumentRequest newdoc = new InsertDocumentRequest();
  2. HashMap<String,Object> doc = new HashMap<String,Object>(){{
  3. put("title","Crossbody Bag with Tassel");
  4. put("price",19.85);
  5. }};
  6. newdoc.index("weekly_index").cluster("posts").id(1L).setDoc(doc);
  7. sqlresult = indexApi.insert(newdoc);
  8. DeleteDocumentRequest deleteRequest = new DeleteDocumentRequest();
  9. deleteRequest.index("weekly_index").cluster("posts").setId(1L);
  10. indexApi.delete(deleteRequest);

Read statements

Read statements such as SELECT, CALL PQ, DESCRIBE can use either regular table names not prepended with a cluster name, or cluster_name:index_name. In the latter case the cluster_name is simply ignored.

In the HTTP endpoint json/search you can specify the cluster property if you like, or you can omit it.

  • SQL
  • JSON


  1. SELECT * FROM weekly_index
  2. CALL PQ('posts:weekly_index', 'document is here')
  1. POST /search -d '
  2. {
  3. "cluster":"posts",
  4. "index":"weekly_index",
  5. "query":{"match":{"title":"keyword"}}
  6. }'
  7. POST /search -d '
  8. {
  9. "index":"weekly_index",
  10. "query":{"match":{"title":"keyword"}}
  11. }'

Cluster parameters

Replication plugin options can be changed using the SET statement (see the example).

See Galera Documentation Parameters for a list of available options.

  • SQL
  • JSON


  1. SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
  1. POST /cli -d "
  2. SET CLUSTER click_query GLOBAL 'pc.bootstrap' = 1
  3. "

Cluster with diverged nodes

Sometimes replicated nodes can diverge from each other. The state of all the nodes might turn into non-primary due to a network split between nodes, a cluster crash, or if the replication plugin hits an exception when determining the primary component. In that case it’s necessary to select a node and promote it to the primary component.

To determine which node should be the reference, compare the last_committed cluster status variable value on all nodes. If all the servers are already running, there’s no need to start the cluster again; you just need to promote the most advanced node to the primary component with the SET statement (see the example).

All the other nodes will reconnect to that node and resync their data based on it.
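
As a sketch, the last_committed value can be checked on each node like this (the cluster name posts is an assumption, and the exact status variable naming may vary between versions):

  1. SHOW STATUS LIKE 'cluster_posts_last_committed'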

  • SQL
  • JSON


  1. SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
  1. POST /cli -d "
  2. SET CLUSTER posts GLOBAL 'pc.bootstrap' = 1
  3. "

Replication and cluster

To use replication, define one listen port for the SphinxAPI protocol and one listen directive for the replication address and port range in the config. Also define a data_dir folder for incoming tables.

  • ini


  1. searchd {
  2. listen = 9312
  3. listen = 192.168.1.101:9360-9370:replication
  4. data_dir = /var/lib/manticore/
  5. ...
  6. }

Create a cluster on the server that has the local tables that need to be replicated:

  • SQL
  • JSON
  • PHP
  • Python
  • Javascript
  • Java


  1. CREATE CLUSTER posts
  1. POST /cli -d "
  2. CREATE CLUSTER posts
  3. "
  1. $params = [
  2. 'cluster' => 'posts'
  3. ];
  4. $response = $client->cluster()->create($params);
  1. utilsApi.sql('CREATE CLUSTER posts')
  1. res = await utilsApi.sql('CREATE CLUSTER posts');
  1. utilsApi.sql("CREATE CLUSTER posts");

Add these local tables to the cluster:

  • SQL
  • JSON
  • PHP
  • Python
  • Javascript
  • Java


  1. ALTER CLUSTER posts ADD pq_title
  2. ALTER CLUSTER posts ADD pq_clicks
  1. POST /cli -d "
  2. ALTER CLUSTER posts ADD pq_title
  3. "
  4. POST /cli -d "
  5. ALTER CLUSTER posts ADD pq_clicks
  6. "
  1. $params = [
  2. 'cluster' => 'posts',
  3. 'body' => [
  4. 'operation' => 'add',
  5. 'index' => 'pq_title'
  6. ]
  7. ];
  8. $response = $client->cluster()->alter($params);
  9. $params = [
  10. 'cluster' => 'posts',
  11. 'body' => [
  12. 'operation' => 'add',
  13. 'index' => 'pq_clicks'
  14. ]
  15. ];
  16. $response = $client->cluster()->alter($params);
  1. utilsApi.sql('ALTER CLUSTER posts ADD pq_title')
  2. utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks')
  1. res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_title');
  2. res = await utilsApi.sql('ALTER CLUSTER posts ADD pq_clicks');
  1. utilsApi.sql("ALTER CLUSTER posts ADD pq_title");
  2. utilsApi.sql("ALTER CLUSTER posts ADD pq_clicks");

All other nodes that want a replica of the cluster’s tables should join the cluster as follows:

  • SQL
  • JSON
  • PHP
  • Python
  • Javascript
  • Java


  1. JOIN CLUSTER posts AT '192.168.1.101:9312'
  1. POST /cli -d "
  2. JOIN CLUSTER posts AT '192.168.1.101:9312'
  3. "
  1. $params = [
  2. 'cluster' => 'posts',
  3. 'body' => [
  4. '192.168.1.101:9312'
  5. ]
  6. ];
  7. $response = $client->cluster()->join($params);
  1. utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'')
  1. res = await utilsApi.sql('JOIN CLUSTER posts AT \'192.168.1.101:9312\'');
  1. utilsApi.sql("JOIN CLUSTER posts AT '192.168.1.101:9312'");

When running SQL queries, prepend the table name with the cluster name (posts:), or use the cluster property of the HTTP request object.

  • SQL
  • JSON
  • PHP
  • Python
  • Javascript
  • Java


  1. INSERT INTO posts:pq_title VALUES ( 3, 'test me' )
  1. POST /insert -d '
  2. {
  3. "cluster":"posts",
  4. "index":"pq_title",
  5. "id": 3
  6. "doc":
  7. {
  8. "title" : "test me"
  9. }
  10. }'
  1. $index->addDocuments([
  2. 3, ['title' => 'test me']
  3. ]);
  1. indexApi.insert({"cluster":"posts","index":"pq_title","id":3"doc":{"title":"test me"}})
  1. res = await indexApi.insert({"cluster":"posts","index":"pq_title","id":3,"doc":{"title":"test me"}});
  1. InsertDocumentRequest newdoc = new InsertDocumentRequest();
  2. HashMap<String,Object> doc = new HashMap<String,Object>(){{
  3. put("title","test me");
  4. }};
  5. newdoc.index("pq_title").cluster("posts").id(3L).setDoc(doc);
  6. sqlresult = indexApi.insert(newdoc);

Now all queries that modify tables in the cluster are replicated to all nodes in the cluster.
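
Read queries on any node keep using the plain table name. As a quick sketch for verifying that the replicated table is visible on a joined node (assuming the pq_title table from the examples above):

  1. SELECT * FROM pq_title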