UDF(用户定义函数)

在有些应用场景中,应用逻辑需要的查询无法直接使用系统内置的函数来表示。利用 UDF(User Defined Function) 功能,TDengine 可以插入用户编写的处理代码并在查询中使用它们,就能够很方便地解决特殊应用场景中的使用需求。 UDF 通常以数据表中的一列数据做为输入,同时支持以嵌套子查询的结果作为输入。

TDengine 支持通过 C/C++ 语言进行 UDF 定义。接下来结合示例讲解 UDF 的使用方法。

用户可以通过 UDF 实现两类函数:标量函数和聚合函数。标量函数对每行数据输出一个值,如求绝对值 abs,正弦函数 sin,字符串拼接函数 concat 等。聚合函数对多行数据进行输出一个值,如求平均数 avg,最大值 max 等。

实现 UDF 时,需要实现规定的接口函数

  • 标量函数需要实现标量接口函数 scalarfn 。
  • 聚合函数需要实现聚合接口函数 aggfn_start , aggfn , aggfn_finish。
  • 如果需要初始化,实现 udf_init;如果需要清理工作,实现udf_destroy。

接口函数的名称是 UDF 名称,或者是 UDF 名称和特定后缀(_start, _finish, _init, _destroy)的连接。列表中的scalarfn,aggfn, udf需要替换成udf函数名。

实现标量函数

标量函数实现模板如下

  1. #include "taos.h"
  2. #include "taoserror.h"
  3. #include "taosudf.h"
  4. // initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
  5. // @return error number defined in taoserror.h
  6. int32_t scalarfn_init() {
  7. // initialization.
  8. return TSDB_CODE_SUCCESS;
  9. }
  10. // scalar function main computation function
  11. // @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn
  12. // @param resultColumn, output column
  13. // @return error number defined in taoserror.h
  14. int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
  15. // read data from inputDataBlock and process, then output to resultColumn.
  16. return TSDB_CODE_SUCCESS;
  17. }
  18. // cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
  19. // @return error number defined in taoserror.h
  20. int32_t scalarfn_destroy() {
  21. // clean up
  22. return TSDB_CODE_SUCCESS;
  23. }

scalarfn 为函数名的占位符,需要替换成函数名,如bit_and。

实现聚合函数

聚合函数的实现模板如下

  1. #include "taos.h"
  2. #include "taoserror.h"
  3. #include "taosudf.h"
  4. // Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix
  5. // @return error number defined in taoserror.h
  6. int32_t aggfn_init() {
  7. // initialization.
  8. return TSDB_CODE_SUCCESS;
  9. }
  10. // aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. The function name shall be concatenation of udf name and _start suffix
  11. // @param interbuf intermediate value to initialize
  12. // @return error number defined in taoserror.h
  13. int32_t aggfn_start(SUdfInterBuf* interBuf) {
  14. // initialize intermediate value in interBuf
  15. return TSDB_CODE_SUCCESS;
  16. }
  17. // aggregate reduce function. This function aggregate old state(@interbuf) and one data bock(inputBlock) and output a new state(@newInterBuf).
  18. // @param inputBlock input data block
  19. // @param interBuf old state
  20. // @param newInterBuf new state
  21. // @return error number defined in taoserror.h
  22. int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
  23. // read from inputBlock and interBuf and output to newInterBuf
  24. return TSDB_CODE_SUCCESS;
  25. }
  26. // aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix.
  27. // @interBuf : intermediate value
  28. // @result: final result
  29. // @return error number defined in taoserror.h
  30. int32_t int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
  31. // read data from inputDataBlock and process, then output to result
  32. return TSDB_CODE_SUCCESS;
  33. }
  34. // cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
  35. // @return error number defined in taoserror.h
  36. int32_t aggfn_destroy() {
  37. // clean up
  38. return TSDB_CODE_SUCCESS;
  39. }

aggfn为函数名的占位符,需要修改为自己的函数名,如l2norm。

接口函数定义

接口函数的名称是 udf 名称,或者是 udf 名称和特定后缀(_start, _finish, _init, _destroy)的连接。以下描述中函数名称中的 scalarfn,aggfn, udf 需要替换成udf函数名。

接口函数返回值表示是否成功。如果返回值是 TSDB_CODE_SUCCESS,表示操作成功,否则返回的是错误代码。错误代码定义在 taoserror.h,和 taos.h 中的API共享错误码的定义。例如, TSDB_CODE_UDF_INVALID_INPUT 表示输入无效输入。TSDB_CODE_OUT_OF_MEMORY 表示内存不足。

