Use a user-defined action to downsample and compress chunks

TimescaleDB lets you downsample and compress chunks by combining a continuous aggregate refresh policy with a compression policy.

If you want to implement features not supported by those policies, you can write a user-defined action to downsample and compress chunks instead. The following example downsamples raw data to hourly averages. It is an illustrative example that could be done more simply with a continuous aggregate policy, but in a user-defined action you can make the downsampling query arbitrarily complex.
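
For comparison, the policy-based approach might look like the following sketch. It assumes the metrics(time, device_id, value) hypertable used in the example below; the view name metrics_hourly and all intervals are illustrative. Note that enabling compression with ALTER TABLE ... SET (timescaledb.compress) is also a prerequisite for the compress_chunk call in the procedure below.

    -- Downsample: maintain hourly averages as a continuous aggregate
    CREATE MATERIALIZED VIEW metrics_hourly
    WITH (timescaledb.continuous) AS
    SELECT time_bucket('1h', time) AS bucket, device_id, avg(value) AS value
    FROM metrics
    GROUP BY 1, 2;

    -- Keep the aggregate refreshed as new raw data arrives
    SELECT add_continuous_aggregate_policy('metrics_hourly',
      start_offset => INTERVAL '3 days',
      end_offset => INTERVAL '1 hour',
      schedule_interval => INTERVAL '1 hour');

    -- Compress chunks of the raw hypertable once they age past 12 months
    ALTER TABLE metrics SET (timescaledb.compress);
    SELECT add_compression_policy('metrics', INTERVAL '12 months');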

Using a user-defined action to downsample and compress chunks

  1. Create a procedure that first queries the chunks of a hypertable to determine whether they are older than the lag parameter. The hypertable in this example is named metrics. If a chunk is not already compressed, the procedure downsamples it by taking the hourly average of the raw data, then compresses it. A temporary table stores the data while the average is calculated.

    CREATE OR REPLACE PROCEDURE downsample_compress (job_id int, config jsonb)
    LANGUAGE PLPGSQL
    AS $$
    DECLARE
      lag interval;
      chunk REGCLASS;
      tmp_name name;
    BEGIN
      SELECT jsonb_object_field_text (config, 'lag')::interval INTO STRICT lag;

      IF lag IS NULL THEN
        RAISE EXCEPTION 'Config must have lag';
      END IF;

      FOR chunk IN
        SELECT show.oid
        FROM show_chunks('metrics', older_than => lag) SHOW (oid)
        INNER JOIN pg_class pgc ON pgc.oid = show.oid
        INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
        INNER JOIN timescaledb_information.chunks chunk ON chunk.chunk_name = pgc.relname
          AND chunk.chunk_schema = pgns.nspname
        WHERE chunk.is_compressed::bool = FALSE
      LOOP
        RAISE NOTICE 'Processing chunk: %', chunk::text;

        -- build name for temp table
        SELECT '_tmp' || relname
        FROM pg_class
        WHERE oid = chunk INTO STRICT tmp_name;

        -- copy downsampled chunk data into temp table
        EXECUTE format($sql$ CREATE UNLOGGED TABLE %I AS
          SELECT time_bucket('1h', time), device_id, avg(value) FROM %s GROUP BY 1, 2;
        $sql$, tmp_name, chunk);

        -- clear original chunk
        EXECUTE format('TRUNCATE %s;', chunk);

        -- copy downsampled data back into chunk
        EXECUTE format('INSERT INTO %s(time, device_id, value) SELECT * FROM %I;', chunk, tmp_name);

        -- drop temp table
        EXECUTE format('DROP TABLE %I;', tmp_name);

        PERFORM compress_chunk (chunk);
        COMMIT;
      END LOOP;
    END
    $$;
  2. Register the job to run daily. In the config, set lag to 12 months so that chunks containing data older than 12 months are downsampled and compressed. Note the job ID that add_job returns; you can use it to run the job manually, as shown after this step.

    SELECT add_job('downsample_compress', '1d', config => '{"lag":"12 month"}');
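
To verify the setup, you can run the job immediately and check its status rather than waiting for the first scheduled run. In this sketch, the job ID 1000 is a placeholder for the value that add_job returned in your session:

    -- Run the job immediately (substitute the job ID returned by add_job)
    CALL run_job(1000);

    -- Check the job's run history
    SELECT job_id, last_run_status, total_runs, total_failures
    FROM timescaledb_information.job_stats
    WHERE job_id = 1000;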