ReactorQL

JetLinks封装了一套使用SQL来进行实时数据处理的工具包查看源代码ReactorQL,使用SQL处理实时数据. - 图1 (opens new window)。 通过将SQL翻译为reactorReactorQL,使用SQL处理实时数据. - 图2 (opens new window)来进行数据处理。 规则引擎中的数据转发以及可视化规则中的ReactorQL节点均使用此工具包实现。 默认情况下,SQL中的表名就是事件总线中的topic,如: select * from "/device/*/*/message/property/*", 表示订阅/device/*/*/message/property/*下的实时消息.

场景

  1. 处理实时数据
  2. 聚合计算实时数据
  3. 跨数据源联合数据处理

SQL例子

TIP

聚合处理实时数据时,必须使用interval函数或者_window函数.

当温度大于40度时,将数据转发到下一步.

  1. select
  2. this.properties.temperature temperature,
  3. this.deviceId deviceId
  4. from
  5. "/device/*/*/message/property/**" -- 订阅所有设备的所有属性消息
  6. where this.properties.temperature > 40

处理指定多个型号的设备数据

  1. select * from (
  2. select
  3. this.properties.temperature temperature,
  4. this.deviceId deviceId
  5. from
  6. "/device/T0001/*/message/property/**" -- 订阅T0001型号下的所有设备消息
  7. where this.properties.temperature > 40
  8. union all -- 实时数据只能使用 union all
  9. select
  10. this.properties.temperature temperature,
  11. this.deviceId deviceId
  12. from
  13. "/device/T0002/*/message/property/**" -- 订阅T0002型号下的所有设备消息
  14. where this.properties.temperature > 42
  15. )

计算每5分钟的温度平均值,当平均温度大于40度时,将数据转发到下一步.

  1. select
  2. avg(this.properties.temperature) temperature
  3. from
  4. "/device/*/*/message/property/**" -- 订阅所有设备的所有属性消息
  5. group by interval('5m')
  6. having temperature > 40 --having 必须使用别名.

计算每10条数据为一个窗口,每2条数据滚动的平均值.

  1. [1,2,3,4,5,6,7,8,9,10] 第一组
  2. [3,4,5,6,7,8,9,10,11,12] 第二组
  3. [5,6,7,8,9,10,11,12,13,14] 第三组
  1. select
  2. avg(this.properties.temperature) temperature
  3. from
  4. "/device/*/*/message/property/**" -- 订阅所有设备的所有属性消息
  5. group by _window(10,2)
  6. having temperature > 40 --having 必须使用别名.

聚合统计平均值,并且提取聚合结果中的数据.

  1. select
  2. rows_to_array(idList) deviceIdList, --将[{deviceId:1},{deviceId:2}] 转为[1,2]
  3. avgTemp,
  4. from
  5. (
  6. select
  7. collect_list((select this.deviceId deviceId)) idList, --聚合结果里的
  8. avg(temperature) avgTemp ,
  9. from "/device/*/*/message/property/**" ,
  10. group by interval('1m') having avgTemp > 40
  11. )

5分钟之内只取第一次。

  1. select *
  2. from "/device/*/*/message/event/fire_alarm"
  3. _window('5m'),take(1) -- -1为取最后一次

限流: 10秒内超过2次则获取最后一条数据

  1. select * from
  2. ( select * from "/device/demo-device/device001/message/event/alarm" )
  3. group by
  4. _window('10s') --时间窗口
  5. ,trace() -- 跟踪分组内行号信息
  6. ,take(-1) --取最后一条数据
  7. having
  8. row.index > 2 -- 跟踪分组内的行号
  9. and
  10. row.elapsed>1000 -- 距离上一行的时间

注意

SQL中的this表示主表当前的数据,如果存在嵌套属性的时候,必须指定this或者以表别名开头. 如: this.properties.temperature ,写成: properties.temperature是无法获取到值到.

SQL支持列表

