Globally Cached Lookups

Lookups are an experimental feature.

To use this Apache Druid extension, include druid-lookups-cached-global in the extensions load list.

Configuration

Static configuration is no longer supported. Lookups can be configured through dynamic configuration.

Globally cached lookups are appropriate when a lookup is too large to pass at query time, or when you would rather have the data reside in and be managed by the Druid servers, and the lookup is still small enough to reasonably hold in memory. This usually means tens to tens of thousands of entries per lookup.

Globally cached lookups all draw from the same cache pool, allowing each process to have a fixed cache pool that can be used by cached lookups.

Globally cached lookups can be specified as part of the cluster-wide lookup configuration, using the type cachedNamespace:

    {
      "type": "cachedNamespace",
      "extractionNamespace": {
        "type": "uri",
        "uri": "file:/tmp/prefix/",
        "namespaceParseSpec": {
          "format": "csv",
          "columns": ["key", "value"]
        },
        "pollPeriod": "PT5M"
      },
      "firstCacheTimeout": 0
    }

    {
      "type": "cachedNamespace",
      "extractionNamespace": {
        "type": "jdbc",
        "connectorConfig": {
          "connectURI": "jdbc:mysql://localhost:3306/druid",
          "user": "druid",
          "password": "diurd"
        },
        "table": "lookupTable",
        "keyColumn": "mykeyColumn",
        "valueColumn": "myValueColumn",
        "filter": "myFilterSQL (Where clause statement e.g LOOKUPTYPE=1)",
        "tsColumn": "timeColumn"
      },
      "firstCacheTimeout": 120000,
      "injective": true
    }

The parameters are as follows:

| Property | Description | Required | Default |
|---|---|---|---|
| extractionNamespace | Specifies how to populate the local cache. See below | Yes | - |
| firstCacheTimeout | How long to wait (in ms) for the first run of the cache to populate. 0 indicates to not wait | No | 0 (do not wait) |
| injective | If the underlying map is injective (keys and values are unique) then optimizations can occur internally by setting this to true | No | false |
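
Before setting injective to true, it can help to confirm that the data really is one-to-one. The following is a minimal sketch, assuming the lookup has already been loaded into a Python dict; the helper name is_injective is hypothetical and is not part of Druid.

```python
def is_injective(mapping):
    """Return True when no two keys map to the same value.

    Dict keys are unique by construction, so only the values need checking.
    """
    values = list(mapping.values())
    return len(values) == len(set(values))


renames = {"foo": "bar", "baz": "bat", "buck": "truck"}
print(is_injective(renames))  # True

# Two keys share a value, so this map is not injective.
print(is_injective({"US": "United States", "USA": "United States"}))  # False
```

If the check fails, leaving injective at its default of false is the safe choice.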

If firstCacheTimeout is set to a non-zero value, it should be less than druid.manager.lookups.hostUpdateTimeout. If firstCacheTimeout is NOT set, then management is essentially asynchronous and does not know if a lookup succeeded or failed in starting. In such a case logs from the processes using lookups should be monitored for repeated failures.

Proper functionality of globally cached lookups requires the following extension to be loaded on the Broker, Peon, and Historical processes: druid-lookups-cached-global

Example configuration

In a simple case where only one tier exists (realtime_customer2) with one cachedNamespace lookup called country_code, the resulting configuration JSON looks similar to the following:

    {
      "realtime_customer2": {
        "country_code": {
          "version": "v0",
          "lookupExtractorFactory": {
            "type": "cachedNamespace",
            "extractionNamespace": {
              "type": "jdbc",
              "connectorConfig": {
                "connectURI": "jdbc:mysql://localhost:3306/druid",
                "user": "druid",
                "password": "diurd"
              },
              "table": "lookupValues",
              "keyColumn": "value_id",
              "valueColumn": "value_text",
              "filter": "value_type='country'",
              "tsColumn": "timeColumn"
            },
            "firstCacheTimeout": 120000,
            "injective": true
          }
        }
      }
    }

With this configuration in place, the Coordinator endpoint /druid/coordinator/v1/lookups/realtime_customer2/country_code should return:

    {
      "version": "v0",
      "lookupExtractorFactory": {
        "type": "cachedNamespace",
        "extractionNamespace": {
          "type": "jdbc",
          "connectorConfig": {
            "connectURI": "jdbc:mysql://localhost:3306/druid",
            "user": "druid",
            "password": "diurd"
          },
          "table": "lookupValues",
          "keyColumn": "value_id",
          "valueColumn": "value_text",
          "filter": "value_type='country'",
          "tsColumn": "timeColumn"
        },
        "firstCacheTimeout": 120000,
        "injective": true
      }
    }
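
One way to check the stored configuration is to request that endpoint from a script. The sketch below uses only the Python standard library. The Coordinator address http://localhost:8081 and the helper names lookup_url and fetch_lookup are assumptions for illustration; only the endpoint path comes from the text above.

```python
import json
from urllib.request import urlopen


def lookup_url(coordinator, tier, lookup_id):
    """Build the Coordinator URL for one lookup in one tier."""
    base = coordinator.rstrip("/")
    return f"{base}/druid/coordinator/v1/lookups/{tier}/{lookup_id}"


def fetch_lookup(coordinator, tier, lookup_id, timeout=10):
    """GET the lookup definition and decode the JSON response body."""
    with urlopen(lookup_url(coordinator, tier, lookup_id), timeout=timeout) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Assumed Coordinator address; adjust to your deployment.
    print(lookup_url("http://localhost:8081", "realtime_customer2", "country_code"))
```

Calling fetch_lookup with the same arguments against a running Coordinator should return a dict matching the JSON shown above.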

Cache Settings

Lookups are cached locally on Historical processes. The following settings are used by the processes that service queries (Broker, Peon, Historical) when setting up namespaces:

| Property | Description | Default |
|---|---|---|
| druid.lookup.namespace.cache.type | Specifies the type of caching to be used by the namespaces. May be one of [offHeap, onHeap]. offHeap uses a temporary file for off-heap storage of the namespace (memory mapped files). onHeap stores all cache on the heap in standard java map types. | onHeap |
| druid.lookup.namespace.numExtractionThreads | The number of threads in the thread pool dedicated for lookup extraction and updates. This number may need to be scaled up, if you have a lot of lookups and they take long time to extract, to avoid timeouts. | 2 |
| druid.lookup.namespace.numBufferedEntries | If using off-heap caching, the number of records to be stored on an on-heap buffer. | 100,000 |

The cache is populated in different ways depending on the settings below. In general, most namespaces employ a pollPeriod at the end of which time they poll the remote resource of interest for updates.

onHeap uses ConcurrentMaps in the Java heap, and thus affects garbage collection and heap sizing. offHeap uses an on-heap buffer and MapDB with memory-mapped files in the Java temporary directory. If the total number of entries in the cachedNamespace exceeds the buffer's configured capacity, the extra entries are kept in memory as page cache and paged in and out according to general OS tunings. It is highly recommended to set druid.lookup.namespace.numBufferedEntries when using offHeap; choose a value between 10% and 50% of the number of entries in the lookup.
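
As a worked example of the 10% to 50% guideline, the sketch below computes the suggested bounds for a given lookup size. The function name buffered_entries_range is hypothetical, and the figure of 400,000 entries is only an illustration.

```python
def buffered_entries_range(total_entries):
    """Return (low, high) bounds for druid.lookup.namespace.numBufferedEntries.

    Applies the 10% to 50% guideline, rounding down to whole entries.
    """
    low = total_entries // 10   # 10% of the entries
    high = total_entries // 2   # 50% of the entries
    return low, high


print(buffered_entries_range(400_000))  # (40000, 200000)
```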

Supported lookups

For additional lookups, please see our extensions list.

URI lookup

The remapping values for each globally cached lookup can be specified by a JSON object as per the following examples:

    {
      "type": "uri",
      "uri": "s3://bucket/some/key/prefix/renames-0003.gz",
      "namespaceParseSpec": {
        "format": "csv",
        "columns": ["key", "value"]
      },
      "pollPeriod": "PT5M"
    }

    {
      "type": "uri",
      "uriPrefix": "s3://bucket/some/key/prefix/",
      "fileRegex": "renames-[0-9]*\\.gz",
      "namespaceParseSpec": {
        "format": "csv",
        "columns": ["key", "value"]
      },
      "pollPeriod": "PT5M",
      "maxHeapPercentage": 10
    }

| Property | Description | Required | Default |
|---|---|---|---|
| pollPeriod | Period between polling for updates | No | 0 (only once) |
| uri | URI for the file of interest, specified as a file, hdfs, s3 or gs path | No | Use uriPrefix |
| uriPrefix | A URI that specifies a directory (or other searchable resource) in which to search for files | No | Use uri |
| fileRegex | Optional regex for matching the file name under uriPrefix. Only used if uriPrefix is used | No | ".*" |
| namespaceParseSpec | How to interpret the data at the URI | Yes | |
| maxHeapPercentage | The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs. | No | 10% of JVM heap size |

One of either uri or uriPrefix must be specified, as either a local file system (file://), HDFS (hdfs://), S3 (s3://) or GCS (gs://) location. HTTP location is not currently supported.

The pollPeriod value specifies the period in ISO 8601 format between checks for replacement data for the lookup. If the source of the lookup is capable of providing a timestamp, the lookup will only be updated if it has changed since the prior tick of pollPeriod. A value of 0, an absent parameter, or null all mean populate once and do not attempt to look for new data later. Whenever a poll occurs, the updating system looks for the file with the most recent timestamp, assumes that it holds the most recent data set, and replaces the local cache of the lookup data with its contents.

The namespaceParseSpec can take one of several formats. Each of the examples below would rename foo to bar, baz to bat, and buck to truck. All parseSpec types assume each input is delimited by a new line. See below for the types of parseSpec supported.

Only ONE file which matches the search will be used. For most implementations, the file chosen is whichever one reports the most recent modification timestamp.
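
The selection rule can be sketched against a local directory. This is an illustration only, not Druid's implementation: the helper pick_latest is hypothetical, it assumes the regex must match the whole file name, and it uses local modification times in place of whatever timestamp a remote store reports.

```python
import os
import re
import tempfile


def pick_latest(directory, file_regex=".*"):
    """Return the matching file with the newest modification time, or None."""
    pattern = re.compile(file_regex)
    candidates = [
        os.path.join(directory, name)
        for name in os.listdir(directory)
        # Assumption: the regex must match the entire file name.
        if pattern.fullmatch(name) and os.path.isfile(os.path.join(directory, name))
    ]
    return max(candidates, key=os.path.getmtime) if candidates else None


with tempfile.TemporaryDirectory() as tmp:
    # Create three files with increasing modification times.
    for i, name in enumerate(["renames-0001.gz", "renames-0002.gz", "notes.txt"]):
        path = os.path.join(tmp, name)
        open(path, "w").close()
        os.utime(path, (1_000 + i, 1_000 + i))
    latest = pick_latest(tmp, r"renames-[0-9]*\.gz")
    print(os.path.basename(latest))  # renames-0002.gz
```

notes.txt is the newest file but does not match the regex, so the newest matching file wins.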

csv lookupParseSpec

| Parameter | Description | Required | Default |
|---|---|---|---|
| columns | The list of columns in the csv file | no if hasHeaderRow is set | null |
| keyColumn | The name of the column containing the key | no | The first column |
| valueColumn | The name of the column containing the value | no | The second column |
| hasHeaderRow | A flag to indicate that column information can be extracted from the input files' header row | no | false |
| skipHeaderRows | Number of header rows to be skipped | no | 0 |

If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column information from the third line.

example input

    bar,something,foo
    bat,something2,baz
    truck,something3,buck

example namespaceParseSpec

    "namespaceParseSpec": {
      "format": "csv",
      "columns": ["value","somethingElse","key"],
      "keyColumn": "key",
      "valueColumn": "value"
    }
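
To see how this spec maps the example input to key/value pairs, here is a minimal Python sketch. It mirrors the documented defaults (first column as key, second as value) but is not Druid's parser; the function name parse_csv_lookup is hypothetical.

```python
import csv
import io


def parse_csv_lookup(text, columns, key_column=None, value_column=None):
    """Build a key -> value dict from CSV text using named columns."""
    key_column = key_column or columns[0]      # default: the first column
    value_column = value_column or columns[1]  # default: the second column
    k = columns.index(key_column)
    v = columns.index(value_column)
    # Skip empty rows, such as a trailing blank line.
    return {row[k]: row[v] for row in csv.reader(io.StringIO(text)) if row}


data = "bar,something,foo\nbat,something2,baz\ntruck,something3,buck\n"
lookup = parse_csv_lookup(data, ["value", "somethingElse", "key"], "key", "value")
print(lookup)  # {'foo': 'bar', 'baz': 'bat', 'buck': 'truck'}
```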

tsv lookupParseSpec

| Parameter | Description | Required | Default |
|---|---|---|---|
| columns | The list of columns in the tsv file | yes | null |
| keyColumn | The name of the column containing the key | no | The first column |
| valueColumn | The name of the column containing the value | no | The second column |
| delimiter | The delimiter in the file | no | tab (\t) |
| listDelimiter | The list delimiter in the file | no | (\u0001) |
| hasHeaderRow | A flag to indicate that column information can be extracted from the input files' header row | no | false |
| skipHeaderRows | Number of header rows to be skipped | no | 0 |

If both skipHeaderRows and hasHeaderRow options are set, skipHeaderRows is first applied. For example, if you set skipHeaderRows to 2 and hasHeaderRow to true, Druid will skip the first two lines and then extract column information from the third line.

example input

    bar|something,1|foo
    bat|something,2|baz
    truck|something,3|buck

example namespaceParseSpec

    "namespaceParseSpec": {
      "format": "tsv",
      "columns": ["value","somethingElse","key"],
      "keyColumn": "key",
      "valueColumn": "value",
      "delimiter": "|"
    }
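
The effect of a custom delimiter can be sketched as follows. With "|" as the delimiter, the comma inside the middle field is ordinary data rather than a separator. The function name parse_delimited_lookup is hypothetical, and the sketch ignores listDelimiter and header handling.

```python
def parse_delimited_lookup(lines, columns, key_column, value_column, delimiter="\t"):
    """Build a key -> value dict from lines split on a single delimiter."""
    k = columns.index(key_column)
    v = columns.index(value_column)
    lookup = {}
    for line in lines:
        if not line:
            continue  # ignore blank lines
        fields = line.split(delimiter)
        lookup[fields[k]] = fields[v]
    return lookup


rows = ["bar|something,1|foo", "bat|something,2|baz", "truck|something,3|buck"]
lookup = parse_delimited_lookup(rows, ["value", "somethingElse", "key"], "key", "value", "|")
print(lookup)  # {'foo': 'bar', 'baz': 'bat', 'buck': 'truck'}
```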

customJson lookupParseSpec

| Parameter | Description | Required | Default |
|---|---|---|---|
| keyFieldName | The field name of the key | yes | null |
| valueFieldName | The field name of the value | yes | null |

example input

    {"key": "foo", "value": "bar", "somethingElse" : "something"}
    {"key": "baz", "value": "bat", "somethingElse" : "something"}
    {"key": "buck", "somethingElse": "something", "value": "truck"}

example namespaceParseSpec

    "namespaceParseSpec": {
      "format": "customJson",
      "keyFieldName": "key",
      "valueFieldName": "value"
    }

With customJson parsing, if the value field for a particular row is missing or null then that line will be skipped, and will not be included in the lookup.
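
The skipping rule for missing or null values can be illustrated with a short sketch. It is not Druid's parser; the function name parse_custom_json is hypothetical, and the fourth and fifth input rows are added here only to demonstrate the skip.

```python
import json


def parse_custom_json(lines, key_field, value_field):
    """Build a key -> value dict from line-delimited JSON objects."""
    lookup = {}
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        record = json.loads(line)
        value = record.get(value_field)
        if value is None:
            continue  # value field missing or null: skip this line
        lookup[record[key_field]] = value
    return lookup


rows = [
    '{"key": "foo", "value": "bar", "somethingElse": "something"}',
    '{"key": "baz", "value": "bat", "somethingElse": "something"}',
    '{"key": "buck", "somethingElse": "something", "value": "truck"}',
    '{"key": "skipped", "value": null}',
    '{"key": "alsoSkipped"}',
]
print(parse_custom_json(rows, "key", "value"))
# {'foo': 'bar', 'baz': 'bat', 'buck': 'truck'}
```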

simpleJson lookupParseSpec

The simpleJson lookupParseSpec does not take any parameters. It is simply a line delimited JSON file where the field is the key, and the field’s value is the value.

example input

    {"foo": "bar"}
    {"baz": "bat"}
    {"buck": "truck"}

example namespaceParseSpec

    "namespaceParseSpec": {
      "format": "simpleJson"
    }
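
Because each line is a single-field JSON object, reading the format amounts to merging the objects. A minimal sketch, with the hypothetical helper name parse_simple_json:

```python
import json


def parse_simple_json(lines):
    """Merge line-delimited JSON objects into one key -> value dict."""
    lookup = {}
    for line in lines:
        if line.strip():
            # Each object's field name is the key; its value is the value.
            lookup.update(json.loads(line))
    return lookup


rows = ['{"foo": "bar"}', '{"baz": "bat"}', '{"buck": "truck"}']
print(parse_simple_json(rows))  # {'foo': 'bar', 'baz': 'bat', 'buck': 'truck'}
```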

JDBC lookup

A JDBC lookup polls a database to populate its local cache. If tsColumn is set, it must be able to accept comparisons in the format '2015-01-01 00:00:00'. For example, the following must be valid SQL for the table: SELECT * FROM some_lookup_table WHERE timestamp_column > '2015-01-01 00:00:00'. If tsColumn is set, the caching service will attempt to only poll values that were written after the last sync. If tsColumn is not set, the entire table is pulled every time.

| Parameter | Description | Required | Default |
|---|---|---|---|
| connectorConfig | The connector config to use. You can set connectURI, user and password. You can selectively allow JDBC properties in connectURI. See JDBC connections security config for more details. | Yes | |
| table | The table which contains the key value pairs | Yes | |
| keyColumn | The column in table which contains the keys | Yes | |
| valueColumn | The column in table which contains the values | Yes | |
| filter | The filter to use when selecting lookups, this is used to create a where clause on lookup population | No | No Filter |
| tsColumn | The column in table which contains when the key was updated | No | Not used |
| pollPeriod | How often to poll the DB | No | 0 (only once) |
| maxHeapPercentage | The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs. | No | 10% of JVM heap size |

    {
      "type": "jdbc",
      "connectorConfig": {
        "connectURI": "jdbc:mysql://localhost:3306/druid",
        "user": "druid",
        "password": "diurd"
      },
      "table": "some_lookup_table",
      "keyColumn": "the_old_dim_value",
      "valueColumn": "the_new_dim_value",
      "tsColumn": "timestamp_column",
      "pollPeriod": 600000,
      "maxHeapPercentage": 10
    }
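
The difference between a full pull and a tsColumn-based incremental poll can be sketched with an in-memory SQLite database standing in for the real one. This illustrates the idea only and is not how Druid issues its queries; the function name poll and the sample rows are hypothetical.

```python
import sqlite3


def poll(conn, table, key_col, value_col, ts_col=None, last_sync=None):
    """Return key -> value pairs, restricted to recent rows when possible.

    Table and column names are interpolated directly, so they must come
    from trusted configuration, never from user input.
    """
    sql = f"SELECT {key_col}, {value_col} FROM {table}"
    params = ()
    if ts_col and last_sync:
        # Incremental poll: only rows written after the last sync.
        sql += f" WHERE {ts_col} > ?"
        params = (last_sync,)
    return dict(conn.execute(sql, params).fetchall())


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE some_lookup_table "
    "(the_old_dim_value TEXT, the_new_dim_value TEXT, timestamp_column TEXT)"
)
conn.executemany(
    "INSERT INTO some_lookup_table VALUES (?, ?, ?)",
    [("foo", "bar", "2014-12-31 00:00:00"), ("baz", "bat", "2015-01-02 00:00:00")],
)

args = (conn, "some_lookup_table", "the_old_dim_value", "the_new_dim_value")
print(poll(*args))  # full pull: {'foo': 'bar', 'baz': 'bat'}
print(poll(*args, ts_col="timestamp_column", last_sync="2015-01-01 00:00:00"))
# incremental pull: {'baz': 'bat'}
```

In a cache, the result of an incremental poll would be merged into the existing map rather than replacing it.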

If using JDBC, you will need to add your database’s client JAR files to the extension’s directory. For Postgres, the connector JAR is already included. See the MySQL extension documentation for instructions to obtain MySQL or MariaDB connector libraries. The connector JAR should reside in the classpath of Druid’s main class loader. To add the connector JAR to the classpath, you can copy the downloaded file to lib/ under the distribution root directory. Alternatively, create a symbolic link to the connector in the lib directory.

Introspection

Globally cached lookups have introspection points at /keys and /values which return a complete set of the keys and values (respectively) in the lookup. Introspection to / returns the entire map. Introspection to /version returns the version indicator for the lookup.