Setting up M3 Aggregator

Introduction

m3aggregator is used to cluster stateful downsampling and rollup of metrics before they are stored in M3DB. The M3 Coordinator also performs this role, but is not cluster aware. This means metrics will not get aggregated properly if you send metrics in a round-robin fashion to multiple M3 Coordinators for the same metrics ingestion source (e.g. a Prometheus server).

Similar to M3DB, m3aggregator supports clustering and replication by default. This means that metrics are correctly routed to the instance(s) responsible for aggregating each metric and multiple m3aggregator replicas can be configured such that there are no single points of failure for aggregation.

Configuration

Before setting up m3aggregator, make sure that you have at least one M3DB node running and a dedicated m3coordinator setup.

We highly recommend running with a replication factor of at least 2 for any m3aggregator deployment. If you run with a replication factor of 1, then restarting an aggregator will temporarily interrupt the stream of aggregated metrics and there will be some data loss.

Topology

Initializing aggregator topology

You can set up an m3aggregator topology by issuing a request to your coordinator (be sure to use your own hostnames, number of shards and replication factor):

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3aggregator/placement/init -d '{
    "num_shards": 64,
    "replication_factor": 2,
    "instances": [
      {
        "id": "m3aggregator01:6000",
        "isolation_group": "availability-zone-a",
        "zone": "embedded",
        "weight": 100,
        "endpoint": "m3aggregator01:6000",
        "hostname": "m3aggregator01",
        "port": 6000
      },
      {
        "id": "m3aggregator02:6000",
        "isolation_group": "availability-zone-b",
        "zone": "embedded",
        "weight": 100,
        "endpoint": "m3aggregator02:6000",
        "hostname": "m3aggregator02",
        "port": 6000
      }
    ]
  }'
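
As a sanity check, you can read the placement back and confirm the instances and their shard assignments (same hostname and header as above):

  # Fetch the current m3aggregator placement
  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3aggregator/placement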

Initializing m3msg topic for m3aggregator to receive from m3coordinators to aggregate metrics

Now we must set up a topic for the m3aggregator to receive unaggregated metrics from m3coordinator instances:

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic/init -d '{
    "numberOfShards": 64
  }'
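
If you want to confirm the topic exists, you can read it back with a GET against the same endpoint (the same check works later for the aggregated_metrics topic):

  # Read the topic back to verify it was initialized
  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic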

Add m3aggregator consumer group to ingest topic

Add the m3aggregator placement to receive traffic from the topic (make sure to set the message TTL to match your desired maximum in-memory retry message buffer):

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregator_ingest" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic -d '{
    "consumerService": {
      "serviceId": {
        "name": "m3aggregator",
        "environment": "namespace/m3db-cluster-name",
        "zone": "embedded"
      },
      "consumptionType": "REPLICATED",
      "messageTtlNanos": "300000000000"
    }
  }'

Note: 300000000000 nanoseconds is a TTL of 5 minutes, during which unacknowledged messages are buffered for retry.
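
The messageTtlNanos value is simply the desired duration expressed in nanoseconds, so other windows are easy to derive; for example, in shell arithmetic:

  # TTL in nanoseconds = minutes * 60 * 1e9
  echo $((5 * 60 * 1000000000))    # 300000000000, i.e. 5 minutes
  echo $((10 * 60 * 1000000000))   # 600000000000, i.e. 10 minutes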

Initializing m3msg topic for m3coordinator to receive from m3aggregator to write to M3DB

Now we must set up a topic for the m3coordinator to receive aggregated metrics from m3aggregator instances to write to M3DB:

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregated_metrics" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic/init -d '{
    "numberOfShards": 64
  }'

Initializing m3coordinator topology

Then m3coordinator instances need to be configured to receive traffic for this topic (note: the ingest port 7507 must match the port configured for your m3coordinator ingest server; see the config at the bottom of this guide):

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3coordinator/placement/init -d '{
    "instances": [
      {
        "id": "m3coordinator01",
        "zone": "embedded",
        "endpoint": "m3coordinator01:7507",
        "hostname": "m3coordinator01",
        "port": 7507
      }
    ]
  }'

Note: whenever you add or remove m3coordinator instances, this placement must be updated to match; an example follows.
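
For example, an additional instance can be appended to the existing placement with a POST to the placement endpoint (the hostname m3coordinator02 is illustrative):

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/services/m3coordinator/placement -d '{
    "instances": [
      {
        "id": "m3coordinator02",
        "zone": "embedded",
        "endpoint": "m3coordinator02:7507",
        "hostname": "m3coordinator02",
        "port": 7507
      }
    ]
  }'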

Add m3coordinator consumer group to outbound topic

Add the m3coordinator placement to receive traffic from the topic (again, set the message TTL to match your desired maximum in-memory retry message buffer):

  curl -vvvsSf -H "Cluster-Environment-Name: namespace/m3db-cluster-name" -H "Topic-Name: aggregated_metrics" -X POST http://m3dbnode-with-embedded-coordinator:7201/api/v1/topic -d '{
    "consumerService": {
      "serviceId": {
        "name": "m3coordinator",
        "environment": "namespace/m3db-cluster-name",
        "zone": "embedded"
      },
      "consumptionType": "SHARED",
      "messageTtlNanos": "300000000000"
    }
  }'

Note: as above, 300000000000 nanoseconds is a TTL of 5 minutes, during which unacknowledged messages are buffered for retry.

Running

Dedicated Coordinator

Metrics will still arrive at the m3coordinator; they simply need to be forwarded to an m3aggregator. The m3coordinator also needs to receive the aggregated metrics back from the m3aggregator and store them in M3DB, so an ingestion server needs to be configured as well.

Here is the config you should add to your m3coordinator:

  # This is for sending metrics to the remote m3aggregators
  downsample:
    remoteAggregator:
      client:
        type: m3msg
        m3msg:
          producer:
            writer:
              topicName: aggregator_ingest
              topicServiceOverride:
                zone: embedded
                environment: namespace/m3db-cluster-name
              placement:
                isStaged: true
              placementServiceOverride:
                namespaces:
                  placement: /placement
              connection:
                numConnections: 4
              messagePool:
                size: 16384
                watermark:
                  low: 0.2
                  high: 0.5

  # This is for configuring the ingestion server that will receive metrics from the m3aggregators on port 7507
  ingest:
    ingester:
      workerPoolSize: 10000
      opPool:
        size: 10000
      retry:
        maxRetries: 3
        jitter: true
      logSampleRate: 0.01
    m3msg:
      server:
        listenAddress: "0.0.0.0:7507"
        retry:
          maxBackoff: 10s
          jitter: true
M3 Aggregator

You can run m3aggregator by either building and running the binary yourself:

  make m3aggregator
  ./bin/m3aggregator -f ./src/aggregator/config/m3aggregator.yml

Or you can run it with Docker using the Dockerfile located at docker/m3aggregator/Dockerfile or the publicly provided image quay.io/m3db/m3aggregator:latest.
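
A minimal Docker invocation might look like the following sketch; the in-container config path is an assumption based on the Dockerfile, so verify it against the image's entrypoint:

  # Assumes the image reads its config from /etc/m3aggregator/m3aggregator.yml;
  # M3AGGREGATOR_HOST_ID must resolve to an instance id present in the placement.
  docker run -d \
    -e M3AGGREGATOR_HOST_ID="m3aggregator01:6000" \
    -p 6000:6000 -p 6001:6001 -p 6002:6002 \
    -v "$(pwd)/m3aggregator.yml:/etc/m3aggregator/m3aggregator.yml" \
    quay.io/m3db/m3aggregator:latest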

You can use a config like the following, making note of the topics used (aggregator_ingest and aggregated_metrics) and the corresponding environment namespace/m3db-cluster-name:

  logging:
    level: info

  metrics:
    scope:
      prefix: m3aggregator
    prometheus:
      onError: none
      handlerPath: /metrics
      listenAddress: 0.0.0.0:6002
      timerType: histogram
    sanitization: prometheus
    samplingRate: 1.0
    extended: none

  m3msg:
    server:
      listenAddress: 0.0.0.0:6000
      retry:
        maxBackoff: 10s
        jitter: true
    consumer:
      messagePool:
        size: 16384
        watermark:
          low: 0.2
          high: 0.5

  http:
    listenAddress: 0.0.0.0:6001
    readTimeout: 60s
    writeTimeout: 60s

  kvClient:
    etcd:
      env: namespace/m3db-cluster-name
      zone: embedded
      service: m3aggregator
      cacheDir: /var/lib/m3kv
      etcdClusters:
        - zone: embedded
          endpoints:
            - dbnode01:2379

  runtimeOptions:
    kvConfig:
      environment: namespace/m3db-cluster-name
      zone: embedded
    writeValuesPerMetricLimitPerSecondKey: write-values-per-metric-limit-per-second
    writeValuesPerMetricLimitPerSecond: 0
    writeNewMetricLimitClusterPerSecondKey: write-new-metric-limit-cluster-per-second
    writeNewMetricLimitClusterPerSecond: 0
    writeNewMetricNoLimitWarmupDuration: 0

  aggregator:
    hostID:
      resolver: environment
      envVarName: M3AGGREGATOR_HOST_ID
    instanceID:
      type: host_id
    verboseErrors: true
    metricPrefix: ""
    counterPrefix: ""
    timerPrefix: ""
    gaugePrefix: ""
    aggregationTypes:
      counterTransformFnType: empty
      timerTransformFnType: suffix
      gaugeTransformFnType: empty
      aggregationTypesPool:
        size: 1024
      quantilesPool:
        buckets:
          - count: 256
            capacity: 4
          - count: 128
            capacity: 8
    stream:
      eps: 0.001
      capacity: 32
      streamPool:
        size: 4096
      samplePool:
        size: 4096
      floatsPool:
        buckets:
          - count: 4096
            capacity: 16
          - count: 2048
            capacity: 32
          - count: 1024
            capacity: 64
    client:
      type: m3msg
      m3msg:
        producer:
          writer:
            topicName: aggregator_ingest
            topicServiceOverride:
              zone: embedded
              environment: namespace/m3db-cluster-name
            placement:
              isStaged: true
            placementServiceOverride:
              namespaces:
                placement: /placement
            messagePool:
              size: 16384
              watermark:
                low: 0.2
                high: 0.5
    placementManager:
      kvConfig:
        namespace: /placement
        environment: namespace/m3db-cluster-name
        zone: embedded
      placementWatcher:
        key: m3aggregator
        initWatchTimeout: 10s
    hashType: murmur32
    bufferDurationBeforeShardCutover: 10m
    bufferDurationAfterShardCutoff: 10m
    bufferDurationForFutureTimedMetric: 10m # Allow test to write into future.
    resignTimeout: 1m
    flushTimesManager:
      kvConfig:
        environment: namespace/m3db-cluster-name
        zone: embedded
      flushTimesKeyFmt: shardset/%d/flush
      flushTimesPersistRetrier:
        initialBackoff: 100ms
        backoffFactor: 2.0
        maxBackoff: 2s
        maxRetries: 3
    electionManager:
      election:
        leaderTimeout: 10s
        resignTimeout: 10s
        ttlSeconds: 10
      serviceID:
        name: m3aggregator
        environment: namespace/m3db-cluster-name
        zone: embedded
      electionKeyFmt: shardset/%d/lock
      campaignRetrier:
        initialBackoff: 100ms
        backoffFactor: 2.0
        maxBackoff: 2s
        forever: true
        jitter: true
      changeRetrier:
        initialBackoff: 100ms
        backoffFactor: 2.0
        maxBackoff: 5s
        forever: true
        jitter: true
      resignRetrier:
        initialBackoff: 100ms
        backoffFactor: 2.0
        maxBackoff: 5s
        forever: true
        jitter: true
      campaignStateCheckInterval: 1s
      shardCutoffCheckOffset: 30s
    flushManager:
      checkEvery: 1s
      jitterEnabled: true
      maxJitters:
        - flushInterval: 5s
          maxJitterPercent: 1.0
        - flushInterval: 10s
          maxJitterPercent: 0.5
        - flushInterval: 1m
          maxJitterPercent: 0.5
        - flushInterval: 10m
          maxJitterPercent: 0.5
        - flushInterval: 1h
          maxJitterPercent: 0.25
      numWorkersPerCPU: 0.5
      flushTimesPersistEvery: 10s
      maxBufferSize: 5m
      forcedFlushWindowSize: 10s
    flush:
      handlers:
        - dynamicBackend:
            name: m3msg
            hashType: murmur32
            producer:
              writer:
                topicName: aggregated_metrics
                topicServiceOverride:
                  zone: embedded
                  environment: namespace/m3db-cluster-name
                messagePool:
                  size: 16384
                  watermark:
                    low: 0.2
                    high: 0.5
    passthrough:
      enabled: true
    forwarding:
      maxConstDelay: 5m # Need to add some buffer window, since timed metrics by default are delayed by 1min.
    entryTTL: 1h
    entryCheckInterval: 10m
    maxTimerBatchSizePerWrite: 140
    defaultStoragePolicies: []
    maxNumCachedSourceSets: 2
    discardNaNAggregatedValues: true
    entryPool:
      size: 4096
    counterElemPool:
      size: 4096
    timerElemPool:
      size: 4096
    gaugeElemPool:
      size: 4096
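
Note the hostID block above: with the resolver set to environment and instanceID type host_id, each instance takes its identity from the M3AGGREGATOR_HOST_ID environment variable, and that identity must match an id in the aggregator placement created earlier. A minimal sketch, assuming the placement ids from this guide:

  # Must match the "id" field of this instance in the m3aggregator placement
  export M3AGGREGATOR_HOST_ID="m3aggregator01:6000"
  ./bin/m3aggregator -f ./src/aggregator/config/m3aggregator.yml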

Usage

Send metrics as usual to your m3coordinator instances in a round-robin fashion (or with any other load balancing strategy). The metrics will be forwarded to the m3aggregator instances, and once aggregated they will be returned to the m3coordinator instances to be written to M3DB.
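
For example, if your ingestion source is a Prometheus server, pointing its remote write configuration at the coordinators is enough; a minimal sketch using the hostname from the coordinator placement above and the coordinator's default HTTP port:

  # prometheus.yml snippet
  remote_write:
    - url: "http://m3coordinator01:7201/api/v1/prom/remote/write"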