接口函数参数类型见数据结构定义。

标量接口函数

int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)

其中 scalarFn 是函数名的占位符。这个函数对数据块进行标量计算,通过设置resultColumn结构体中的变量设置值

参数的具体含义是:

  • inputDataBlock: 输入的数据块
  • resultColumn: 输出列

聚合接口函数

int32_t aggfn_start(SUdfInterBuf *interBuf)

int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)

int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)

其中 aggfn 是函数名的占位符。首先调用aggfn_start生成结果buffer,然后相关的数据会被分为多个行数据块,对每个数据块调用 aggfn 用数据块更新中间结果,最后再调用 aggfn_finish 从中间结果产生最终结果,最终结果只能含 0 或 1 条结果数据。

参数的具体含义是:

  • interBuf:中间结果 buffer。
  • inputBlock:输入的数据块。
  • newInterBuf:新的中间结果buffer。
  • result:最终结果。

UDF 初始化和销毁

int32_t udf_init()

int32_t udf_destroy()

其中 udf 是函数名的占位符。udf_init 完成初始化工作。 udf_destroy 完成清理工作。如果没有初始化工作,无需定义udf_init函数。如果没有清理工作,无需定义udf_destroy函数。

UDF 数据结构

  1. typedef struct SUdfColumnMeta {
  2. int16_t type;
  3. int32_t bytes;
  4. uint8_t precision;
  5. uint8_t scale;
  6. } SUdfColumnMeta;
  7. typedef struct SUdfColumnData {
  8. int32_t numOfRows;
  9. int32_t rowsAlloc;
  10. union {
  11. struct {
  12. int32_t nullBitmapLen;
  13. char *nullBitmap;
  14. int32_t dataLen;
  15. char *data;
  16. } fixLenCol;
  17. struct {
  18. int32_t varOffsetsLen;
  19. int32_t *varOffsets;
  20. int32_t payloadLen;
  21. char *payload;
  22. int32_t payloadAllocLen;
  23. } varLenCol;
  24. };
  25. } SUdfColumnData;
  26. typedef struct SUdfColumn {
  27. SUdfColumnMeta colMeta;
  28. bool hasNull;
  29. SUdfColumnData colData;
  30. } SUdfColumn;
  31. typedef struct SUdfDataBlock {
  32. int32_t numOfRows;
  33. int32_t numOfCols;
  34. SUdfColumn **udfCols;
  35. } SUdfDataBlock;
  36. typedef struct SUdfInterBuf {
  37. int32_t bufLen;
  38. char* buf;
  39. int8_t numOfResult; //zero or one
  40. } SUdfInterBuf;

数据结构说明如下:

  • SUdfDataBlock 数据块包含行数 numOfRows 和列数 numCols。udfCols[i] (0 <= i <= numCols-1)表示每一列数据,类型为SUdfColumn*。
  • SUdfColumn 包含列的数据类型定义 colMeta 和列的数据 colData。
  • SUdfColumnMeta 成员定义同 taos.h 数据类型定义。
  • SUdfColumnData 数据可以变长,varLenCol 定义变长数据,fixLenCol 定义定长数据。
  • SUdfInterBuf 定义中间结构 buffer,以及 buffer 中结果个数 numOfResult

为了更好的操作以上数据结构,提供了一些便利函数,定义在 taosudf.h。

编译 UDF

用户定义函数的 C 语言源代码无法直接被 TDengine 系统使用,而是需要先编译为 动态链接库,之后才能载入 TDengine 系统。

例如,按照上一章节描述的规则准备好了用户定义函数的源代码 bit_and.c,以 Linux 为例可以执行如下指令编译得到动态链接库文件:

  1. gcc -g -O0 -fPIC -shared bit_and.c -o libbitand.so

这样就准备好了动态链接库 libbitand.so 文件,可以供后文创建 UDF 时使用了。为了保证可靠的系统运行,编译器 GCC 推荐使用 7.5 及以上版本。

管理和使用UDF

编译好的UDF,还需要将其加入到系统才能被正常的SQL调用。关于如何管理和使用UDF,参见UDF使用说明

示例代码

标量函数示例 bit_and

bit_add 实现多列的按位与功能。如果只有一列,返回这一列。bit_add 忽略空值。

