User-defined actions

User-defined actions allow you to run functions and procedures, implemented in a language of your choice, on a schedule within TimescaleDB. This makes it possible to automate periodic tasks that are not covered by existing policies, or to enhance existing policies with additional functionality.

User-defined actions allow free-form configuration through a JSONB object, which makes them highly flexible and reusable.
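Every action in this document is a procedure with the signature (job_id int, config jsonb), registered with add_job. As a minimal sketch of that scaffolding (the procedure name and config key are illustrative):

  CREATE OR REPLACE PROCEDURE my_action (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  BEGIN
    -- The scheduler passes the job id and the stored config on every run
    RAISE NOTICE 'Executing job % with config %', job_id, config;
  END
  $$;

  -- Schedule the action to run every hour
  SELECT add_job('my_action', '1h', config => '{"key":"value"}');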

Examples

The following section provides a number of examples of user-defined actions that you can specify and subsequently schedule as part of TimescaleDB’s automation framework.

Generic retention

Create a generic data retention policy that applies to all hypertables, as opposed to just a single one as required by add_retention_policy. The policy can be further refined with additional filters by adding a WHERE clause to the PERFORM query in the procedure definition (see the sketch after the registration below).

  CREATE OR REPLACE PROCEDURE generic_retention (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    drop_after interval;
  BEGIN
    SELECT jsonb_object_field_text (config, 'drop_after')::interval INTO STRICT drop_after;

    IF drop_after IS NULL THEN
      RAISE EXCEPTION 'Config must have drop_after';
    END IF;

    -- Drop chunks older than drop_after on every hypertable
    PERFORM drop_chunks(format('%I.%I', hypertable_schema, hypertable_name)::regclass, older_than => drop_after)
    FROM timescaledb_information.hypertables;
  END
  $$;

Register the job to run daily, dropping chunks older than 12 months on all hypertables:

  SELECT add_job('generic_retention','1d', config => '{"drop_after":"12 month"}');
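As noted above, the policy can be narrowed by adding a WHERE clause to the PERFORM query. A minimal sketch that exempts one schema from retention (the keep_forever schema name is a made-up example):

  -- Inside generic_retention, restrict which hypertables are affected:
  PERFORM drop_chunks(format('%I.%I', hypertable_schema, hypertable_name)::regclass, older_than => drop_after)
  FROM timescaledb_information.hypertables
  WHERE hypertable_schema != 'keep_forever';  -- 'keep_forever' is a hypothetical schema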

Tiered storage

This action moves chunks older than a given age to a different tablespace:

  CREATE OR REPLACE PROCEDURE move_chunks (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    ht REGCLASS;
    lag interval;
    destination name;
    chunk REGCLASS;
  BEGIN
    SELECT jsonb_object_field_text (config, 'hypertable')::regclass INTO STRICT ht;
    SELECT jsonb_object_field_text (config, 'lag')::interval INTO STRICT lag;
    SELECT jsonb_object_field_text (config, 'tablespace') INTO STRICT destination;

    IF ht IS NULL OR lag IS NULL OR destination IS NULL THEN
      RAISE EXCEPTION 'Config must have hypertable, lag and tablespace';
    END IF;

    -- Loop over all chunks older than lag that are not already in the destination tablespace
    FOR chunk IN
      SELECT show.oid
      FROM show_chunks(ht, older_than => lag) SHOW (oid)
      INNER JOIN pg_class pgc ON pgc.oid = show.oid
      -- LEFT JOIN so chunks still in the default tablespace (reltablespace = 0) are included
      LEFT JOIN pg_tablespace pgts ON pgts.oid = pgc.reltablespace
      WHERE pgts.spcname IS DISTINCT FROM destination
    LOOP
      RAISE NOTICE 'Moving chunk: %', chunk::text;
      EXECUTE format('ALTER TABLE %s SET TABLESPACE %I;', chunk, destination);
    END LOOP;
  END
  $$;

Register the job to run daily, moving chunks older than 12 months on the hypertable metrics to the tablespace old_chunks:

  SELECT add_job('move_chunks','1d', config => '{"hypertable":"metrics","lag":"12 month","tablespace":"old_chunks"}');

The above action uses the simpler ALTER TABLE ... SET TABLESPACE to move a chunk, but it could alternatively be written in terms of TimescaleDB’s move_chunk. The move_chunk function additionally requires an index as input, but it reorders the data as part of the move (for faster subsequent queries) and requires lower lock levels, so the chunk remains available for reads during the move.
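A sketch of what a single move could look like with move_chunk instead; the chunk and index names shown here are hypothetical:

  SELECT move_chunk(
    chunk => '_timescaledb_internal._hyper_1_4_chunk',  -- hypothetical chunk name
    destination_tablespace => 'old_chunks',
    index_destination_tablespace => 'old_chunks',
    reorder_index => '_timescaledb_internal._hyper_1_4_chunk_metrics_time_idx',  -- hypothetical index
    verbose => TRUE
  );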

Downsample and compress

This action downsamples and compresses chunks on the hypertable metrics that are older than a certain age. The example query computes a simple hourly avg for downsampling, but the query can be arbitrarily complex.
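The procedure assumes a hypertable shaped roughly like this (a sketch; the column types are assumptions):

  CREATE TABLE metrics (
    time timestamptz NOT NULL,
    device_id int,
    value double precision
  );
  SELECT create_hypertable('metrics', 'time');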

  CREATE OR REPLACE PROCEDURE downsample_compress (job_id int, config jsonb)
  LANGUAGE PLPGSQL
  AS $$
  DECLARE
    lag interval;
    chunk REGCLASS;
    tmp_name name;
  BEGIN
    SELECT jsonb_object_field_text (config, 'lag')::interval INTO STRICT lag;

    IF lag IS NULL THEN
      RAISE EXCEPTION 'Config must have lag';
    END IF;

    -- Loop over all uncompressed chunks older than lag
    -- (the view is aliased ch to avoid shadowing the loop variable chunk)
    FOR chunk IN
      SELECT show.oid
      FROM show_chunks('metrics', older_than => lag) SHOW (oid)
      INNER JOIN pg_class pgc ON pgc.oid = show.oid
      INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
      INNER JOIN timescaledb_information.chunks ch ON ch.chunk_name = pgc.relname
        AND ch.chunk_schema = pgns.nspname
      WHERE ch.is_compressed::bool = FALSE
    LOOP
      RAISE NOTICE 'Processing chunk: %', chunk::text;

      -- build name for temp table
      SELECT '_tmp' || relname INTO STRICT tmp_name
      FROM pg_class
      WHERE oid = chunk;

      -- copy downsampled chunk data into temp table
      EXECUTE format($sql$
        CREATE UNLOGGED TABLE %I AS
        SELECT time_bucket('1h', time), device_id, avg(value) FROM %s GROUP BY 1, 2;
      $sql$, tmp_name, chunk);

      -- clear original chunk
      EXECUTE format('TRUNCATE %s;', chunk);

      -- copy downsampled data back into chunk
      EXECUTE format('INSERT INTO %s(time, device_id, value) SELECT * FROM %I;', chunk, tmp_name);

      -- drop temp table
      EXECUTE format('DROP TABLE %I;', tmp_name);

      PERFORM compress_chunk (chunk);
      COMMIT;
    END LOOP;
  END
  $$;

Register the job to run daily, downsampling and compressing chunks older than 12 months:

  SELECT add_job('downsample_compress','1d', config => '{"lag":"12 month"}');
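To verify that an action behaves as expected without waiting for its schedule, you can look the job up and execute it immediately with run_job (the job id shown is hypothetical):

  -- Find the job id for the action
  SELECT job_id, proc_name, schedule_interval
  FROM timescaledb_information.jobs
  WHERE proc_name = 'downsample_compress';

  -- Run it in the current session (1000 is a hypothetical job id)
  CALL run_job(1000);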