User-Defined Actions

User-defined actions allow you to run functions and procedures implemented in a language of your choice on a schedule within TimescaleDB. This makes it possible to automate periodic tasks that are not covered by existing policies, and even to enhance existing policies with additional functionality.

Creating Procedures for Actions

The signature for actions is (job_id INT, config JSONB). An action can be implemented as either a function or a procedure. The content of the config JSONB is entirely up to the job and may also be NULL if no parameters are required.

Template for a procedure:

  CREATE OR REPLACE PROCEDURE user_defined_action(job_id int, config jsonb) LANGUAGE PLPGSQL AS
  $$
  BEGIN
    RAISE NOTICE 'Executing job % with config %', job_id, config;
  END
  $$;
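
Since actions can also be implemented as functions, the same template can be written as a function. A minimal sketch (the name user_defined_action_fn is illustrative); note that, unlike a procedure, a function cannot use transaction control statements such as COMMIT within its body:

  -- The same action written as a function; the name is illustrative.
  CREATE OR REPLACE FUNCTION user_defined_action_fn(job_id int, config jsonb)
  RETURNS void LANGUAGE PLPGSQL AS
  $$
  BEGIN
    RAISE NOTICE 'Executing job % with config %', job_id, config;
  END
  $$;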

Registering Actions

To register your action for execution within TimescaleDB's job scheduler, call add_job with the name of your action as well as the schedule on which it should run.

When registered, the action's job_id and config are stored in the TimescaleDB catalog. The config JSONB can be modified with alter_job. job_id and config are passed as arguments when the procedure is executed as a background process or when it is expressly called with run_job.

Register the created job with the automation framework. add_job returns the job_id, which can be used to execute the job manually with run_job:

  SELECT add_job('user_defined_action', '1h', config => '{"hypertable":"metrics"}');
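
add_job also accepts optional parameters such as initial_start, which fixes the time of the first execution. A sketch (the timestamp is illustrative):

  -- Register the action so that its first run happens at a fixed time;
  -- the timestamp here is only an example.
  SELECT add_job('user_defined_action', '1h',
    config => '{"hypertable":"metrics"}',
    initial_start => '2024-01-01 00:00:00+00');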

To get a list of all currently registered jobs, you can query timescaledb_information.jobs:

  SELECT * FROM timescaledb_information.jobs;
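
To narrow the list down to a specific action, you can filter on the procedure name; for example:

  -- List only jobs that run the user_defined_action procedure
  SELECT job_id, schedule_interval, config
  FROM timescaledb_information.jobs
  WHERE proc_name = 'user_defined_action';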

Testing and Debugging Jobs

Any background worker job can be run in the foreground when executed with run_job. This can be useful for debugging problems, especially when combined with an increased log level.

Since run_job is implemented as a stored procedure, it cannot be executed inside a SELECT query; it has to be executed with CALL.

Set the log level shown to the client to DEBUG1 and run the job with job id 1000:

  SET client_min_messages TO DEBUG1;
  CALL run_job(1000);
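
You can also check on past executions through the timescaledb_information.job_stats view, which records run counts and the outcome of the last run; for example:

  -- Inspect run history and status for job 1000
  SELECT last_run_status, last_run_started_at, total_runs, total_failures
  FROM timescaledb_information.job_stats
  WHERE job_id = 1000;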

Altering and Dropping Actions

You can alter the config or scheduling parameters with alter_job.

Replace the entire JSON config for the job with id 1000 with the specified JSON:

  SELECT alter_job(1000, config => '{"hypertable":"metrics"}');

Disable automatic scheduling of the job with id 1000. The job can still be run manually with run_job:

  SELECT alter_job(1000, scheduled => false);

Re-enable automatic scheduling of the job with id 1000:

  SELECT alter_job(1000, scheduled => true);
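
Other scheduling parameters can be adjusted the same way. For example, change the schedule interval of the job with id 1000 to 12 hours:

  SELECT alter_job(1000, schedule_interval => INTERVAL '12 hours');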

Delete the job with id 1000 from the automation framework with delete_job:

  SELECT delete_job(1000);

Examples

This section provides a number of examples of user-defined actions that you can create and then schedule as part of TimescaleDB's automation framework.

Generic Retention

Create a generic data retention policy that applies to ALL hypertables, as opposed to just a single one as required by add_retention_policy. The policy can be further refined with additional filters by adding a WHERE clause to the PERFORM query in the procedure definition (a sketch follows the registration example below).

  CREATE OR REPLACE PROCEDURE generic_retention (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    drop_after interval;
  BEGIN
    SELECT jsonb_object_field_text(config, 'drop_after')::interval INTO STRICT drop_after;

    IF drop_after IS NULL THEN
      RAISE EXCEPTION 'Config must have drop_after';
    END IF;

    PERFORM drop_chunks(format('%I.%I', table_schema, table_name), older_than => drop_after)
    FROM timescaledb_information.hypertables;
  END
  $$;

Register the job to run daily, dropping chunks older than 12 months on all hypertables:

  SELECT add_job('generic_retention', '1d', config => '{"drop_after":"12 month"}');
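
As noted above, the policy can be refined with a WHERE clause on the PERFORM query. A minimal sketch that skips a specific hypertable (the table name important_metrics is illustrative):

  -- Same PERFORM as in the procedure above, restricted by a WHERE clause;
  -- the excluded table name is only an example.
  PERFORM drop_chunks(format('%I.%I', table_schema, table_name), older_than => drop_after)
  FROM timescaledb_information.hypertables
  WHERE table_name != 'important_metrics';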

Tiered Storage

An action that moves chunks older than a certain time to a different tablespace:

  CREATE OR REPLACE PROCEDURE move_chunks (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    ht REGCLASS;
    lag interval;
    destination name;
    chunk REGCLASS;
  BEGIN
    SELECT jsonb_object_field_text(config, 'hypertable')::regclass INTO STRICT ht;
    SELECT jsonb_object_field_text(config, 'lag')::interval INTO STRICT lag;
    SELECT jsonb_object_field_text(config, 'tablespace') INTO STRICT destination;

    IF ht IS NULL OR lag IS NULL OR destination IS NULL THEN
      RAISE EXCEPTION 'Config must have hypertable, lag and tablespace';
    END IF;

    FOR chunk IN
      SELECT show.oid
      FROM show_chunks(ht, older_than => lag) SHOW (oid)
      INNER JOIN pg_class pgc ON pgc.oid = show.oid
      INNER JOIN pg_tablespace pgts ON pgts.oid = pgc.reltablespace
      WHERE pgts.spcname != destination
    LOOP
      RAISE NOTICE 'Moving chunk: %', chunk::text;
      EXECUTE format('ALTER TABLE %s SET TABLESPACE %I;', chunk, destination);
    END LOOP;
  END
  $$;

Register the job to run daily, moving chunks older than 12 months on the hypertable metrics to the tablespace old_chunks:

  SELECT add_job('move_chunks', '1d', config => '{"hypertable":"metrics","lag":"12 month","tablespace":"old_chunks"}');

The above action uses the simpler ALTER TABLE ... SET TABLESPACE to move a chunk, but it could alternatively be written in terms of TimescaleDB's move_chunk. The move_chunk function additionally requires an index as input, but it performs data re-ordering as part of the move (for faster subsequent queries) and requires lower lock levels, so the chunk remains available for reads during the move.
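
A minimal sketch of that variant, assuming the chunk's reorder index has already been looked up into a variable chunk_index (both the variable and the lookup are omitted here):

  -- Hypothetical replacement for the EXECUTE inside the loop;
  -- chunk_index is assumed to hold an index on the current chunk.
  PERFORM move_chunk(
    chunk => chunk,
    destination_tablespace => destination,
    reorder_index => chunk_index
  );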

Downsample and Compress

An action that downsamples and compresses chunks on the hypertable metrics once they are older than a certain age. The example query computes a simple avg over hourly data for downsampling, but this query can be arbitrarily complex.

  CREATE OR REPLACE PROCEDURE downsample_compress (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    lag interval;
    chunk REGCLASS;
    tmp_name name;
  BEGIN
    SELECT jsonb_object_field_text(config, 'lag')::interval INTO STRICT lag;

    IF lag IS NULL THEN
      RAISE EXCEPTION 'Config must have lag';
    END IF;

    FOR chunk IN
      SELECT show.oid
      FROM show_chunks('metrics', older_than => lag) SHOW (oid)
      INNER JOIN pg_class pgc ON pgc.oid = show.oid
      INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
      INNER JOIN timescaledb_information.chunks c ON c.chunk_name = pgc.relname
        AND c.chunk_schema = pgns.nspname
      WHERE c.is_compressed::bool = FALSE
    LOOP
      RAISE NOTICE 'Processing chunk: %', chunk::text;
      -- build name for temp table
      SELECT '_tmp' || relname
      FROM pg_class
      WHERE oid = chunk INTO STRICT tmp_name;
      -- copy downsampled chunk data into temp table
      EXECUTE format($sql$ CREATE UNLOGGED TABLE %I AS
        SELECT time_bucket('1h', time), device_id, avg(value) FROM %s GROUP BY 1, 2;
      $sql$, tmp_name, chunk);
      -- clear original chunk
      EXECUTE format('TRUNCATE %s;', chunk);
      -- copy downsampled data back into chunk
      EXECUTE format('INSERT INTO %s(time, device_id, value) SELECT * FROM %I;', chunk, tmp_name);
      -- drop temp table
      EXECUTE format('DROP TABLE %I;', tmp_name);
      PERFORM compress_chunk(chunk);
      COMMIT;
    END LOOP;
  END
  $$;

Register the job to run daily, downsampling and compressing chunks older than 12 months:

  SELECT add_job('downsample_compress', '1d', config => '{"lag":"12 month"}');
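
After the job has run, you can verify the result; for example, chunk_compression_stats shows the compression status of each chunk of the hypertable:

  -- Check which chunks of metrics are now compressed
  SELECT chunk_name, compression_status
  FROM chunk_compression_stats('metrics');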