Streaming

Calcite has extended SQL and relational algebra in order to support streaming queries.

Introduction

Streams are collections of records that flow continuously, and forever. Unlike tables, they are not typically stored on disk, but flow over the network and are held for short periods of time in memory.

Streams complement tables because they represent what is happening in the present and future of the enterprise, whereas tables represent the past. It is very common for a stream to be archived into a table.

Like tables, you often want to query streams in a high-level language based on relational algebra, validated according to a schema, and optimized to take advantage of available resources and algorithms.

Calcite’s SQL is an extension to standard SQL, not another ‘SQL-like’ language. The distinction is important, for several reasons:

  • Streaming SQL is easy to learn for anyone who knows regular SQL.
  • The semantics are clear, because we aim to produce the same results on a stream as if the same data were in a table.
  • You can write queries that combine streams and tables (or the history of a stream, which is basically an in-memory table).
  • Lots of existing tools can generate standard SQL.

If you don’t use the STREAM keyword, you are back in regular standard SQL.

An example schema

Our streaming SQL examples use the following schema:

  • Orders (rowtime, productId, orderId, units) - a stream and a table
  • Products (rowtime, productId, name) - a table
  • Shipments (rowtime, orderId) - a stream

A simple query

Let’s start with the simplest streaming query:

  SELECT STREAM *
  FROM Orders;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   10:17:00 |        30 |       5 |     4
   10:17:05 |        10 |       6 |     1
   10:18:05 |        20 |       7 |     2
   10:18:07 |        30 |       8 |    20
   11:02:00 |        10 |       9 |     6
   11:04:00 |        10 |      10 |     1
   11:09:30 |        40 |      11 |    12
   11:24:11 |        10 |      12 |     4

This query reads all columns and rows from the Orders stream. Like any streaming query, it never terminates. It outputs a record whenever a record arrives in Orders.

Type Control-C to terminate the query.

The STREAM keyword is the main extension in streaming SQL. It tells the system that you are interested in incoming orders, not existing ones. The query

  SELECT *
  FROM Orders;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   08:30:00 |        10 |       1 |     3
   08:45:10 |        20 |       2 |     1
   09:12:21 |        10 |       3 |    10
   09:27:44 |        30 |       4 |     2

  4 records returned.

is also valid, but will print out all existing orders and then terminate. We call it a relational query, as opposed to streaming. It has traditional SQL semantics.

Orders is special, in that it has both a stream and a table. If you try to run a streaming query on a table, or a relational query on a stream, Calcite gives an error:

  SELECT * FROM Shipments;

  ERROR: Cannot convert stream 'SHIPMENTS' to a table

  SELECT STREAM * FROM Products;

  ERROR: Cannot convert table 'PRODUCTS' to a stream

Filtering rows

Just as in regular SQL, you use a WHERE clause to filter rows:

  SELECT STREAM *
  FROM Orders
  WHERE units > 3;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   10:17:00 |        30 |       5 |     4
   10:18:07 |        30 |       8 |    20
   11:02:00 |        10 |       9 |     6
   11:09:30 |        40 |      11 |    12
   11:24:11 |        10 |      12 |     4

Projecting expressions

Use expressions in the SELECT clause to choose which columns to return or compute expressions:

  SELECT STREAM rowtime,
    'An order for ' || units || ' '
      || CASE units WHEN 1 THEN 'unit' ELSE 'units' END
      || ' of product #' || productId AS description
  FROM Orders;

   rowtime  | description
  ----------+---------------------------------------
   10:17:00 | An order for 4 units of product #30
   10:17:05 | An order for 1 unit of product #10
   10:18:05 | An order for 2 units of product #20
   10:18:07 | An order for 20 units of product #30
   11:02:00 | An order for 6 units of product #10
   11:04:00 | An order for 1 unit of product #10
   11:09:30 | An order for 12 units of product #40
   11:24:11 | An order for 4 units of product #10

We recommend that you always include the rowtime column in the SELECT clause. Having a sorted timestamp in each stream and streaming query makes it possible to do advanced calculations later, such as GROUP BY and JOIN.

Tumbling windows

There are several ways to compute aggregate functions on streams. The differences are:

  • How many rows come out for each row in?
  • Does each incoming value appear in one total, or more?
  • What defines the “window”, the set of rows that contribute to a given output row?
  • Is the result a stream or a relation?

There are various window types:

  • tumbling window (GROUP BY)
  • hopping window (multi GROUP BY)
  • sliding window (window functions)
  • cascading window (window functions)

and the following diagram shows the kinds of query in which to use them:

Window types

First we’ll look at a tumbling window, which is defined by a streaming GROUP BY. Here is an example:

  SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY CEIL(rowtime TO HOUR), productId;

   rowtime  | productId |       c | units
  ----------+-----------+---------+-------
   11:00:00 |        30 |       2 |    24
   11:00:00 |        10 |       1 |     1
   11:00:00 |        20 |       1 |     7
   12:00:00 |        10 |       3 |    11
   12:00:00 |        40 |       1 |    12

The result is a stream. At 11 o’clock, Calcite emits a sub-total for every productId that had an order since 10 o’clock, timestamped 11 o’clock. At 12 o’clock, it will emit the orders that occurred between 11:00 and 12:00. Each input row contributes to only one output row.

How did Calcite know that the 10:00:00 sub-totals were complete at 11:00:00, so that it could emit them? It knows that rowtime is increasing, and it knows that CEIL(rowtime TO HOUR) is also increasing. So, once it has seen a row at or after 11:00:00, it will never see a row that will contribute to a 10:00:00 total.

A column or expression that is increasing or decreasing is said to be monotonic.

If a column or expression has values that are slightly out of order, and the stream has a mechanism (such as punctuation or watermarks) to declare that a particular value will never be seen again, then the column or expression is said to be quasi-monotonic.

Without a monotonic or quasi-monotonic expression in the GROUP BY clause, Calcite is not able to make progress, and it will not allow the query:

  SELECT STREAM productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY productId;

  ERROR: Streaming aggregation requires at least one monotonic expression in GROUP BY clause

Monotonic and quasi-monotonic columns need to be declared in the schema. The monotonicity is enforced when records enter the stream and assumed by queries that read from that stream. We recommend that you give each stream a timestamp column called rowtime, but you can declare others to be monotonic, orderId for example.

We discuss punctuation, watermarks, and other ways of making progress below.

Tumbling windows, improved

The previous example of tumbling windows was easy to write because the window was one hour. For intervals that are not a whole time unit, say 2 hours or 2 hours and 17 minutes, you cannot use CEIL, and the expression gets more complicated.
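To see why, here is a sketch (not from the original document) of what a 2-hour tumbling window might look like without CEIL, using TIMESTAMPADD and TIMESTAMPDIFF and integer division on hours since an arbitrary epoch. Whether the planner could still deduce that such an expression is monotonic is a separate question; the TUMBLE syntax below sidesteps the problem entirely.

  -- A sketch only: bucket each row into a 2-hour window and report the
  -- window end, by counting whole hours since an arbitrary epoch and
  -- rounding down to a multiple of 2
  SELECT STREAM
    TIMESTAMPADD(HOUR,
      2 * (TIMESTAMPDIFF(HOUR, TIMESTAMP '1970-01-01 00:00:00', rowtime) / 2) + 2,
      TIMESTAMP '1970-01-01 00:00:00') AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY
    TIMESTAMPADD(HOUR,
      2 * (TIMESTAMPDIFF(HOUR, TIMESTAMP '1970-01-01 00:00:00', rowtime) / 2) + 2,
      TIMESTAMP '1970-01-01 00:00:00'),
    productId;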

Calcite supports an alternative syntax for tumbling windows:

  SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;

   rowtime  | productId |       c | units
  ----------+-----------+---------+-------
   11:00:00 |        30 |       2 |    24
   11:00:00 |        10 |       1 |     1
   11:00:00 |        20 |       1 |     7
   12:00:00 |        10 |       3 |    11
   12:00:00 |        40 |       1 |    12

As you can see, it returns the same results as the previous query. The TUMBLE function returns a grouping key that is the same for all the rows that will end up in a given summary row; the TUMBLE_END function takes the same arguments and returns the time at which that window ends; there is also a TUMBLE_START function.

TUMBLE has an optional parameter to align the window. In the following example, we use a 30 minute interval and 0:12 as the alignment time, so the query emits summaries at 12 and 42 minutes past each hour:

  SELECT STREAM
    TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
    productId;

   rowtime  | productId |       c | units
  ----------+-----------+---------+-------
   10:42:00 |        30 |       2 |    24
   10:42:00 |        10 |       1 |     1
   10:42:00 |        20 |       1 |     7
   11:12:00 |        10 |       2 |     7
   11:12:00 |        40 |       1 |    12
   11:42:00 |        10 |       1 |     4

Hopping windows

Hopping windows are a generalization of tumbling windows that allow data to be kept in a window for longer than the emit interval.

For example, the following query emits a row timestamped 11:00 containing data from 08:00 to 11:00 (or 10:59.9 if we’re being pedantic), and a row timestamped 12:00 containing data from 09:00 to 12:00.

  SELECT STREAM
    HOP_END(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR) AS rowtime,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '3' HOUR);

   rowtime  |        c | units
  ----------+----------+-------
   11:00:00 |        4 |    27
   12:00:00 |        8 |    50

In this query, because the retain period is 3 times the emit period, every input row contributes to exactly 3 output rows. Imagine that the HOP function generates a collection of group keys for an incoming row, and places its values in the accumulators of each of those group keys. For example, HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3' HOUR) generates 3 periods

  [08:00, 11:00)
  [09:00, 12:00)
  [10:00, 13:00)

This raises the possibility of allowing user-defined partitioning functions for users who are not happy with the built-in functions HOP and TUMBLE.

We can build complex expressions such as an exponentially decaying moving average:

  SELECT STREAM HOP_END(rowtime),
    productId,
    SUM(unitPrice * EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
      / SUM(EXP((rowtime - HOP_START(rowtime)) SECOND / INTERVAL '1' HOUR))
  FROM Orders
  GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' HOUR),
    productId

Emits:

  • a row at 11:00:00 containing rows in [10:00:00, 11:00:00);
  • a row at 11:00:01 containing rows in [10:00:01, 11:00:01).

The expression weighs recent orders more heavily than older orders. Extending the window from 1 hour to 2 hours or 1 year would have virtually no effect on the accuracy of the result (but use more memory and compute).

Note that we use HOP_START inside an aggregate function (SUM) because it is a value that is constant for all rows within a sub-total. This would not be allowed for typical aggregate functions (SUM, COUNT etc.).

If you are familiar with GROUPING SETS, you may notice that partitioning functions can be seen as a generalization of GROUPING SETS, in that they allow an input row to contribute to multiple sub-totals. The auxiliary functions for GROUPING SETS, such as GROUPING() and GROUP_ID, can be used inside aggregate functions, so it is not surprising that HOP_START and HOP_END can be used in the same way.

GROUPING SETS

GROUPING SETS is valid for a streaming query provided that every grouping set contains a monotonic or quasi-monotonic expression.
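For example, here is a sketch (not from the original document) in which both grouping sets contain the monotonic expression CEIL(rowtime TO HOUR), so Calcite can emit each hour’s rows once the hour is complete:

  -- A sketch: every grouping set contains a monotonic expression
  SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
    productId,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM Orders
  GROUP BY GROUPING SETS (
    (CEIL(rowtime TO HOUR), productId),
    (CEIL(rowtime TO HOUR)));

In the rows produced by the second, coarser grouping set, productId is NULL.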

CUBE and ROLLUP are not valid for a streaming query, because they will produce at least one grouping set that aggregates everything (like GROUP BY ()).

Filtering after aggregation

As in standard SQL, you can apply a HAVING clause to filter rows emitted by a streaming GROUP BY:

  SELECT STREAM TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
    productId
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId
  HAVING COUNT(*) > 2 OR SUM(units) > 10;

   rowtime  | productId
  ----------+-----------
   10:00:00 |        30
   11:00:00 |        10
   11:00:00 |        40

Sub-queries, views and SQL’s closure property

The previous HAVING query can be expressed using a WHERE clause on a sub-query:

  SELECT STREAM rowtime, productId
  FROM (
    SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS rowtime,
      productId,
      COUNT(*) AS c,
      SUM(units) AS su
    FROM Orders
    GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
  WHERE c > 2 OR su > 10;

   rowtime  | productId
  ----------+-----------
   10:00:00 |        30
   11:00:00 |        10
   11:00:00 |        40

HAVING was introduced in the early days of SQL, when a way was needed to perform a filter after aggregation. (Recall that WHERE filters rows before they enter the GROUP BY clause.)

Since then, SQL has become a mathematically closed language, which means that any operation you can perform on a table, you can also perform on a query.

The closure property of SQL is extremely powerful. Not only does it render HAVING obsolete (or, at least, reduce it to syntactic sugar), it makes views possible:

  CREATE VIEW HourlyOrderTotals (rowtime, productId, c, su) AS
  SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
    productId,
    COUNT(*),
    SUM(units)
  FROM Orders
  GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId;

  SELECT STREAM rowtime, productId
  FROM HourlyOrderTotals
  WHERE c > 2 OR su > 10;

   rowtime  | productId
  ----------+-----------
   10:00:00 |        30
   11:00:00 |        10
   11:00:00 |        40

Sub-queries in the FROM clause are sometimes referred to as “inline views”, but really, they are more fundamental than views. Views are just a convenient way to carve your SQL into manageable chunks by giving the pieces names and storing them in the metadata repository.

Many people find that nested queries and views are even more useful on streams than they are on relations. Streaming queries are pipelines of operators all running continuously, and often those pipelines get quite long. Nested queries and views help to express and manage those pipelines.

And, by the way, a WITH clause can accomplish the same as a sub-query or a view:

  WITH HourlyOrderTotals (rowtime, productId, c, su) AS (
    SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR),
      productId,
      COUNT(*),
      SUM(units)
    FROM Orders
    GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId)
  SELECT STREAM rowtime, productId
  FROM HourlyOrderTotals
  WHERE c > 2 OR su > 10;

   rowtime  | productId
  ----------+-----------
   10:00:00 |        30
   11:00:00 |        10
   11:00:00 |        40

Converting between streams and relations

Look back at the definition of the HourlyOrderTotals view. Is the view a stream or a relation?

It does not contain the STREAM keyword, so it is a relation. However, it is a relation that can be converted into a stream.

You can use it in both relational and streaming queries:

  # A relation; will query the historic Orders table.
  # Returns the largest number of product #10 ever sold in one hour.
  SELECT max(su)
  FROM HourlyOrderTotals
  WHERE productId = 10;

  # A stream; will query the Orders stream.
  # Returns every hour in which at least one product #10 was sold.
  SELECT STREAM rowtime
  FROM HourlyOrderTotals
  WHERE productId = 10;

This approach is not limited to views and sub-queries. Following the approach set out in CQL [1], every query in streaming SQL is defined as a relational query and converted to a stream using the STREAM keyword in the top-most SELECT.

If the STREAM keyword is present in sub-queries or view definitions, it has no effect.

At query preparation time, Calcite figures out whether the relations referenced in the query can be converted to streams or historical relations.

Sometimes a stream makes available some of its history (say the last 24 hours of data in an Apache Kafka [2] topic) but not all. At run time, Calcite figures out whether there is sufficient history to run the query, and if not, gives an error.

The “pie chart” problem: Relational queries on streams

One particular case where you need to convert a stream to a relation occurs in what I call the “pie chart problem”. Imagine that you need to write a web page with a chart, like the following, that summarizes the number of orders for each product over the last hour.

Pie chart

But the Orders stream only contains a few records, not an hour’s summary. We need to run a relational query on the history of the stream:

  SELECT productId, count(*)
  FROM Orders
  WHERE rowtime BETWEEN current_timestamp - INTERVAL '1' HOUR
    AND current_timestamp;

If the history of the Orders stream is being spooled to the Orders table, we can answer the query, albeit at a high cost. Better, if we can tell the system to materialize a one-hour summary into a table, maintain it continuously as the stream flows, and automatically rewrite queries to use the table.
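One way to sketch that “materialize and maintain” idea is with a standing UPSERT of the kind described in the DML section below. This is an illustration, not a feature description; OrdersLastHour is a hypothetical summary table.

  -- A sketch: continuously maintain a per-product count of the last hour
  -- of orders in a hypothetical OrdersLastHour table
  UPSERT INTO OrdersLastHour
  SELECT STREAM productId,
    COUNT(*) OVER lastHour AS c
  FROM Orders
  WINDOW lastHour AS (
    PARTITION BY productId
    ORDER BY rowtime
    RANGE INTERVAL '1' HOUR PRECEDING);

The pie chart query then becomes a cheap relational read, say SELECT productId, c FROM OrdersLastHour.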

Sorting

The story for ORDER BY is similar to GROUP BY. The syntax looks like regular SQL, but Calcite must be sure that it can deliver timely results. It therefore requires a monotonic expression on the leading edge of your ORDER BY key.

  SELECT STREAM CEIL(rowtime TO hour) AS rowtime, productId, orderId, units
  FROM Orders
  ORDER BY CEIL(rowtime TO hour) ASC, units DESC;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   10:00:00 |        30 |       8 |    20
   10:00:00 |        30 |       5 |     4
   10:00:00 |        20 |       7 |     2
   10:00:00 |        10 |       6 |     1
   11:00:00 |        40 |      11 |    12
   11:00:00 |        10 |       9 |     6
   11:00:00 |        10 |      12 |     4
   11:00:00 |        10 |      10 |     1

Most queries will return results in the order that they were inserted, because the engine is using streaming algorithms, but you should not rely on it. For example, consider this:

  SELECT STREAM *
  FROM Orders
  WHERE productId = 10
  UNION ALL
  SELECT STREAM *
  FROM Orders
  WHERE productId = 30;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   10:17:05 |        10 |       6 |     1
   10:17:00 |        30 |       5 |     4
   10:18:07 |        30 |       8 |    20
   11:02:00 |        10 |       9 |     6
   11:04:00 |        10 |      10 |     1
   11:24:11 |        10 |      12 |     4

The rows with productId = 30 are apparently out of order, probably because the Orders stream was partitioned on productId and the partitioned streams sent their data at different times.

If you require a particular ordering, add an explicit ORDER BY:

  SELECT STREAM *
  FROM Orders
  WHERE productId = 10
  UNION ALL
  SELECT STREAM *
  FROM Orders
  WHERE productId = 30
  ORDER BY rowtime;

   rowtime  | productId | orderId | units
  ----------+-----------+---------+-------
   10:17:00 |        30 |       5 |     4
   10:17:05 |        10 |       6 |     1
   10:18:07 |        30 |       8 |    20
   11:02:00 |        10 |       9 |     6
   11:04:00 |        10 |      10 |     1
   11:24:11 |        10 |      12 |     4

Calcite will probably implement the UNION ALL by merging using rowtime, which is only slightly less efficient.

You only need to add an ORDER BY to the outermost query. If you need to, say, perform GROUP BY after a UNION ALL, Calcite will add an ORDER BY implicitly, in order to make the GROUP BY algorithm possible.
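For instance, in a query like the following sketch (not from the original), Calcite would merge the two inputs on rowtime before applying the streaming GROUP BY:

  -- A sketch: aggregating the merged stream; no explicit ORDER BY is needed
  SELECT STREAM CEIL(rowtime TO HOUR) AS rowtime,
    COUNT(*) AS c,
    SUM(units) AS units
  FROM (
    SELECT STREAM * FROM Orders WHERE productId = 10
    UNION ALL
    SELECT STREAM * FROM Orders WHERE productId = 30)
  GROUP BY CEIL(rowtime TO HOUR);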

Table constructor

The VALUES clause creates an inline table with a given set of rows.

Streaming is disallowed. The set of rows never changes, and therefore a stream would never return any rows.

  > SELECT STREAM * FROM (VALUES (1, 'abc'));

  ERROR: Cannot stream VALUES

Sliding windows

Standard SQL features so-called “analytic functions” that can be used in the SELECT clause. Unlike GROUP BY, these do not collapse records. For each record that goes in, one record comes out. But the aggregate function is based on a window of many rows.

Let’s look at an example.

  SELECT STREAM rowtime,
    productId,
    units,
    SUM(units) OVER (ORDER BY rowtime RANGE INTERVAL '1' HOUR PRECEDING) unitsLastHour
  FROM Orders;

The feature packs a lot of power with little effort. You can have multiple functions in the SELECT clause, based on multiple window specifications.

The following example returns orders whose average order size over the last 10 minutes is greater than the average order size for the last week.

  SELECT STREAM *
  FROM (
    SELECT STREAM rowtime,
      productId,
      units,
      AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
      AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
    FROM Orders
    WINDOW product AS (
      PARTITION BY productId
      ORDER BY rowtime))
  WHERE m10 > d7;

For conciseness, here we use a syntax where you partially define a window using a WINDOW clause and then refine the window in each OVER clause. You could also define all windows in the WINDOW clause, or all windows inline, if you wish.

But the real power goes beyond syntax. Behind the scenes, this query is maintaining two tables, and adding and removing values from sub-totals using FIFO queues. But you can access those tables without introducing a join into the query.

Some other features of the windowed aggregation syntax:

  • You can define windows based on row count (see the sketch after this list).
  • The window can reference rows that have not yet arrived. (The stream will wait until they have arrived.)
  • You can compute order-dependent functions such as RANK and median.
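Here is a sketch (not from the original) of a row-count based window: the average size of each product’s 10 most recent orders.

  -- A sketch: a row-count frame rather than a time-range frame
  SELECT STREAM rowtime,
    productId,
    units,
    AVG(units) OVER (PARTITION BY productId ORDER BY rowtime ROWS 9 PRECEDING) AS avgLast10
  FROM Orders;

ROWS 9 PRECEDING is shorthand for ROWS BETWEEN 9 PRECEDING AND CURRENT ROW, so each output row averages at most 10 rows.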

Cascading windows

What if we want a query that returns a result for every record, like a sliding window, but resets totals on a fixed time period, like a tumbling window? Such a pattern is called a cascading window. Here is an example:

  SELECT STREAM rowtime,
    productId,
    units,
    SUM(units) OVER (PARTITION BY FLOOR(rowtime TO HOUR)) AS unitsSinceTopOfHour
  FROM Orders;

It looks similar to a sliding window query, but the monotonic expression occurs within the PARTITION BY clause of the window. As the rowtime moves from 10:59:59 to 11:00:00, FLOOR(rowtime TO HOUR) changes from 10:00:00 to 11:00:00, and therefore a new partition starts. The first row to arrive in the new hour will start a new total; the second row will have a total that consists of two rows, and so on.

Calcite knows that the old partition will never be used again, so removes all sub-totals for that partition from its internal storage.

Analytic functions that use cascading and sliding windows can be combined in the same query.

Joining streams to tables

There are two kinds of join where streams are concerned: stream-to-table join and stream-to-stream join.

A stream-to-table join is straightforward if the contents of the table are not changing. This query enriches a stream of orders with each product’s list price:

  SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
    p.name, p.unitPrice
  FROM Orders AS o
  JOIN Products AS p
    ON o.productId = p.productId;

   rowtime  | productId | orderId | units | name   | unitPrice
  ----------+-----------+---------+-------+--------+-----------
   10:17:00 |        30 |       5 |     4 | Cheese |        17
   10:17:05 |        10 |       6 |     1 | Beer   |      0.25
   10:18:05 |        20 |       7 |     2 | Wine   |         6
   10:18:07 |        30 |       8 |    20 | Cheese |        17
   11:02:00 |        10 |       9 |     6 | Beer   |      0.25
   11:04:00 |        10 |      10 |     1 | Beer   |      0.25
   11:09:30 |        40 |      11 |    12 | Bread  |       100
   11:24:11 |        10 |      12 |     4 | Beer   |      0.25

What should happen if the table is changing? For example, suppose the unit price of product 10 is increased to 0.35 at 11:00. Orders placed before 11:00 should have the old price, and orders placed after 11:00 should reflect the new price.

One way to implement this is to have a table that keeps every version with a start and end effective date, ProductVersions in the following example:

  SELECT STREAM *
  FROM Orders AS o
  JOIN ProductVersions AS p
    ON o.productId = p.productId
    AND o.rowtime BETWEEN p.startDate AND p.endDate;

   rowtime  | productId | orderId | units | productId1 | name   | unitPrice
  ----------+-----------+---------+-------+------------+--------+-----------
   10:17:00 |        30 |       5 |     4 |         30 | Cheese |        17
   10:17:05 |        10 |       6 |     1 |         10 | Beer   |      0.25
   10:18:05 |        20 |       7 |     2 |         20 | Wine   |         6
   10:18:07 |        30 |       8 |    20 |         30 | Cheese |        17
   11:02:00 |        10 |       9 |     6 |         10 | Beer   |      0.35
   11:04:00 |        10 |      10 |     1 |         10 | Beer   |      0.35
   11:09:30 |        40 |      11 |    12 |         40 | Bread  |       100
   11:24:11 |        10 |      12 |     4 |         10 | Beer   |      0.35

The other way to implement this is to use a database with temporal support (the ability to find the contents of the database as it was at any moment in the past), and the system needs to know that the rowtime column of the Orders stream corresponds to the transaction timestamp of the Products table.
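Here is a sketch of what such a temporal join might look like, using the SQL:2011 FOR SYSTEM_TIME AS OF syntax. It assumes that Products is maintained as a system-versioned temporal table and that a correlated AS OF expression is allowed; neither assumption is part of the example schema above.

  -- A sketch: look up each product's attributes as of the order's rowtime
  SELECT STREAM o.rowtime, o.productId, o.orderId, o.units,
    p.name, p.unitPrice
  FROM Orders AS o
  JOIN Products FOR SYSTEM_TIME AS OF o.rowtime AS p
    ON o.productId = p.productId;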

For many applications, it is not worth the cost and effort of temporal support or a versioned table. It is acceptable to the application that the query gives different results when replayed: in this example, on replay, all orders of product 10 are assigned the later unit price, 0.35.

Joining streams to streams

It makes sense to join two streams if the join condition somehow forces them to remain a finite distance from one another. In the following query, the ship date is within one hour of the order date:

  SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
  FROM Orders AS o
  JOIN Shipments AS s
    ON o.orderId = s.orderId
    AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;

   rowtime  | productId | orderId | shipTime
  ----------+-----------+---------+----------
   10:17:00 |        30 |       5 | 10:55:00
   10:17:05 |        10 |       6 | 10:20:00
   11:02:00 |        10 |       9 | 11:58:00
   11:24:11 |        10 |      12 | 11:44:00

Note that quite a few orders do not appear, because they did not ship within an hour. By the time the system receives order 12, timestamped 11:24:11, it has already removed orders up to and including order 8, timestamped 10:18:07, from its hash table.

As you can see, the “lock step”, tying together monotonic or quasi-monotonic columns of the two streams, is necessary for the system to make progress. It will refuse to execute a query if it cannot deduce a lock step.

DML

It’s not only queries that make sense against streams; it also makes sense to run DML statements (INSERT, UPDATE, DELETE, and also their rarer cousins UPSERT and REPLACE) against streams.

DML is useful because it allows you to materialize streams or tables based on streams, and therefore save effort when values are used often.

Consider how streaming applications often consist of pipelines of queries, each query transforming input stream(s) to output stream(s). The component of a pipeline can be a view:

  CREATE VIEW LargeOrders AS
  SELECT STREAM * FROM Orders WHERE units > 1000;

or a standing INSERT statement:

  INSERT INTO LargeOrders
  SELECT STREAM * FROM Orders WHERE units > 1000;

These look similar, and in both cases the next step(s) in the pipeline can read from LargeOrders without worrying how it was populated. There is a difference in efficiency: the INSERT statement does the same work no matter how many consumers there are; the view does work proportional to the number of consumers, and in particular, does no work if there are no consumers.

Other forms of DML make sense for streams. For example, the following standing UPSERT statement maintains a table that materializes a summary of the last hour of orders:

  UPSERT INTO OrdersSummary
  SELECT STREAM productId,
    COUNT(*) OVER lastHour AS c
  FROM Orders
  WINDOW lastHour AS (
    PARTITION BY productId
    ORDER BY rowtime
    RANGE INTERVAL '1' HOUR PRECEDING)

Punctuation

Punctuation [5] allows a stream query to make progress even if there are not enough values in a monotonic key to push the results out.

(I prefer the term “rowtime bounds”, and watermarks [6] are a related concept, but for these purposes, punctuation will suffice.)

If a stream has punctuation enabled then it may not be sorted but is nevertheless sortable. So, for the purposes of semantics, it is sufficient to work in terms of sorted streams.

By the way, an out-of-order stream is also sortable if it is t-sorted (i.e. every record is guaranteed to arrive within t seconds of its timestamp) or k-sorted (i.e. every record is guaranteed to be no more than k positions out of order). So queries on these streams can be planned similarly to queries on streams with punctuation.

And, we often want to aggregate over attributes that are not time-based but are nevertheless monotonic. “The number of times a team has shifted between winning-state and losing-state” is one such monotonic attribute. The system needs to figure out for itself that it is safe to aggregate over such an attribute; punctuation does not add any extra information.

I have in mind some metadata (cost metrics) for the planner:

  • Is this stream sorted on a given attribute (or attributes)?
  • Is it possible to sort the stream on a given attribute? (For finite relations, the answer is always “yes”; for streams it depends on the existence of punctuation, or linkage between the attributes and the sort key.)
  • What latency do we need to introduce in order to perform that sort?
  • What is the cost (in CPU, memory etc.) of performing that sort?

We already have (1), in BuiltInMetadata.Collation. For (2), the answer is always “true” for finite relations. But we’ll need to implement (2), (3) and (4) for streams.

State of the stream

Not all concepts in this article have been implemented in Calcite. And others may be implemented in Calcite but not in a particular adapter such as SamzaSQL [3] [4].

Implemented

  • Streaming SELECT, WHERE, GROUP BY, HAVING, UNION ALL, ORDER BY
  • FLOOR and CEIL functions
  • Monotonicity
  • Streaming VALUES is disallowed

Not implemented

The following features are presented in this document as if Calcite supports them, but in fact it does not (yet). Full support means that the reference implementation supports the feature (including negative cases) and the TCK tests it.

  • Stream-to-stream JOIN
  • Stream-to-table JOIN
  • Stream on view
  • Streaming UNION ALL with ORDER BY (merge)
  • Relational query on stream
  • Streaming windowed aggregation (sliding and cascading windows)
  • Check that STREAM in sub-queries and views is ignored
  • Check that streaming ORDER BY cannot have OFFSET or LIMIT
  • Limited history; at run time, check that there is sufficient history to run the query.
  • Quasi-monotonicity
  • HOP and TUMBLE (and auxiliary HOP_START, HOP_END, TUMBLE_START, TUMBLE_END) functions

To do in this document

  • Re-visit whether you can stream VALUES
  • OVER clause to define window on stream
  • Consider whether to allow CUBE and ROLLUP in streaming queries, with an understanding that some levels of aggregation will never complete (because they have no monotonic expressions) and thus will never be emitted.
  • Fix the UPSERT example to remove records for products that have not occurred in the last hour.
  • DML that outputs to multiple streams; perhaps an extension to the standard REPLACE statement.

Functions

The following functions are not present in standard SQL but are defined in streaming SQL.

Scalar functions:

  • FLOOR(dateTime TO intervalType) rounds a date, time or timestamp value down to a given interval type
  • CEIL(dateTime TO intervalType) rounds a date, time or timestamp value up to a given interval type
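For example (a sketch, with an arbitrary sample timestamp):

  -- FLOOR rounds down to the start of the hour, CEIL rounds up to the next hour
  VALUES (FLOOR(TIMESTAMP '2015-02-15 10:17:00' TO HOUR),
          CEIL(TIMESTAMP '2015-02-15 10:17:00' TO HOUR));
  -- returns (2015-02-15 10:00:00, 2015-02-15 11:00:00)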

Partitioning functions:

  • HOP(t, emit, retain) returns a collection of group keys for a row to be part of a hopping window
  • HOP(t, emit, retain, align) returns a collection of group keys for a row to be part of a hopping window with a given alignment
  • TUMBLE(t, emit) returns a group key for a row to be part of a tumbling window
  • TUMBLE(t, emit, align) returns a group key for a row to be part of a tumbling window with a given alignment

TUMBLE(t, e) is equivalent to TUMBLE(t, e, TIME '00:00:00').

TUMBLE(t, e, a) is equivalent to HOP(t, e, e, a).

HOP(t, e, r) is equivalent to HOP(t, e, r, TIME '00:00:00').

References