VReplication

VReplication is a core component of Vitess that can be used to compose many features. It supports the following use cases:

  • Resharding: Legacy workflows of vertical and horizontal resharding. New workflows of resharding from an unsharded to a sharded keyspace and vice-versa. Resharding from a sharded keyspace to another sharded keyspace that uses a different vindex than the source keyspace.
  • Materialized Views: You can specify a materialization rule that creates a view of the source table in a target keyspace. This materialization can use a different primary vindex than the source. It can also materialize a subset of the source columns, or add new expressions derived from the source. This view will be kept up-to-date in real time. You can also materialize reference tables onto all shards and have Vitess perform efficient local joins with those materialized tables.
  • Realtime rollups: The materialization expression can include aggregation expressions, in which case Vitess will create a rolled-up version of the source table which can be used for realtime analytics.
  • Backfilling lookup vindexes: VReplication can be used to backfill a newly created lookup vindex. Workflows can be built to manage the switch from backfill mode to the vindex itself keeping it up-to-date.
  • Schema deployment: We can use VReplication to recreate the workflow performed by gh-ost and thereby support zero-downtime schema deployments in Vitess natively.
  • Data migration: VReplication can be set up to migrate data from an existing system into Vitess. The replication can also be reversed after a cutover, giving you the option to roll back a migration if something goes wrong.
  • Change notification: The streamer component of VReplication can be used by the application or a systems operator to subscribe to change notifications and use them to keep downstream systems up-to-date with the source.

VReplication itself is a fairly low-level feature that is expected to be used as a building block for the above use cases. However, it’s still possible to issue commands directly to perform some of these activities.

Feature description

VReplication works as a stream or combination of streams. Each stream establishes a replication from a source keyspace/shard into a target keyspace/shard.

A given stream can replicate multiple tables. For each table, you can specify a select statement that represents both the transformation rule and the filtering rule. The select expressions specify the transformation, and the where clause specifies the filtering.

The select expressions can be any non-aggregate MySQL expression, or they can also be count or sum as aggregate expressions. Aggregate expressions combined with the corresponding group by clauses will allow you to materialize real-time rollups of the source table, which can be used for analytics. The target table can have a different name from the source.

For a sharded system like Vitess, multiple VReplication streams may be needed to achieve the necessary goals. This is because there will be multiple source shards as well as destination shards, and the relationship between them may not be one to one.

VReplication performs the following essential functions:

  • Copy data from the source to the destination table in a consistent fashion. For large data, this copy can be long-running. It can be interrupted and resumed. If interrupted, VReplication can keep the copied portion up-to-date with respect to the source, and it can resume the copy process at a point that’s consistent with the current replication position.
  • After copying is finished, it can continuously replicate the data from the source to destination.
  • The copying rule can be expressed as a select statement. The statement should be simple enough that the materialized table can be kept up-to-date from the data coming from the binlog. For example, joins are not supported.
  • Correctness verification (to be implemented): VReplication can verify that the target table is an exact representation of the select statement from the source by capturing consistent snapshots of the source and target and comparing them against each other. This step can be done without the need to create special snapshot replicas.
  • Journaling (to be implemented): If there is any kind of traffic cut-over where we start writing to a different table than we used to before, VReplication will save the current binlog positions into a journal table. This can be used by other streams to resume replication from the new source.
  • Routing rules: Although this feature is itself not a direct functionality of VReplication, it works hand in hand with it. It allows you to specify sophisticated rules about where to route queries depending on the type of workflow being performed. For example, it can be used to control the cut-over during resharding. In the case of materialized views, it can be used to establish equivalence of tables, which will allow VTGate to compute the most optimal plans given the available options.

VReplicationExec

The VReplicationExec command is used to manage vreplication streams. The commands are issued as SQL statements. For example, a select can be used to see the current list of streams, an insert can be used to create one, and so on. By design, the metadata for vreplication streams is stored in a vreplication table in the _vt database.

VReplication uses the ‘pull’ model. This means that a stream is created on the target side, and the target pulls the data by finding the appropriate source.

The table schema is as follows:

  CREATE TABLE _vt.vreplication (
    id INT AUTO_INCREMENT,
    workflow VARBINARY(1000),
    source VARBINARY(10000) NOT NULL,
    pos VARBINARY(10000) NOT NULL,
    stop_pos VARBINARY(10000) DEFAULT NULL,
    max_tps BIGINT(20) NOT NULL,
    max_replication_lag BIGINT(20) NOT NULL,
    cell VARBINARY(1000) DEFAULT NULL,
    tablet_types VARBINARY(100) DEFAULT NULL,
    time_updated BIGINT(20) NOT NULL,
    transaction_timestamp BIGINT(20) NOT NULL,
    state VARBINARY(100) NOT NULL,
    message VARBINARY(1000) DEFAULT NULL,
    db_name VARBINARY(255) NOT NULL,
    PRIMARY KEY (id)
  )

The fields are explained in the following section.

This is the syntax of the command:

  VReplicationExec [-json] <tablet alias> <sql command>

Here’s an example of the command to list all existing streams for a given tablet.

  lvtctl.sh VReplicationExec 'tablet-100' 'select * from _vt.vreplication'
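
The same listing can be issued from Python. This is a minimal sketch rather than a Vitess API: the helper names `vreplication_exec_cmd` and `run` are hypothetical, and it assumes `lvtctl.sh` is reachable under that name as in the example above. Passing the arguments as a list avoids one layer of shell quoting.

```python
import subprocess


def vreplication_exec_cmd(tablet_alias, sql, json_output=False, vtctl="lvtctl.sh"):
    """Build the argument list for a VReplicationExec call (hypothetical helper)."""
    cmd = [vtctl, "VReplicationExec"]
    if json_output:
        cmd.append("-json")  # optional flag from the syntax shown above
    cmd += [tablet_alias, sql]
    return cmd


def run(cmd):
    """Run the command and return its standard output as text."""
    return subprocess.run(cmd, check=True, capture_output=True, text=True).stdout


# Argument list equivalent to the shell example above.
list_streams = vreplication_exec_cmd("tablet-100", "select * from _vt.vreplication")
```

Calling `run(list_streams)` would execute the command; the sketch stops short of that so it can be read without a running cluster.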

Creating a stream

It’s generally easier to send the VReplication command programmatically than from a bash script. This is because of the number of nested encodings involved:

  • One of the arguments is an SQL statement, which can contain quoted strings as values.
  • One of the strings in the SQL statement is a string-encoded protobuf, which can contain quotes.
  • One of the parameters within the protobuf is an SQL select expression for the materialized view.

However, you can use vreplgen.go to generate a fully escaped bash command.

Alternatively, you can use a Python program. Here’s an example:

  # Argument list for the command; the last element is the SQL statement.
  cmd = [
      './lvtctl.sh',
      'VReplicationExec',
      'test-200',
      """insert into _vt.vreplication
      (db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values
      ('vt_keyspace', 'keyspace:"lookup" shard:"0" filter:<rules:<match:"uproduct" filter:"select * from product" > >', '', 99999, 99999, 'master', 0, 0, 'Running')""",
  ]

The first argument to the command is the master tablet id of the target keyspace/shard.

The second argument is the SQL command. To start a new stream, you need an insert statement. The parameters are as follows:

  • db_name: This name must match the name of the MySQL database. In the future, this will not be required, and will be automatically filled in by the vttablet.
  • source: The protobuf representation of the stream source, explained below.
  • pos: For a brand new stream, this should be empty. To start from a specific position, a flavor-encoded position must be specified. A typical position would look like this: MySQL56/ac6c45eb-71c2-11e9-92ea-0a580a1c1026:1-1296.
  • max_tps: 99999, reserved.
  • max_replication_lag: 99999, reserved.
  • tablet_types: specifies a comma-separated list of tablet types to replicate from. If empty, the default tablet type specified by the -vreplication_tablet_type command line flag is used.
  • time_updated: 0, reserved.
  • transaction_timestamp: 0, reserved.
  • state: ‘Running’ or ‘Stopped’.
  • cell: an optional parameter that specifies the cell from which the stream can be sourced.
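
The parameter list above can be turned into an insert statement with a small helper. This is a sketch under stated assumptions: `sql_quote` and `build_insert` are hypothetical names, the reserved columns are filled with the values listed above, and string values are quoted with backslash escaping as in the earlier Python example.

```python
def sql_quote(value):
    """Quote a string as a single-quoted SQL literal using backslash escapes."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def build_insert(db_name, source, pos="", tablet_types="master", state="Running"):
    """Build the insert statement for a new stream (hypothetical helper)."""
    columns = [
        "db_name", "source", "pos", "max_tps", "max_replication_lag",
        "tablet_types", "time_updated", "transaction_timestamp", "state",
    ]
    values = [
        sql_quote(db_name),
        sql_quote(source),
        sql_quote(pos),           # empty for a brand new stream
        "99999",                  # max_tps, reserved
        "99999",                  # max_replication_lag, reserved
        sql_quote(tablet_types),
        "0",                      # time_updated, reserved
        "0",                      # transaction_timestamp, reserved
        sql_quote(state),
    ]
    return "insert into _vt.vreplication ({}) values ({})".format(
        ", ".join(columns), ", ".join(values)
    )


# Source string taken from the example command above.
example_source = (
    'keyspace:"lookup" shard:"0" '
    'filter:<rules:<match:"uproduct" filter:"select * from product" > >'
)
insert_sql = build_insert("vt_keyspace", example_source)
```

The resulting `insert_sql` is what would be passed as the last argument of VReplicationExec.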

The source field

The source field is a proto-encoding of the following structure:

  message BinlogSource {
    // the source keyspace
    string keyspace = 1;
    // the source shard
    string shard = 2;
    // list of filtering rules
    Filter filter = 6;
    // what to do if a DDL is encountered
    OnDDLAction on_ddl = 7;
  }
  message Filter {
    repeated Rule rules = 1;
  }
  message Rule {
    // match can be a table name or a regular expression
    // delineated by '/' and '/'.
    string match = 1;
    // filter can be an empty string or keyrange if the match
    // is a regular expression. Otherwise, it must be a select
    // query.
    string filter = 2;
  }
  enum OnDDLAction {
    IGNORE = 0;
    STOP = 1;
    EXEC = 2;
    EXEC_IGNORE = 3;
  }

Here are some examples of proto encodings:

  keyspace:"lookup" shard:"0" filter:<rules:<match:"uproduct" filter:"select * from product" > >

Meaning: replicate all columns and rows of product from lookup/0.product into the uproduct table in the target keyspace.
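
A source string of this shape can be assembled from its parts. The function below is a hypothetical helper, not part of Vitess; it covers only keyspace, shard and rules (on_ddl is left out), and it assumes the values contain no double quotes since it does not escape them.

```python
def binlog_source(keyspace, shard, rules):
    """Render the text form of a BinlogSource (hypothetical helper).

    rules is a list of (match, filter) pairs. Values must not contain
    double quotes; this sketch does not escape them.
    """
    rendered = "".join(
        'rules:<match:"{}" filter:"{}" > '.format(match, flt)
        for match, flt in rules
    )
    return 'keyspace:"{}" shard:"{}" filter:<{}>'.format(keyspace, shard, rendered)


# Reproduces the first example encoding.
lookup_source = binlog_source("lookup", "0", [("uproduct", "select * from product")])
```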

  keyspace:"user" shard:"-80" filter:<rules:<match:"morder" filter:"select * from uorder where in_keyrange(mname, \\'unicode_loose_md5\\', \\'-80\\')" > >

Python first reduces each double backslash inside the select to a single backslash, so the expression internally becomes \'unicode_loose_md5\'. Since the entire source is surrounded by single quotes when it is sent as a value inside the outer insert statement, each remaining \ escapes the single quote that follows it. The final value in the source will therefore be:

  keyspace:"user" shard:"-80" filter:<rules:<match:"morder" filter:"select * from uorder where in_keyrange(mname, 'unicode_loose_md5', '-80')" > >

Meaning: replicate all columns of user/-80.uorder where unicode_loose_md5(mname) is within the -80 keyrange, into morder.
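
The two unescaping steps can be traced in Python. This is an illustrative sketch: `sql_unescape` is a hypothetical, simplified stand-in for how a single-quoted SQL literal is read (it only drops the backslash in front of an escaped character and does not handle sequences such as \n).

```python
# As written in the Python script: each \\ is one backslash in the actual string.
filter_in_script = (
    "select * from uorder where "
    "in_keyrange(mname, \\'unicode_loose_md5\\', \\'-80\\')"
)


def sql_unescape(body):
    """Simplified reading of the body of a single-quoted SQL literal."""
    out = []
    chars = iter(body)
    for ch in chars:
        if ch == "\\":
            out.append(next(chars, ""))  # keep the escaped character, drop the backslash
        else:
            out.append(ch)
    return "".join(out)


# What ends up stored in the source field.
stored_filter = sql_unescape(filter_in_script)
```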

This particular stream generally wouldn’t make sense in isolation. This would typically be one of four streams that combine together to create a materialized view of uorder from the user keyspace into the target (merchant) keyspace, but sharded using mname as the primary vindex. The vindex used would be unicode_loose_md5, which should also match the primary vindex of other tables in the target keyspace.
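
One way to picture the four streams is as every pairing of a source shard with a target shard. The sketch below assumes, purely for illustration, that both the user and merchant keyspaces have the two shards -80 and 80-; the function name `stream_plan` is hypothetical.

```python
from itertools import product

SOURCE_SHARDS = ["-80", "80-"]  # assumed shards of the user keyspace
TARGET_SHARDS = ["-80", "80-"]  # assumed shards of the merchant keyspace


def stream_plan(source_shards, target_shards):
    """Return one (target_shard, source_shard, filter) triple per stream."""
    plan = []
    for target, source in product(target_shards, source_shards):
        # Each target shard keeps only the rows whose mname hashes into its own keyrange.
        flt = (
            "select * from uorder "
            "where in_keyrange(mname, 'unicode_loose_md5', '{}')".format(target)
        )
        plan.append((target, source, flt))
    return plan


plan = stream_plan(SOURCE_SHARDS, TARGET_SHARDS)
```

Under these assumptions, the stream shown above corresponds to the entry with target shard -80 and source shard -80.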

  keyspace:"user" shard:"-80" filter:<rules:<match:"sales" filter:"select pid, count(*) as kount, sum(price) as amount from uorder group by pid" > >

Meaning: create a materialized view of user/-80.uorder into sales of the target keyspace using the expression: select pid, count(*) as kount, sum(price) as amount from uorder group by pid.

This represents only one stream, from source shard -80. Presumably, there will be one more for the other shard, 80-.

The ‘select’ features

The select statement has the following features (and restrictions):

  • The select expressions can be any deterministic MySQL expression. Subqueries are not supported. Among aggregate expressions, only count(*) and sum(col) are supported.
  • The where clause can only contain the in_keyrange construct. It has two forms:
    • in_keyrange('-80'): The row’s source keyrange is matched against -80.
    • in_keyrange(col, 'hash', '-80'): The keyrange is computed using hash(col) and matched against -80.
  • group by: can be specified if using aggregations. The group by expressions are expected to cover the non-aggregated columns, just like regular SQL requires.
  • No other constructs like order by, limit, joins, etc. are allowed.
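
The keyrange match used by in_keyrange can be sketched as a byte comparison. The function below is hypothetical and takes an already computed keyspace id; it does not implement the vindex hashing step. It assumes the usual convention that a range such as -80 or 80- has an inclusive start, an exclusive end, and is unbounded on an empty side.

```python
def keyspace_id_in_range(keyspace_id, keyrange):
    """Return True if keyspace_id (bytes) falls in a range such as '-80' or '80-'.

    The start is inclusive and the end is exclusive; an empty side is unbounded.
    """
    start_hex, end_hex = keyrange.split("-")
    start = bytes.fromhex(start_hex)
    end = bytes.fromhex(end_hex)
    if keyspace_id < start:
        return False
    return not end or keyspace_id < end
```

For example, a keyspace id of 0x40 falls in -80, while 0x80 falls in 80-.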

The pos field

To start a brand new vreplication stream, the pos field must be empty. The empty string signifies that there’s no starting point for the vreplication. This causes VReplication to copy the contents of the source table first, and then start the replication.

For large tables, this is done in chunks. After each chunk is copied, replication is resumed until it’s caught up. VReplication ensures that only changes that affect existing rows are applied. Following this, another chunk is copied, and so on, until all tables are completed. After that, replication runs indefinitely.
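
The alternation between copying and catching up can be illustrated with a toy simulation. This is a simplified model, not the VReplication implementation: tables are dictionaries keyed by primary key, each event is a hypothetical (pk, value) pair where a value of None means delete, and one batch of events is assumed to occur on the source between chunks.

```python
def apply_event(table, event):
    """Apply an upsert or delete event to a dict-based table."""
    pk, value = event
    if value is None:
        table.pop(pk, None)
    else:
        table[pk] = value


def copy_with_catchup(source, event_batches, chunk_size):
    """Copy source into a new target in chunks, catching up between chunks."""
    target = {}
    last_pk = None
    batches = iter(event_batches)
    while True:
        # Changes that happen on the source while the copy is in progress.
        for event in next(batches, []):
            apply_event(source, event)
            # Catch-up: replay only changes to rows that were already copied.
            if last_pk is not None and event[0] <= last_pk:
                apply_event(target, event)
        # Copy the next chunk in primary key order.
        remaining = sorted(pk for pk in source if last_pk is None or pk > last_pk)
        chunk = remaining[:chunk_size]
        if not chunk:
            break
        for pk in chunk:
            target[pk] = source[pk]
        last_pk = chunk[-1]
    # Copy finished: from here on every change is replicated.
    for batch in batches:
        for event in batch:
            apply_event(source, event)
            apply_event(target, event)
    return target


src = {1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}
changes = [[], [(1, "a2"), (4, "d2")], [(2, None), (6, "f")], [(7, "g")]]
copied = copy_with_catchup(src, changes, chunk_size=2)
```

Changes to rows beyond the last copied primary key are skipped during catch-up because the next chunk reads them in their current state; in this model the target ends up equal to the source.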

It’s a shared row

The vreplication row is shared between the operator and VReplication itself. Once the row is created, the VReplication stream updates various fields of the row to save and report on its own status. For example, the pos field is continuously updated as the stream makes forward progress.

While copying, the state field is updated as Init or Copying.

Updating a stream

You can change any field of the stream by issuing a VReplicationExec with an update statement. You are required to specify the id of the row you intend to update. You can only update one row at a time.

Typically, you can update the row and change the state to Stopped to stop a stream, or to Running to restart a stopped stream.

You can also update the row to set a stop_pos, which will make the replication stop once it reaches the specified position.
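
The update statements described above could be generated as follows. This is a sketch with a hypothetical helper name, `update_stream_sql`; the stream id 1 is made up for illustration, and the position reuses the sample value from the pos parameter description.

```python
def update_stream_sql(stream_id, **fields):
    """Build an update for one stream row, identified by id (hypothetical helper)."""
    if not fields:
        raise ValueError("nothing to update")
    assignments = ", ".join(
        "{} = '{}'".format(name, str(value).replace("'", "\\'"))
        for name, value in fields.items()
    )
    return "update _vt.vreplication set {} where id = {}".format(
        assignments, int(stream_id)
    )


stop_sql = update_stream_sql(1, state="Stopped")
restart_sql = update_stream_sql(1, state="Running")
stop_at_sql = update_stream_sql(
    1, stop_pos="MySQL56/ac6c45eb-71c2-11e9-92ea-0a580a1c1026:1-1296"
)
```

Each statement targets exactly one row through its id, in line with the one-row-at-a-time rule.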

Deleting a stream

You can delete a stream by issuing a delete statement. This will stop the replication and delete the row. This statement is destructive. All data about the replication state will be permanently deleted.

Other properties of VReplication

Fast replay

VReplication has the capability to batch transactions if the send rate of the source exceeds the replay rate of the destination. This allows it to catch up very quickly when there is a backlog. Load tests have shown a 3-20X improvement over traditional MySQL replication depending on the workload.

Accurate lag tracking

The source vttablet sends its current time along with every event. This allows the target to correct for clock skew while estimating replication lag. Additionally, the source starts sending heartbeats if there is nothing to send. If the target receives no events from the source at all, it knows that it’s definitely lagged and starts reporting itself accordingly.
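
The skew correction can be expressed as simple arithmetic. The function below is an illustrative sketch, not the actual implementation: the parameter names are assumptions, all times are in seconds, and the offset it computes also absorbs network delay.

```python
def estimate_lag(event_timestamp, source_current_time, receive_time, apply_time):
    """Estimate replication lag in seconds, correcting for clock skew.

    event_timestamp:     when the change was committed, by the source clock
    source_current_time: the source clock when it sent the event
    receive_time:        the target clock when it received the event
    apply_time:          the target clock when it applied the event
    """
    # Difference between the two clocks, as observed when the event arrived.
    clock_offset = receive_time - source_current_time
    # Translate the apply time onto the source clock before comparing.
    return max(0, apply_time - clock_offset - event_timestamp)


# Target clock runs 30 seconds ahead of the source clock.
lag = estimate_lag(
    event_timestamp=990, source_current_time=1000, receive_time=1030, apply_time=1032
)
```

Without the correction, the same numbers would suggest a lag of 42 seconds instead of 12.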

Self-replication

VReplication allows you to set the source keyspace/shard to be the same as the target. This is especially useful for performing schema rollouts: you can create the target table with the intended schema and vreplicate from the source table to the new target. Once caught up, you can cut over to write to the target table.

In this situation, an apply on the target generates a binlog event that will be picked up by the source and sent to the target. Typically, it will be an empty transaction. In such cases, the target does not generally apply these transactions, because such an application will generate yet another event. However, there are situations where one needs to apply empty transactions, especially if it’s a required stopping point. VReplication can differentiate between these situations and apply events only as needed.

Deadlocks and lock wait timeouts

It is possible that multiple streams can conflict with each other and cause deadlocks or lock waits. When such things happen, VReplication silently retries such transactions without reporting an error. It does increment a counter so that the frequency of such occurrences can be tracked.

Automatic retries

If any other error is encountered, the replication is retried after a short wait. Each time, the stream searches from the full list of available sources and picks one at random.
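
The retry behavior can be sketched as a loop that re-picks a source on every attempt. Everything here is hypothetical: `replicate` stands for a caller-supplied function that raises on error, and the attempt limit exists only to keep the sketch finite.

```python
import random
import time


def run_with_retries(sources, replicate, max_attempts=5, wait_seconds=0.0, rng=random):
    """Retry replicate(source) with a freshly chosen random source each time.

    Returns the result of the first successful call together with the list
    of (source, error message) pairs from the failed attempts.
    """
    errors = []
    for _ in range(max_attempts):
        source = rng.choice(sources)  # pick from the full list on every attempt
        try:
            return replicate(source), errors
        except Exception as err:
            errors.append((source, str(err)))
            time.sleep(wait_seconds)  # short wait before retrying
    raise RuntimeError(
        "giving up after {} attempts: {}".format(max_attempts, errors)
    )
```

Choosing at random from the full list means a single unhealthy source does not permanently block the stream.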

on_ddl

The source specification allows you to specify a value for on_ddl. This allows you to specify what to do with DDL SQL statements when they are encountered in the replication stream from the source. The values can be as follows:

  • IGNORE: Ignore all DDLs (this is also the default, if a value for on_ddl is not provided).
  • STOP: Stop when DDL is encountered. This allows you to make any necessary changes to the target. Once changes are made, updating the state to Running will cause VReplication to continue from just after the point where it encountered the DDL.
  • EXEC: Apply the DDL, but stop if an error is encountered while applying it.
  • EXEC_IGNORE: Apply the DDL, but ignore any errors and continue replicating.
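
The four behaviors can be summarized as a small decision function. This is a sketch of the rules as described above, not the actual implementation: `handle_ddl` is a hypothetical name, and `execute` stands for a caller-supplied function that applies the DDL to the target and raises on failure.

```python
from enum import Enum


class OnDDLAction(Enum):
    IGNORE = 0
    STOP = 1
    EXEC = 2
    EXEC_IGNORE = 3


def handle_ddl(action, ddl, execute):
    """Return the stream state after a DDL event: 'Running' or 'Stopped'."""
    if action is OnDDLAction.IGNORE:
        return "Running"          # skip the DDL and keep replicating
    if action is OnDDLAction.STOP:
        return "Stopped"          # wait for the operator to set the state to Running
    try:
        execute(ddl)
    except Exception:
        if action is OnDDLAction.EXEC:
            return "Stopped"      # EXEC stops on error
        # EXEC_IGNORE swallows the error and carries on
    return "Running"
```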

Failover continuation

If a failover is performed on the target keyspace/shard, the new master will automatically resume VReplication from where the previous master left off.

Monitoring and troubleshooting

VTTablet /debug/status

The first place to look is the /debug/status page of the target master vttablet. The bottom of the page shows the status of all the VReplication streams.

Typically, if there is a problem, the Last Message column will display the error. Sometimes, it’s possible that the stream cannot find a source. If so, the Source Tablet would be empty.

VTTablet logfile

If the errors are not clear or if they keep disappearing, the VTTablet logfile will contain information about what it’s been doing with each stream.

VReplicationExec select

The current status of the streams can also be fetched by issuing a VReplicationExec command with select * from _vt.vreplication.

Monitoring variables

VReplication also reports the following variables that can be scraped by monitoring tools like Prometheus:

  • VReplicationStreamCount: Number of VReplication streams.
  • VReplicationSecondsBehindMasterMax: Max vreplication seconds behind master.
  • VReplicationSecondsBehindMaster: vreplication seconds behind master per stream.
  • VReplicationSource: The source for each VReplication stream.
  • VReplicationSourceTablet: The source tablet for each VReplication stream.

Thresholds and alerts can be set to draw attention to potential problems.
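
A threshold check over these variables might look like the sketch below. The function name, the default threshold, and the shape of the input (a dictionary of already scraped values, with VReplicationSourceTablet as a mapping from stream to tablet) are all assumptions made for illustration.

```python
def check_vreplication(metrics, max_lag_seconds=60, expected_streams=None):
    """Return a list of alert messages for a dict of scraped variables."""
    alerts = []
    lag = metrics.get("VReplicationSecondsBehindMasterMax", 0)
    if lag > max_lag_seconds:
        alerts.append("max lag {}s exceeds {}s".format(lag, max_lag_seconds))
    count = metrics.get("VReplicationStreamCount", 0)
    if expected_streams is not None and count != expected_streams:
        alerts.append("expected {} streams, found {}".format(expected_streams, count))
    # An empty source tablet suggests the stream could not find a source.
    for stream, tablet in metrics.get("VReplicationSourceTablet", {}).items():
        if not tablet:
            alerts.append("stream {} has no source tablet".format(stream))
    return alerts
```

An empty result means none of the assumed thresholds were crossed.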