Tutorial: Continuous Aggregates

One kind of query that occurs frequently when performing data analysis is aggregating data into summaries.

For instance, to find the average and standard deviation of a column for each day in a dataset stored in a hypertable, we would run:

  SELECT time_bucket('1 day', time_dimension_column_name) bucket,
         avg(column_name), stddev(column_name)
  FROM hypertable
  GROUP BY bucket;

However, performing this query as written requires scanning all the data within the hypertable, which can be inefficient if this query is called frequently. A continuous aggregate recomputes the query automatically at user-specified time intervals and materializes the results into a table. When the user queries the continuous aggregate, the system reads and processes the much smaller materialized table. This speeds up the query significantly. This is particularly useful when recomputing aggregates frequently for large data sets.

We will explore this feature using the data set from the Hello Timescale tutorial.

Prerequisites

To complete this tutorial, you will need a cursory knowledge of the Structured Query Language (SQL). The tutorial will walk you through each SQL command, but it will be helpful if you’ve seen SQL before.

To start, install TimescaleDB. Once your installation is complete, you can proceed to ingest the sample data and work through the tutorial.
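Before loading any data, it can be worth confirming that the TimescaleDB extension is actually available to your PostgreSQL server. One quick way to check, using the standard pg_available_extensions catalog view, is:

  -- Check that the TimescaleDB extension is available and whether it is installed
  SELECT name, default_version, installed_version
  FROM pg_available_extensions
  WHERE name = 'timescaledb';

If this query returns no rows, revisit the installation instructions before continuing.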

1. Download and Load Data

Let’s start by downloading the data.

This dataset contains two files:

  1. nyc_data_contagg.sql - A SQL file that will set up the necessary tables for the continuous aggregates tutorial.
  2. nyc_data_rides.csv - A CSV file with the ride data.

First, create a database (e.g., nyc_data) and install the extension:

  CREATE DATABASE nyc_data;
  \c nyc_data
  CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;

Next, download the file from the link below:

nyc_data.tar.gz

Then, follow these steps:

  # (1) unzip the archive
  tar -xvzf nyc_data.tar.gz
  # (2) import the table schemas
  psql -U postgres -d nyc_data -h localhost < nyc_data_contagg.sql
  # (3) import data
  psql -U postgres -d nyc_data -h localhost -c "\COPY rides FROM nyc_data_rides.csv CSV"
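For reference, nyc_data_contagg.sql creates a rides table and converts it into a hypertable partitioned on pickup_datetime. A simplified sketch of that setup is shown below; the column list is abbreviated and the exact types are assumptions, so use the provided SQL file rather than this sketch when following the tutorial.

  -- Simplified sketch of the schema that nyc_data_contagg.sql sets up
  -- (column list abbreviated; exact column types are assumptions)
  CREATE TABLE rides (
      vendor_id        TEXT,
      pickup_datetime  TIMESTAMP WITHOUT TIME ZONE NOT NULL,
      trip_distance    NUMERIC,
      fare_amount      NUMERIC
  );
  -- Convert the plain table into a hypertable partitioned on pickup_datetime
  SELECT create_hypertable('rides', 'pickup_datetime');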

The data is now ready for you to use.

  # To access your database
  psql -U postgres -h localhost -d nyc_data
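If you want to sanity-check the import before moving on, you can confirm that rides was registered as a hypertable and that it contains data (this assumes TimescaleDB 2.x, where hypertable metadata lives in timescaledb_information.hypertables):

  -- Confirm that rides is a hypertable
  SELECT hypertable_name, num_dimensions, num_chunks
  FROM timescaledb_information.hypertables
  WHERE hypertable_name = 'rides';

  -- Optional: check how many rows were imported
  SELECT count(*) FROM rides;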

2. Create a continuous aggregate query

Let us assume we use the following query frequently to calculate hourly ride statistics.

  SELECT vendor_id, time_bucket('1h', pickup_datetime) as day,
         count(*) total_rides,
         avg(fare_amount) avg_fare,
         max(trip_distance) as max_trip_distance,
         min(trip_distance) as min_trip_distance
  FROM rides
  GROUP BY vendor_id, time_bucket('1h', pickup_datetime);

Every time the query is run, the database recomputes the results for the query by scanning the entire table. With continuous aggregates, we have a way of telling TimescaleDB to cache the results and update them when the underlying data in the rides table is modified.

Under the covers, TimescaleDB creates a materialization table where the result of this query is saved. The materialization table is updated on a schedule that is specified by the continuous aggregate policy. For example, if we specify a schedule interval of 30 minutes, the continuous aggregate checks every 30 minutes for changes (inserts/updates/deletes) made to the rides table, recomputes the aggregates for the modified rows, and updates the values in the materialization table.

We use the CREATE MATERIALIZED VIEW statement and specify timescaledb.continuous in the WITH clause to create a continuous aggregate query. Then we use the add_continuous_aggregate_policy() function to specify that we want to update the continuous aggregate view every 30 minutes.

TIP: If you have a lot of historical data to aggregate into the view, consider using the WITH NO DATA option as outlined in the alternative approach below.

  CREATE MATERIALIZED VIEW cagg_rides_view WITH
      (timescaledb.continuous)
  AS
  SELECT vendor_id, time_bucket('1h', pickup_datetime) as day,
         count(*) total_rides,
         avg(fare_amount) avg_fare,
         max(trip_distance) as max_trip_distance,
         min(trip_distance) as min_trip_distance
  FROM rides
  GROUP BY vendor_id, time_bucket('1h', pickup_datetime);

  SELECT add_continuous_aggregate_policy('cagg_rides_view',
      start_offset => INTERVAL '1 week',
      end_offset => INTERVAL '1 hour',
      schedule_interval => INTERVAL '30 minutes');

Note that a continuous aggregate query requires a GROUP BY clause that includes a time_bucket expression, and that time_bucket expression must use the time dimension column of the rides hypertable. You can inspect the resulting view with \d:

  \d cagg_rides_view
                     View "public.cagg_rides_view"
        Column       |            Type             | Collation | Nullable | Default
  -------------------+-----------------------------+-----------+----------+---------
   vendor_id         | text                        |           |          |
   day               | timestamp without time zone |           |          |
   total_rides       | bigint                      |           |          |
   avg_fare          | numeric                     |           |          |
   max_trip_distance | numeric                     |           |          |
   min_trip_distance | numeric                     |           |          |

We can view the metadata for the continuous aggregate by selecting information from the timescaledb_information.continuous_aggregates view joined to the timescaledb_information.jobs view.

  SELECT view_name, schedule_interval,
         config ->> 'start_offset' as start_offset,
         config ->> 'end_offset' as end_offset,
         next_start,
         materialization_hypertable_name
  FROM timescaledb_information.continuous_aggregates ca
  INNER JOIN timescaledb_information.jobs j
      ON ca.materialization_hypertable_name = j.hypertable_name
  WHERE view_name = 'cagg_rides_view';

  -[ RECORD 1 ]-------------------+---------------------------
  view_name                       | cagg_rides_view
  schedule_interval               | 00:30:00
  start_offset                    | 7 days
  end_offset                      | 01:00:00
  next_start                      | 2020-10-29 15:59:51
  materialization_hypertable_name | _materialized_hypertable_9

As you can see from the result, the schedule_interval is set to 30 minutes. The computed aggregates are saved in the materialization table, _timescaledb_internal._materialized_hypertable_9.

What are start_offset and end_offset? These intervals determine the window of time that TimescaleDB examines when refreshing the continuous aggregate. start_offset specifies how far back in time the policy looks for changes in the underlying hypertable, while end_offset specifies the most recent point in time up to which data is materialized.

TIP: Compared to TimescaleDB 1.x, start_offset is related to ignore_invalidation_older_than and end_offset is similar to refresh_lag, although the semantics of the new parameters are much simpler to understand.

For example, if we expect frequent updates to the rides table for the current hour, we do not want to materialize the aggregates for that range. We would set the end_offset = INTERVAL '1h' to indicate that. (If you don’t specify an end_offset, the default value is twice the bucket_width used by the time_bucket expression.)

So given the continuous aggregate policy that we created above, the continuous aggregate will get refreshed every 30 minutes (schedule_interval) but will update the continuous aggregates only for the data that satisfies the condition:

  time_bucket('1h', pickup_datetime) > now() - INTERVAL '1 week'
  AND time_bucket('1h', pickup_datetime) < now() - INTERVAL '1 hour'

3. Queries using continuous aggregates

We can use the continuous aggregate just like any other VIEW in a SELECT query.

  SELECT vendor_id, day, total_rides FROM cagg_rides_view WHERE total_rides > 15000;

   vendor_id |         day         | total_rides
  -----------+---------------------+-------------
   2         | 2016-01-01 01:00:00 |       15407
  (1 row)
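Because the view is itself a queryable relation, you can also aggregate it further at query time. For example, here is a rough sketch that rolls the hourly buckets up to daily ride totals per vendor (summing total_rides is safe here; averaging avg_fare would produce an average of averages):

  SELECT vendor_id,
         time_bucket('1 day', day) AS bucket,
         sum(total_rides) AS daily_rides
  FROM cagg_rides_view
  GROUP BY vendor_id, bucket
  ORDER BY bucket, vendor_id;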

4. Statistics for continuous aggregates

We can view information about the jobs that update the continuous aggregate using the timescaledb_information.job_stats view.

  SELECT * FROM timescaledb_information.job_stats js
  WHERE job_id = (
      SELECT job_id FROM timescaledb_information.jobs j
      INNER JOIN timescaledb_information.continuous_aggregates ca
          ON j.hypertable_name = ca.materialization_hypertable_name
      WHERE ca.view_name = 'cagg_rides_view'
  );

  -[ RECORD 1 ]----------+------------------------------
  hypertable_schema      | _timescaledb_internal
  hypertable_name        | _materialized_hypertable_9
  job_id                 | 1006
  last_run_started_at    | 2020-10-29 15:59:51.814305+00
  last_successful_finish | 2020-10-29 15:59:51.823749+00
  last_run_status        | Success
  job_status             | Scheduled
  last_run_duration      | 00:00:00.009444
  next_start             | 2020-10-29 16:29:51.823749+00
  total_runs             | 2
  total_successes        | 2
  total_failures         | 0

  ---- fetch max pickup_datetime for comparison with completed_threshold ----
  SELECT max(pickup_datetime) FROM rides;

  -[ RECORD 1 ]------------
  max | 2016-01-31 23:59:59

The job_id column gives the id of the background job that updates the continuous aggregate. next_start says when the next scheduled update will occur. last_successful_finish and last_run_duration let you see when the scheduled refresh last finished and how long it took to run.
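If you do not want to wait until next_start, you can trigger the policy's job manually. Assuming TimescaleDB 2.x, where refresh policies run as background jobs, the job can be executed immediately with run_job (substitute the job_id reported on your system):

  -- Run the refresh job right away instead of waiting for next_start
  CALL run_job(1006);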

5. Update the Continuous Aggregate schedule

Altering the schedule or parameters of a continuous aggregate policy is accomplished by first removing the existing policy and then creating a new updated policy with the desired settings.

To modify the policy that currently exists on cagg_rides_view so that start_offset is equal to 1 month, you would run:

  SELECT remove_continuous_aggregate_policy('cagg_rides_view');

  SELECT add_continuous_aggregate_policy('cagg_rides_view',
      start_offset => INTERVAL '1 month',
      end_offset => INTERVAL '1 hour',
      schedule_interval => INTERVAL '30 minutes');

  SELECT view_name, schedule_interval,
         config ->> 'start_offset' as start_offset,
         config ->> 'end_offset' as end_offset,
         next_start,
         materialization_hypertable_name
  FROM timescaledb_information.continuous_aggregates ca
  INNER JOIN timescaledb_information.jobs j
      ON ca.materialization_hypertable_name = j.hypertable_name
  WHERE view_name = 'cagg_rides_view';

  -[ RECORD 1 ]-------------------+---------------------------
  view_name                       | cagg_rides_view
  schedule_interval               | 00:30:00
  start_offset                    | 1 mon
  end_offset                      | 01:00:00
  next_start                      | 2020-10-29 15:59:51
  materialization_hypertable_name | _materialized_hypertable_9

Notice that the start_offset is now set to 1 mon.
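Alternatively, if you only need to change how often the policy runs (and not its offsets), TimescaleDB 2.x also lets you adjust the underlying job directly with alter_job, without removing and recreating the policy. A minimal sketch, again using the job_id from your own system:

  -- Change only the schedule of the existing policy job
  SELECT alter_job(1006, schedule_interval => INTERVAL '1 hour');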

Using WITH NO DATA when creating a Continuous Aggregate

If you have a lot of historical data, we suggest creating the continuous aggregate using the WITH NO DATA parameter for the CREATE MATERIALIZED VIEW command. Doing so will allow the continuous aggregate to be created instantly (you won’t have to wait for the data to be aggregated on creation!). Data will then begin to populate as the continuous aggregate policy begins to run.

However, only data newer than start_offset will be populated into the continuous aggregate. If you have historical data older than the start_offset interval, you need to manually refresh that history up to the current start_offset so that real-time queries run efficiently.

Using the example in Step 2 above, if we had years' worth of data, a better approach to creating the continuous aggregate is shown in the SQL below, which:

  1. Creates the continuous aggregate using the WITH NO DATA option,
  2. Refreshes the history explicitly, then
  3. Adds the continuous aggregate refresh policy separately.
  CREATE MATERIALIZED VIEW cagg_rides_view WITH
      (timescaledb.continuous)
  AS
  SELECT vendor_id, time_bucket('1h', pickup_datetime) as day,
         count(*) total_rides,
         avg(fare_amount) avg_fare,
         max(trip_distance) as max_trip_distance,
         min(trip_distance) as min_trip_distance
  FROM rides
  GROUP BY vendor_id, time_bucket('1h', pickup_datetime)
  WITH NO DATA;

  CALL refresh_continuous_aggregate('cagg_rides_view', NULL, localtimestamp - INTERVAL '1 week');

  SELECT add_continuous_aggregate_policy('cagg_rides_view',
      start_offset => INTERVAL '1 week',
      end_offset => INTERVAL '1 hour',
      schedule_interval => INTERVAL '30 minutes');
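Note that refresh_continuous_aggregate can also be used to refresh a specific historical window. For this particular data set (rides from January 2016), the policy offsets are relative to the time the job runs, so you may want to refresh the 2016 data explicitly, for example:

  -- Refresh a specific historical window of the continuous aggregate
  CALL refresh_continuous_aggregate('cagg_rides_view', '2016-01-01', '2016-02-01');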

You will find more details about the Continuous Aggregate API in its documentation.