Data Replication

Replication allows you to move data between streams in either a 1:1 mirror style or by multiplexing multiple source streams into a new stream. In future builds this will allow data to be replicated between accounts as well, ideal for sending data from a Leafnode into a central store.

Data Replication - Figure 1

Here we have two main streams, ORDERS and RETURNS, which are clustered across 3 nodes. These streams have short retention periods and are memory based.

We create an ARCHIVE stream that has 2 sources set; the ARCHIVE will pull data from the sources into itself. This stream has a very long retention period, is file based, and is replicated across 3 nodes. Additional messages can be added to the ARCHIVE by sending to it directly.

Finally, we create a REPORT stream mirrored from ARCHIVE that is not clustered and retains data for a month. The REPORT stream does not listen for any incoming messages; it can only consume data from ARCHIVE.

Mirrors

A mirror copies data from one other stream. As far as possible, IDs and ordering will exactly match the source. A mirror does not listen on a subject for any data to be added. The Start Sequence and Start Time can be set, but not a subject filter. A stream can have only one mirror, and if it is a mirror it cannot also have any sources.
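
The mirror rules above can also be expressed as a JSON stream configuration instead of answering CLI prompts. The following is a minimal sketch, not a definitive configuration: the file name report.json and the 30-day max_age (given in nanoseconds) are assumptions chosen to match the REPORT stream described earlier, and the field names should be checked against the stream configuration schema of your server version.

```shell
# Sketch: write a JSON configuration for a mirror stream.
# Assumptions: file name report.json; max_age of 30 days, in nanoseconds.
cat > report.json <<'EOF'
{
  "name": "REPORT",
  "storage": "file",
  "retention": "limits",
  "discard": "old",
  "num_replicas": 1,
  "max_msgs": -1,
  "max_bytes": -1,
  "max_age": 2592000000000000,
  "mirror": {
    "name": "ARCHIVE"
  }
}
EOF

# The file deliberately has no listening subjects and no sources list,
# because a mirror can have neither.
# To create the stream from this file (needs a running JetStream server):
#   nats stream add REPORT --config report.json
```

A start position for the mirror would go inside the "mirror" object, next to "name".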

Sources

A source is a stream that data is copied from. One stream can have multiple sources and will read data in from all of them. The stream will also listen for messages on its own subject. We therefore cannot maintain absolute ordering: data from any single source will be in the correct order, but it will be interleaved with data from the other sources. As a result, you might also find older and newer timestamps mixed together in the stream.

A stream with sources may also listen on subjects, but it does not have to. When using the nats CLI to create sourced streams, use --subjects to supply the subjects to listen on.

A source can have Start Time or Start Sequence and can filter by a subject.
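
To make the source options concrete, here is a hedged sketch of a JSON stream configuration with two sources. The file name archive.json, the ARCHIVE.> listening subject, the start sequence of 50 and the ORDERS.new filter are all hypothetical values picked for illustration; the field names follow the JetStream stream configuration schema as we understand it and should be verified against your server version.

```shell
# Sketch: write a JSON configuration for a stream with two sources.
# Assumptions: file name archive.json; the ARCHIVE.> subject, start sequence 50
# and the ORDERS.new filter are illustrative values only.
cat > archive.json <<'EOF'
{
  "name": "ARCHIVE",
  "subjects": ["ARCHIVE.>"],
  "storage": "file",
  "retention": "limits",
  "discard": "old",
  "num_replicas": 3,
  "max_msgs": -1,
  "max_bytes": -1,
  "max_age": 0,
  "sources": [
    {
      "name": "ORDERS",
      "opt_start_seq": 50,
      "filter_subject": "ORDERS.new"
    },
    {
      "name": "RETURNS"
    }
  ]
}
EOF

# To create the stream from this file (needs a running JetStream server):
#   nats stream add ARCHIVE --config archive.json
```

Each entry in "sources" carries its own start position and filter, so ORDERS and RETURNS can be configured independently.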

Configuration

The ORDERS and RETURNS streams are created as normal, so creating them is not shown here.

  nats s report

  Obtaining Stream stats

  +---------+---------+-----------+----------+-------+------+---------+----------------------+
  | Stream  | Storage | Consumers | Messages | Bytes | Lost | Deleted | Cluster              |
  +---------+---------+-----------+----------+-------+------+---------+----------------------+
  | ORDERS  | Memory  | 0         | 0        | 0 B   | 0    | 0       | n1-c2, n2-c2*, n3-c2 |
  | RETURNS | Memory  | 0         | 0        | 0 B   | 0    | 0       | n1-c2*, n2-c2, n3-c2 |
  +---------+---------+-----------+----------+-------+------+---------+----------------------+

We now add the ARCHIVE:

  nats s add ARCHIVE --source ORDERS --source RETURNS

  ? Storage backend file
  ? Retention Policy Limits
  ? Discard Policy Old
  ? Stream Messages Limit -1
  ? Message size limit -1
  ? Maximum message age limit -1
  ? Maximum individual message size -1
  ? Duplicate tracking time window 2m0s
  ? Allow message Roll-ups No
  ? Allow message deletion Yes
  ? Allow purging subjects or the entire stream Yes
  ? Replicas 1
  ? Adjust source "ORDERS" start Yes
  ? ORDERS Source Start Sequence 0
  ? ORDERS Source UTC Time Stamp (YYYY:MM:DD HH:MM:SS)
  ? ORDERS Source Filter source by subject
  ? Import "ORDERS" from a different JetStream domain No
  ? Import "ORDERS" from a different account No
  ? Adjust source "RETURNS" start No
  ? Import "RETURNS" from a different JetStream domain No
  ? Import "RETURNS" from a different account No
  Stream ARCHIVE was created

  Information for Stream ARCHIVE created 2022-01-21T11:49:52-08:00

  Configuration:

    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Allows Msg Delete: true
    Allows Purge: true
    Allows Rollups: false
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: unlimited
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited
    Sources: ORDERS
             RETURNS

  State:

    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0

And we add the REPORT:

  nats s add REPORT --mirror ARCHIVE

  ? Storage backend file
  ? Retention Policy Limits
  ? Discard Policy Old
  ? Stream Messages Limit -1
  ? Message size limit -1
  ? Maximum message age limit -1
  ? Maximum individual message size -1
  ? Allow message Roll-ups No
  ? Allow message deletion Yes
  ? Allow purging subjects or the entire stream Yes
  ? Replicas 1
  ? Adjust mirror start No
  ? Import mirror from a different JetStream domain No
  ? Import mirror from a different account No
  Stream REPORT was created

  Information for Stream REPORT created 2022-01-21T11:50:55-08:00

  Configuration:

    Acknowledgements: true
    Retention: File - Limits
    Replicas: 1
    Discard Policy: Old
    Duplicate Window: 2m0s
    Allows Msg Delete: true
    Allows Purge: true
    Allows Rollups: false
    Maximum Messages: unlimited
    Maximum Bytes: unlimited
    Maximum Age: unlimited
    Maximum Message Size: unlimited
    Maximum Consumers: unlimited
    Mirror: ARCHIVE

  State:

    Messages: 0
    Bytes: 0 B
    FirstSeq: 0
    LastSeq: 0
    Active Consumers: 0

When configured we’ll see some additional information in a nats stream info output:

  nats stream info ARCHIVE

Output extract:

  ...
  Source Information:

    Stream Name: ORDERS
    Lag: 0
    Last Seen: 2m23s

    Stream Name: RETURNS
    Lag: 0
    Last Seen: 2m15s
  ...

  $ nats stream info REPORT
  ...
  Mirror Information:

    Stream Name: ARCHIVE
    Lag: 0
    Last Seen: 2m35s
  ...

Here, Lag is how far behind the upstream stream we were reported as being the last time we saw a message.

We can confirm all our setup using a nats stream report:

  nats s report

  +--------------------------------------------------------------------------------------------------------+
  |                                             Stream Report                                              |
  +---------+---------+-------------+-----------+----------+-------+------+---------+----------------------+
  | Stream  | Storage | Replication | Consumers | Messages | Bytes | Lost | Deleted | Cluster              |
  +---------+---------+-------------+-----------+----------+-------+------+---------+----------------------+
  | ARCHIVE | File    | Sourced     | 1         | 0        | 0 B   | 0    | 0       | n1-c2*, n2-c2, n3-c2 |
  | ORDERS  | Memory  |             | 1         | 0        | 0 B   | 0    | 0       | n1-c2, n2-c2*, n3-c2 |
  | REPORT  | File    | Mirror      | 0         | 0        | 0 B   | 0    | 0       | n1-c2*               |
  | RETURNS | Memory  |             | 1         | 0        | 0 B   | 0    | 0       | n1-c2, n2-c2, n3-c2* |
  +---------+---------+-------------+-----------+----------+-------+------+---------+----------------------+

  +---------------------------------------------------------+
  |                   Replication Report                    |
  +---------+--------+---------------+--------+-----+-------+
  | Stream  | Kind   | Source Stream | Active | Lag | Error |
  +---------+--------+---------------+--------+-----+-------+
  | ARCHIVE | Source | ORDERS        | never  | 0   |       |
  | ARCHIVE | Source | RETURNS       | never  | 0   |       |
  | REPORT  | Mirror | ARCHIVE       | never  | 0   |       |
  +---------+--------+---------------+--------+-----+-------+

We then create some data in both ORDERS and RETURNS:

  nats req ORDERS.new "ORDER {{Count}}" --count 100
  nats req RETURNS.new "RETURN {{Count}}" --count 100

We can now see from a Stream Report that the data has been replicated:

  nats s report --dot replication.dot

  Obtaining Stream stats

  +---------+---------+-----------+----------+---------+------+---------+----------------------+
  | Stream  | Storage | Consumers | Messages | Bytes   | Lost | Deleted | Cluster              |
  +---------+---------+-----------+----------+---------+------+---------+----------------------+
  | ORDERS  | Memory  | 1         | 100      | 3.3 KiB | 0    | 0       | n1-c2, n2-c2*, n3-c2 |
  | RETURNS | Memory  | 1         | 100      | 3.5 KiB | 0    | 0       | n1-c2*, n2-c2, n3-c2 |
  | ARCHIVE | File    | 1         | 200      | 27 KiB  | 0    | 0       | n1-c2, n2-c2, n3-c2* |
  | REPORT  | File    | 0         | 200      | 27 KiB  | 0    | 0       | n1-c2*               |
  +---------+---------+-----------+----------+---------+------+---------+----------------------+

  +---------------------------------------------------------+
  |                   Replication Report                    |
  +---------+--------+---------------+--------+-----+-------+
  | Stream  | Kind   | Source Stream | Active | Lag | Error |
  +---------+--------+---------------+--------+-----+-------+
  | ARCHIVE | Source | ORDERS        | 14.48s | 0   |       |
  | ARCHIVE | Source | RETURNS       | 9.83s  | 0   |       |
  | REPORT  | Mirror | ARCHIVE       | 9.82s  | 0   |       |
  +---------+--------+---------------+--------+-----+-------+

Here we also pass the --dot replication.dot argument that writes a GraphViz format map of the replication setup.
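
The GraphViz file can then be turned into an image with the Graphviz dot tool. This is a small sketch that assumes Graphviz is installed separately; the output name replication.svg is a hypothetical choice, and the guard simply skips rendering when either the tool or the input file is missing.

```shell
# Sketch: render the GraphViz map written by `nats s report --dot replication.dot`.
# Assumptions: Graphviz is installed (it provides `dot`); replication.svg is a
# hypothetical output file name.
if command -v dot >/dev/null 2>&1 && [ -f replication.dot ]; then
  dot -Tsvg replication.dot -o replication.svg
  echo "wrote replication.svg"
else
  echo "skipped: needs Graphviz 'dot' and an existing replication.dot"
fi
```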

Data Replication - Figure 2