bit_and.c

  1. #include <string.h>
  2. #include <stdlib.h>
  3. #include <stdio.h>
  4. #include "taosudf.h"
  5. DLL_EXPORT int32_t bit_and_init() {
  6. return 0;
  7. }
  8. DLL_EXPORT int32_t bit_and_destroy() {
  9. return 0;
  10. }
  11. DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn *resultCol) {
  12. if (block->numOfCols < 2) {
  13. return TSDB_CODE_UDF_INVALID_INPUT;
  14. }
  15. for (int32_t i = 0; i < block->numOfCols; ++i) {
  16. SUdfColumn* col = block->udfCols[i];
  17. if (!(col->colMeta.type == TSDB_DATA_TYPE_INT)) {
  18. return TSDB_CODE_UDF_INVALID_INPUT;
  19. }
  20. }
  21. SUdfColumnMeta *meta = &resultCol->colMeta;
  22. meta->bytes = 4;
  23. meta->type = TSDB_DATA_TYPE_INT;
  24. meta->scale = 0;
  25. meta->precision = 0;
  26. SUdfColumnData *resultData = &resultCol->colData;
  27. resultData->numOfRows = block->numOfRows;
  28. for (int32_t i = 0; i < resultData->numOfRows; ++i) {
  29. if (udfColDataIsNull(block->udfCols[0], i)) {
  30. udfColDataSetNull(resultCol, i);
  31. continue;
  32. }
  33. int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
  34. int j = 1;
  35. for (; j < block->numOfCols; ++j) {
  36. if (udfColDataIsNull(block->udfCols[j], i)) {
  37. udfColDataSetNull(resultCol, i);
  38. break;
  39. }
  40. char* colData = udfColDataGetData(block->udfCols[j], i);
  41. result &= *(int32_t*)colData;
  42. }
  43. if (j == block->numOfCols) {
  44. udfColDataSet(resultCol, i, (char*)&result, false);
  45. }
  46. }
  47. return TSDB_CODE_SUCCESS;
  48. }

查看源码

聚合函数示例 l2norm

l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。

l2norm.c

  1. #include <string.h>
  2. #include <stdlib.h>
  3. #include <stdio.h>
  4. #include <math.h>
  5. #include "taosudf.h"
  6. DLL_EXPORT int32_t l2norm_init() {
  7. return 0;
  8. }
  9. DLL_EXPORT int32_t l2norm_destroy() {
  10. return 0;
  11. }
  12. DLL_EXPORT int32_t l2norm_start(SUdfInterBuf *buf) {
  13. *(int64_t*)(buf->buf) = 0;
  14. buf->bufLen = sizeof(double);
  15. buf->numOfResult = 0;
  16. return 0;
  17. }
  18. DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
  19. double sumSquares = *(double*)interBuf->buf;
  20. int8_t numNotNull = 0;
  21. for (int32_t i = 0; i < block->numOfCols; ++i) {
  22. SUdfColumn* col = block->udfCols[i];
  23. if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
  24. col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
  25. return TSDB_CODE_UDF_INVALID_INPUT;
  26. }
  27. }
  28. for (int32_t i = 0; i < block->numOfCols; ++i) {
  29. for (int32_t j = 0; j < block->numOfRows; ++j) {
  30. SUdfColumn* col = block->udfCols[i];
  31. if (udfColDataIsNull(col, j)) {
  32. continue;
  33. }
  34. switch (col->colMeta.type) {
  35. case TSDB_DATA_TYPE_INT: {
  36. char* cell = udfColDataGetData(col, j);
  37. int32_t num = *(int32_t*)cell;
  38. sumSquares += (double)num * num;
  39. break;
  40. }
  41. case TSDB_DATA_TYPE_DOUBLE: {
  42. char* cell = udfColDataGetData(col, j);
  43. double num = *(double*)cell;
  44. sumSquares += num * num;
  45. break;
  46. }
  47. default:
  48. break;
  49. }
  50. ++numNotNull;
  51. }
  52. }
  53. *(double*)(newInterBuf->buf) = sumSquares;
  54. newInterBuf->bufLen = sizeof(double);
  55. if (interBuf->numOfResult == 0 && numNotNull == 0) {
  56. newInterBuf->numOfResult = 0;
  57. } else {
  58. newInterBuf->numOfResult = 1;
  59. }
  60. return 0;
  61. }
  62. DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
  63. if (buf->numOfResult == 0) {
  64. resultData->numOfResult = 0;
  65. return 0;
  66. }
  67. double sumSquares = *(double*)(buf->buf);
  68. *(double*)(resultData->buf) = sqrt(sumSquares);
  69. resultData->bufLen = sizeof(double);
  70. resultData->numOfResult = 1;
  71. return 0;
  72. }

查看源码