Tutorials

Tutorial: Continuous Aggregates

One kind of query that occurs frequently when performing data analysis is aggregating data into summaries.

For instance, if we want to find the average and standard deviation of a column for each day in a dataset stored in a hypertable, we would run:

  SELECT time_bucket('1 day', time_dimension_column_name) bucket, avg(column_name), stddev(column_name)
  FROM hypertable
  GROUP BY bucket;

However, performing this query as written requires scanning all the data within the table, which can be inefficient if the query is run frequently. A continuous aggregate view recomputes the query automatically at user-specified time intervals and materializes the results into a table. When the user queries the view, the system reads and processes the much smaller materialized table instead, which speeds up the query significantly. This is particularly useful when aggregates are recomputed frequently for large data sets.

We will explore this feature using the data set from the Hello Timescale Tutorial.

Prerequisites

To complete this tutorial, you will need a basic knowledge of SQL (Structured Query Language). The tutorial will walk you through each SQL command, but it will be helpful if you’ve seen SQL before.

To start, install TimescaleDB. Once your installation is complete, you can proceed to ingest the sample data and complete the tutorial.

1. Download and Load Data

Let’s start by downloading the data.

This dataset contains two files:

  1. nyc_data_contagg.sql - A SQL file that will set up the necessary tables for the continuous aggregates tutorial.
  2. nyc_data_rides.csv - A CSV file with the ride data.

First, create a database, e.g., nyc_data, and enable the TimescaleDB extension:

  CREATE DATABASE nyc_data;
  \c nyc_data
  CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
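
If you want to confirm that the extension is active before continuing (optional; \dx is a standard psql meta-command), you can list it:

  -- optional: confirm that the TimescaleDB extension is installed in nyc_data
  \dx timescaledb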

Next, download the dataset from the link below:

nyc_data.tar.gz

Then, follow these steps:

  # (1) unzip the archive
  tar -xvzf nyc_data.tar.gz
  # (2) import the table schemas
  psql -U postgres -d nyc_data -h localhost < nyc_data_contagg.sql
  # (3) import data
  psql -U postgres -d nyc_data -h localhost -c "\COPY rides FROM nyc_data_rides.csv CSV"

The data is now ready for you to use.

  # To access your database
  psql -U postgres -h localhost -d nyc_data
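
As an optional sanity check (the expected row count depends on the dataset and is not shown here), you can verify that the rides table was populated:

  -- optional: verify that the ride data was imported
  SELECT count(*) FROM rides;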

2. Create a continuous aggregate query

Let us assume we frequently use the following query to calculate hourly ride statistics.

  SELECT vendor_id, time_bucket('1h', pickup_datetime) as day,
    count(*) total_rides,
    avg(fare_amount) avg_fare,
    max(trip_distance) as max_trip_distance,
    min(trip_distance) as min_trip_distance
  FROM rides
  GROUP BY vendor_id, time_bucket('1h', pickup_datetime);

Every time the query is run, the database recomputes its results by scanning the entire table. With continuous aggregate queries, we have a way of telling TimescaleDB to cache the results and update them when the underlying data in the rides table is modified. Under the covers, TimescaleDB creates a materialization table where the result of this query is saved. The materialization table is updated at a refresh interval that is specified when the continuous aggregate query is created. For example, if we specify a refresh interval of 10 minutes, the continuous aggregate query checks the changes (inserts, updates, and deletes) that were made to the rides table, recomputes the aggregates for the modified rows, and updates the values in the materialization table.

We use the CREATE VIEW statement and specify timescaledb.continuous in the WITH clause to create a continuous aggregate query. We use the timescaledb.refresh_interval parameter to specify that we want to update the continuous aggregate query every 30 minutes.

  CREATE VIEW cagg_rides_view WITH
    (timescaledb.continuous, timescaledb.refresh_interval = '30m')
  AS
  SELECT vendor_id, time_bucket('1h', pickup_datetime) as day,
    count(*) total_rides,
    avg(fare_amount) avg_fare,
    max(trip_distance) as max_trip_distance,
    min(trip_distance) as min_trip_distance
  FROM rides
  GROUP BY vendor_id, time_bucket('1h', pickup_datetime);

Note that a continuous aggregate query requires a GROUP BY clause with a time_bucket expression, and the time_bucket expression must use the time dimension column of the rides hypertable. We can inspect the columns of the new view with \d:

  \d cagg_rides_view
                       View "public.cagg_rides_view"
        Column       |            Type             | Collation | Nullable | Default
  -------------------+-----------------------------+-----------+----------+---------
   vendor_id         | text                        |           |          |
   day               | timestamp without time zone |           |          |
   total_rides       | bigint                      |           |          |
   avg_fare          | numeric                     |           |          |
   max_trip_distance | numeric                     |           |          |
   min_trip_distance | numeric                     |           |          |

We can view the metadata for the continuous aggregate in the timescaledb_information.continuous_aggregates view.

  SELECT view_name, refresh_lag, refresh_interval, max_interval_per_job,
         ignore_invalidation_older_than, materialization_hypertable
  FROM timescaledb_information.continuous_aggregates;
  -[ RECORD 1 ]------------------+-------------------------------------------------
  view_name                      | cagg_rides_view
  refresh_lag                    | 02:00:00
  refresh_interval               | 00:30:00
  max_interval_per_job           | 20:00:00
  ignore_invalidation_older_than | 7 days
  materialization_hypertable     | _timescaledb_internal._materialized_hypertable_2

The refresh_interval is set to 30 minutes. The computed aggregates are saved in the materialization table, _timescaledb_internal._materialized_hypertable_2.
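
If you are curious where the materialized data physically lives, one way to peek at it, assuming the materialization hypertable is named _timescaledb_internal._materialized_hypertable_2 as in the output above, is to list its chunks:

  -- list the chunks backing the materialization hypertable
  SELECT show_chunks('_timescaledb_internal._materialized_hypertable_2');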

What are refresh_lag and max_interval_per_job? We use the timescaledb.refresh_lag parameter to indicate how far the continuous aggregate query lags behind the data in the rides table. For example, if we expect frequent updates to the rides table for the current hour, we do not want to precompute the aggregates for that range, so we would set refresh_lag = '1h'. (The default value is twice the bucket_width used by the time_bucket expression; this is the 2 hours shown for refresh_lag in the view output above.) So the continuous aggregate will be refreshed every 30 minutes (refresh_interval), but it will only update the aggregates for data that satisfies the condition time_bucket('1h', pickup_datetime) < max(pickup_datetime) - '1h' (if refresh_lag is set to 1 hour).

The timescaledb.max_interval_per_job parameter is used when we want to limit the amount of data processed by a single update of the continuous aggregate, using smaller or bigger batch sizes (the batching is done automatically by TimescaleDB). timescaledb.max_interval_per_job specifies the batch size.

The parameters refresh_lag and max_interval_per_job can be specified while creating or altering a continuous aggregate. Refer to the documentation for the syntax.
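
For illustration only, here is a sketch of how these parameters could be supplied in the WITH clause at creation time; the view name cagg_rides_daily_view and the parameter values are made up for this example:

  -- example only: a daily aggregate with explicit refresh_lag and max_interval_per_job
  CREATE VIEW cagg_rides_daily_view WITH
    (timescaledb.continuous,
     timescaledb.refresh_interval = '30m',
     timescaledb.refresh_lag = '1h',
     timescaledb.max_interval_per_job = '1 week')
  AS
  SELECT vendor_id, time_bucket('1 day', pickup_datetime) as ride_day,
    count(*) total_rides
  FROM rides
  GROUP BY vendor_id, time_bucket('1 day', pickup_datetime);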

The timescaledb.ignore_invalidation_older_than parameter is used when you want to avoid updating a continuous aggregate when older records are modified (that is, inserted, updated, or deleted). Setting this to, for example, '1 week' ensures that a modification to a record older than 1 week will not trigger an update of the continuous aggregate.
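
As a sketch, using the same ALTER VIEW syntax shown in section 5, this parameter could be changed on the existing view; the '30 days' value is only an example:

  -- example only: ignore modifications to records older than 30 days
  ALTER VIEW cagg_rides_view SET (timescaledb.ignore_invalidation_older_than = '30 days');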

3. Queries using continuous aggregates

We can use the continuous aggregate, just like any other view, in a SELECT query.

  SELECT vendor_id, day, total_rides FROM cagg_rides_view WHERE total_rides > 15000;
   vendor_id |         day         | total_rides
  -----------+---------------------+-------------
   2         | 2016-01-01 01:00:00 |       15407
  (1 row)
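
Since the view behaves like any other relation, we can also aggregate it further. For example, a sketch that rolls the hourly buckets up into daily ride totals per vendor (output omitted):

  -- roll the hourly aggregates up into daily totals per vendor
  SELECT vendor_id, time_bucket('1 day', day) AS ride_day, sum(total_rides) AS daily_rides
  FROM cagg_rides_view
  GROUP BY vendor_id, time_bucket('1 day', day)
  ORDER BY ride_day, vendor_id;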

4. Statistics for continuous aggregates

We can view information about the jobs that updated the continuous aggregates using the timescaledb_information.continuous_aggregate_stats view.

  SELECT * FROM timescaledb_information.continuous_aggregate_stats;
  -[ RECORD 1 ]----------+------------------------------
  view_name              | cagg_rides_view
  completed_threshold    | 2016-01-31 22:00:00
  invalidation_threshold | 2016-01-31 22:00:00
  job_id                 | 1000
  last_run_started_at    | 2019-04-25 10:48:08.15141-04
  last_run_status        | Success
  job_status             | scheduled
  last_run_duration      | 00:00:00.042841
  next_scheduled_run     | 2019-04-25 11:18:08.194251-04
  total_runs             | 1
  total_successes        | 1
  total_failures         | 0
  total_crashes          | 0

  -- fetch max pickup_datetime for comparison with completed_threshold
  SELECT max(pickup_datetime) FROM rides;
  -[ RECORD 1 ]------------
  max | 2016-01-31 23:59:59

The column job_id gives the id of the background worker that updates the continuous aggregate query. next_scheduled_run indicates when the next scheduled update will occur. The completed_threshold shows that rows with a pickup_datetime value < '2016-01-31 22:00:00' (from the rides table) were used to update the continuous aggregate. Since the refresh_lag is set to 2 hours, the completed threshold is 2 hours behind the maximum pickup_datetime in the rides table. After a job completes, the invalidation_threshold and completed_threshold will be the same. These values differ while a background job is running.

5. Alter and refresh continuous aggregates

The parameters passed in the WITH clause can be modified using ALTER VIEW. For example, we can change refresh_lag to 1 hour:

  ALTER VIEW cagg_rides_view SET (timescaledb.refresh_lag='1h');
  ALTER VIEW
  SELECT view_name, refresh_lag, refresh_interval, max_interval_per_job, materialization_hypertable
  FROM timescaledb_information.continuous_aggregates;
  -[ RECORD 1 ]--------------+-------------------------------------------------
  view_name                  | cagg_rides_view
  refresh_lag                | 01:00:00
  refresh_interval           | 00:30:00
  max_interval_per_job       | 20:00:00
  materialization_hypertable | _timescaledb_internal._materialized_hypertable_2
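
Besides waiting for the scheduled background job, the materialization can also be updated on demand. Assuming the TimescaleDB 1.x API that this tutorial is written against, a manual refresh is triggered with REFRESH MATERIALIZED VIEW (shown here as a sketch):

  -- manually refresh the continuous aggregate instead of waiting for the next scheduled run
  REFRESH MATERIALIZED VIEW cagg_rides_view;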

You will find more details about the API in the documentation.