函数/表达式用途示例说明
+加法运算temp+10对应函数: math.plus(temp,10)
-减法运算temp-10对应函数: math.sub(temp,10)
乘法运算temp10对应函数: math.mul(temp,10)
/除法运算temp/10对应函数: math.divi(temp,10)
%取模运算temp%2对应函数: math.mod(temp,2)
&位与运算val&3对应函数: bit_and(val,3)
|位或运算val|3对应函数: bit_or(val,3)
^异或运算val^3对应函数: bit_mutex(val,3)
<<位左移运算val<<2对应函数: bit_left_shift(val,2)
>>位右移运算val>>2对应函数: bit_right_shift(val,2)
||字符拼接val||’度’对应函数: concat(val,’度’)
avg平均值avg(val)聚合函数,平均值
sum合计值sum(val)聚合函数,合计值
count总数count(1)聚合函数,计数
max最大值max(val)聚合函数,最大值
min最小值min(val)聚合函数,最小值
take取指定数量数据take(5,-1) —取5个中的最后一个通常配合分组函数_window使用
>大于val > 10
<小于val < 10
=等于val = 10
!=不等于val !=10等同于: <> ,如: val <> 10
>=大于等于val>=10
<=小于等于val<=10
in在..之中val in (1,2,3)
not in不在..之中val not in (1,2,3)
like模糊匹配name like ‘a%’not like 同理
between在之间val between 1 and 10
now当前时间now()默认返回时间戳,可传入格式化参数.
date_format格式化日期date_format(now(),’yyyy-MM-dd’)
cast转换类型cast(val as boolean)支持类型: string,boolean,int,double,float,date,decimal,long
interval时间分组interval(‘10s’)分组函数,按时间分组
_window窗口分组_window(10)窗口,支持按数量和时间窗口
collect_list聚合结果转为listcollect_list((select deviceId))把聚合的的结果转为list
rows_to_array将结果集转为单元素数组rows_to_array(idList)把只有一个属性的结果集中的属性转为集合
new_map创建一个mapnew_map(‘k1’,v1,’k2’,v2)
new_array创建一个集合new_array(1,2,3,4)
math.ceil向上取整math.ceil(val)
math.floor向下取整math.floor(val)
math.round四舍五入math.round(val)
math.loglog运算math.log(val)
math.sin正弦math.sin(val)
math.asin反正弦math.asin(val)
math.sinh双曲正弦math.sinh(val)
math.cos余弦math.cos(val)
math.acos反余弦math.acos(val)
math.cosh双曲余弦math.cosh(val)
math.tan正切math.tan(val)
math.atan反正切math.atan(val)
math.tanh双曲正切math.tanh(val)
if条件取值if(a<1,’when true’,’when false’)
range范围判断,等同于between andrange(val,1,10)
median中位数median(val)中位数 (Pro)
skewness偏度特征值skewness(val)偏度特征值聚合函数 (Pro)
kurtosis峰度特征值kurtosis(val)峰度特征值聚合函数 (Pro)
variance方差variance(val)方差聚合函数 (Pro)
geo_mean几何平均数geo_mean(val)几何平均数聚合函数 (Pro)
sum_of_squ平方和sum_of_squ(val)平方和聚合函数 (Pro)
std_dev标准差std_dev(val)标准差聚合函数 (Pro)
slope斜度slope(val)使用最小二乘回归模型计算斜度,大于0为向上,小于0为向下 (Pro)
time转换时间time(‘now-1d’)使用表达式来转换时间,返回毫秒时间戳(Pro)
jsonatajsonata表达式jsonata(‘$abs(val)’)使用jsonata表达式来提取行数据(Pro)
spelspel表达式spel(‘#val’)使用spel表达式来提取行数据(Pro)
env获取配置信息env(‘key’,’默认值’)获取系统配置信息(Pro)
_window_until打开窗口直到满足条件_window_until(this.success)打开窗口直到满足条件(Pro)
_window_until_change打开窗口直到值变更_window_until_change(this.state)打开窗口直到值变更(Pro)

拓展函数

TIP

以下功能只在专业版中支持

device.properties

获取设备已保存的全部最新属性,(注意: 由于使用es存储设备数据,此数据并不是完全实时的)

  1. select
  2. device.properties(this.deviceId) props,
  3. this.properties reports
  4. from "/device/*/*/message/property/report"

TIP

device.properties(this.deviceId,'property1','property2')还可以通过参数获取指定的属性,如果未设置则获取全部属性。

device.properties.history

查询设备历史数据

  1. --聚合查询
  2. select * from device.properties.history(
  3. select avg(temperature) avgVal
  4. from "deviceId" -- from 支持: 按设备ID查询: "deviceId", 查询多个设备: device('1','2') 按产品查询: product('id')
  5. where timestamp between now()-86400000 and now()
  6. )
  1. --按时间分组
  2. select * from device.properties.history(
  3. select avg(temperature) avgVal
  4. from "deviceId"
  5. where timestamp between now()-86400000 and now()
  6. group by interval('1d')
  7. )
  1. -- 订阅实时数据,然后查询对应设备的历史数据
  2. select
  3. (
  4. select maxVal,avgVal from
  5. device.properties.history(
  6. select
  7. max(temp3) maxVal,
  8. avg(temp3) avgVal
  9. from device(t.deviceId)
  10. -- 前一天的数据
  11. where timestamp between time('now-1d') and t.timestamp
  12. )
  13. ) $this,
  14. t.properties.temp3 temp3
  15. from "/device/*/*/message/property/**" t

device.properties.latest

TIP

此功能需要开启设备最新数据存储

查询设备最新的数据

  1. select * from device.properties.latest(
  2. select
  3. temperature
  4. from "productId" --表名为产品ID
  5. where id = 'deviceId' -- id则为设备ID
  6. )

聚合查询

  1. select * from device.properties.latest(
  2. select
  3. avg(temperature) temperature
  4. from "productId" --表名为产品ID
  5. )

device.tags

获取设备标签信息

  1. select device.tags(this.deviceId) from "/device/*/*/message/property/report"

TIP

device.tags(this.deviceId,'tag1','tag2')还可以通过参数获取指定的标签,如果未设置则获取全部标签。

device.selector

选择设备,如:

  1. select dev.deviceId from "/device/*/*/message/property/report" t
  2. -- 获取和上报属性在同一个分组里,并且产品idlight-product的设备
  3. left join (
  4. select this.id deviceId from
  5. device.selector(same_group(t.deviceId),product('light-product'))
  6. ) dev

支持参数:

  1. in_gourp(‘groupId’) 在指定的设备分组中
  2. in_group_tree(‘groupId’) 在指定分组中(包含下级分组)
  3. same_group(‘deviceId’) 在指定设备的相同分组中
  4. product(‘productId’) 指定产品ID对应的设备
  5. tag(‘tag1Key’,’tag1Value’,’tag2Key’,’tag2Value’) 按指定的标签获取
  6. state(‘online’) 按指定的状态获取
  7. in_tenant(‘租户ID’) 在指定租户中的设备
  8. org(‘机构ID’) 在指定机构中

mqtt.client.publish

推送消息到mqtt客户端.

  1. select
  2. mqtt.client.publish(
  3. 'networkId' -- 第一个参数: 网络组件中mqtt客户端的ID
  4. ,'topic' -- 第二个参数: topic
  5. ,'JSON' -- 第三个参数: 消息类型: JSON,STRING,BINARY,HEX
  6. ,this -- 消息体,会根据消息类型转为不同格式的消息
  7. ) publishSuccess -- 返回推送结果 true false
  8. from "/rule-engine/device/alarm/sensor-1/**"

mqtt.client.subscribe

从mqtt客户端订阅消息

  1. select
  2. t.did deviceId,
  3. t.l location,
  4. t.v value
  5. from mqtt.client.subscribe(
  6. 'networkId' -- 第一个参数: 网络组件中mqtt客户端的ID
  7. ,'JSON' -- 第二个参数: 消息类型: JSON,STRING,BINARY,HEX
  8. ,'topic' -- topic
  9. ,'topic2' -- topic2
  10. ) t
  11. where t.v > 30 -- 过滤条件

http.request

发起http请求

  1. select
  2. http.request(
  3. 'networkId' -- 第一个参数: 网络组件中http客户端的ID
  4. -- 下面的参数两两对应组成键值对,注意: 使用逗号(,)分割.
  5. ,'url','https://www.baidu.com'
  6. ,'method','POST'
  7. ,'contentType','application/json'
  8. -- 请求头
  9. ,'headers',new_map('key1','value1','key2','value2')
  10. -- body参数在contentTypeapplication/json时生效
  11. ,'body',new_map('key1','value1','key2','value2')
  12. -- requestParam参数在contentType不为json时生效,相当于:application/x-www-form-urlencoded的处理方式
  13. ,'requestParam',new_map('key1','value1','key2','value2')
  14. -- 直接拼接到url上的参数 https://www.baidu.com?key1=value1&key2=value2
  15. ,'queryParameters',new_map('key1','value1','key2','value2')
  16. ) response
  17. from dual

message.subscribe

订阅消息网关中的消息

  1. select
  2. t.topic topic,
  3. t.message.deviceId deviceId,
  4. t.message.headers.productId productId,
  5. t.message.timestamp ts
  6. from message.subscribe(
  7. 'false' -- 是否订阅来自集群的消息(可选参数,默认为false)
  8. ,'/device/*/*/online'
  9. ) t

message.publish

推送消息到消息网关

  1. select
  2. message.publish(
  3. '/device-online/'||t.message.deviceId -- 推送到此topic
  4. ,t.message -- 消息内容
  5. ) subscribeNumber -- 返回有多少订阅者收到了消息
  6. from message.subscribe('/device/*/*/online') t