共计 10154 个字符,预计需要花费 26 分钟才能阅读完成。
TDengine 3.0.4.0 公布了一个重要个性:反对用 Python 语言编写的自定义函数(UDF)。这个个性极大节俭了 UDF 开发的工夫老本。作为时序大数据处理平台,不反对 Python UDF 显然是不残缺的。UDF 在实现本人业务中特有的逻辑时十分有用,比方量化交易场景计算自研的交易信号。本文内容由浅入深包含 4 个示例程序:
- 定义一个只接管一个整数的标量函数:输出 n,输入 ln(n^2 + 1)。
- 定义一个接管 n 个整数的标量函数,输出(x1, x2, …, xn), 输入每个值和它们的序号的乘积的和:x1 + 2 x2 + … + n xn。
- 定义一个标量函数,输出一个工夫戳,输入间隔这个工夫最近的下一个周日。实现这个函数要用到第三方库 moment。咱们在这个示例中解说应用第三方库的注意事项。
- 定义一个聚合函数,计算某一列最大值和最小值的差, 也就是实现 TDengien 内置的 spread 函数。
同时也蕴含大量实用的 debug 技巧。
本文假如你用的是 Linux 零碎,且已装置好了 TDengine 3.0.4.0+ 和 Python 3.x。
示例一:最简略的 UDF
编写一个只接管一个整数的 UDF 函数:输出 n,输入 ln(n^2 + 1)。
首先编写一个 Python 文件,存在零碎某个目录,比方 /root/udf/myfun.py 内容如下:
from math import log
def init():
pass
def destroy():
pass
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
这个文件蕴含 3 个函数,init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即便什么都不做也要定义。最要害的是 process 函数,它承受一个数据块,这个数据块对象有两个办法:
- shape() 返回数据块的行数和列数
- data(i, j) 返回 i 行 j 列的数据
标量函数的 process 办法传入的数据块有多少行,就须要返回多少个数据。上述代码中咱们疏忽的列数,因为咱们只想对每行的第一个数做计算。
接下来咱们在时序数据库(Time Series Database)TDengine 中创立对应的 UDF 函数,执行上面语句:
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)
看起来很顺利,接下来 show 一下零碎中所有的自定义函数,确认创立胜利:
taos> show functions;
name |
=================================
myfun |
Query OK, 1 row(s) in set (0.005767s)
接下来就来测试一下这个函数,测试之前先执行上面的 SQL 命令,制作些测试数据:
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
测试 myfun 函数:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
可怜的是执行失败了,什么起因呢?
查看 udfd 过程的日志: /var/log/taos/udfd.log 发现以下错误信息:
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
谬误很明确:没有加载到 Python 插件 libtaospyudf.so,看官网文档原来是要先装置 taospyudf 这个 Python 包。于是:pip3 install taospyudf
装置过程会编译 C++ 源码,因而零碎上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件主动会被复制到 /usr/local/lib/ 目录,因而如果是非 root 用户,装置时需加 sudo。装置完能够查看这个目录是否有了这个文件:
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
这时再去执行 SQL 测试 UDF,会发现报同样的谬误,起因是新装置的共享库还未失效,还需执行命令:ldconfig
此时再去测试 UDF,终于胜利了:
taos> select myfun(v1) from t;
myfun(v1) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
至此,咱们实现了第一个 UDF 😊,并学会了简略的 debug 办法。
示例一改良:异样解决
下面的 myfun 尽管测试测试通过了,然而有两个毛病:
- 这个标量函数只承受 1 列数据作为输出,如果用户传入了多列也不会抛异样。咱们冀望改成:如果用户输出多列,则揭示用户输出谬误,这个函数只接管 1 个参数。
taos> select myfun(v1, v2) from t;
myfun(v1, v2) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
- 没有解决 null 值,如果用户输出了 null 值则会抛异样终止执行。咱们冀望改成:如果输出是 null,则输入也是 null,不影响后续执行。
因而 process 函数改良如下:
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception(f"require 1 parameter but given {cols}")
return [None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
而后执行上面的语句更新已有的 UDF:
create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
再传入 myfun 两个参数,就会执行失败了,
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.014643s)
但遗憾的是咱们自定义的异样信息没有展现给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中:
2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2
At:
/var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process
至此,咱们学会了如何更新 UDF,并查看 UDF 输入的谬误日志。
(注:如果 UDF 更新后未失效,能够重启 taosd 试试,TDengine 3.0.5.0 及当前的版本会确保不重启 UDF 更新就能失效)
示例二:接管 n 个参数的 UDF
编写一个 UDF:输出(x1, x2, …, xn), 输入每个值和它们的序号的乘积的和:1 x1 + 2 x2 + … + n * xn。如果 x1 至 xn 中蕴含 null,则后果为 null。
这个示例与示例一的区别是,能够承受任意多列作为输出,且要解决每一列的值。编写 UDF 文件 /root/udf/nsum.py:
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
result = []
for i in range(rows):
total = 0
for j in range(cols):
v = block.data(i, j)
if v is None:
total = None
break
total += (j + 1) * block.data(i, j)
result.append(total)
return result
创立 UDF:
create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';
测试:
taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)
taos> select ts, v1, v2, v3, nsum(v1, v2, v3) from t;
ts | v1 | v2 | v3 | nsum(v1, v2, v3) |
================================================================================================
2023-05-01 12:13:14.000 | 1 | 2 | 3 | 14.000000000 |
2023-05-03 08:09:10.000 | 2 | 3 | 4 | 20.000000000 |
2023-05-10 07:06:05.000 | 3 | 4 | 5 | 26.000000000 |
2023-05-25 09:09:15.000 | 6 | NULL | 8 | NULL |
Query OK, 4 row(s) in set (0.010653s)
示例三:应用第三方库
编写一个 UDF,输出一个工夫戳,输入间隔这个工夫最近的下一个周日。比方明天是 2023-05-25,则下一个周日是 2023-05-28。
实现这个函数要用到第三方库 momen。先装置这个库:pip3 install moment
而后编写 UDF 文件 /root/udf/nextsunday.py
import moment
def init():
pass
def destroy():
pass
def process(block):
rows, cols = block.shape()
if cols > 1:
raise Exception("require only 1 parameter")
if not type(block.data(0, 0)) is int:
raise Exception("type error")
return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
for i in range(rows)]
UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只承受一个示意毫秒数的整数。process 办法先做参数查看,而后用 moment 包替换工夫的星期为星期日,最初格式化输入。输入的字符串长度是固定的 10 个字符长,因而能够这样创立 UDF 函数:
create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';
此时测试函数,如果你是用 systemctl 启动的 taosd,必定会遇到谬误:
taos> select ts, nextsunday(ts) from t;
DB error: udf function execution failure (1.123615s)
tail -20 taospyudf.log
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'
这是因为“moment”所在位置不在 python udf 插件默认的库搜寻门路中。怎么确认这一点呢?通过以下命令搜寻 taospyudf.log:
grep 'sys path' taospyudf.log | tail -1
2023-05-25 10:58:48.554 INFO [1679419] [doPyOpen@592] python sys path: ['','/lib/python38.zip','/lib/python3.8','/lib/python3.8/lib-dynload','/lib/python3/dist-packages','/var/lib/taos//.udf']
发现 python udf 插件默认搜寻的第三方库装置门路是:/lib/python3/dist-packages,而 moment 默认装置到了 /usr/local/lib/python3.8/dist-packages。上面咱们批改 python udf 插件默认的库搜寻门路,把以后 python 解释器默认应用的库门路全副加进去。
先关上 python3 命令行,查看以后的 sys.path
>>> import sys
>>> ":".join(sys.path)
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'
复制下面脚本的输入的字符串,而后编辑 /var/taos/taos.cfg 退出以下配置:
UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages
保留后执行 systemctl restart taosd, 再测试就不报错了:
taos> select ts, nextsunday(ts) from t;
ts | nextsunday(ts) |
===========================================
2023-05-01 12:13:14.000 | 2023-05-07 |
2023-05-03 08:09:10.000 | 2023-05-07 |
2023-05-10 07:06:05.000 | 2023-05-14 |
2023-05-25 09:09:15.000 | 2023-05-28 |
Query OK, 4 row(s) in set (1.011474s)
示例四:定义聚合函数
编写一个聚合函数,计算某一列最大值和最小值的差。
聚合函数与标量函数的区别是:标量函数是多行输出对应多个输入,聚合函数是多行输出对应一个输入。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 解决一个块,reducer 再把 mapper 的后果做聚合。不一样的中央在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的性能又有 reduce 的性能。reduce 函数承受两个参数:一个是本人要解决的数据,一个是别的工作执行 reduce 函数的处理结果。如上面的示例 /root/udf/myspread.py:
import io
import math
import pickle
LOG_FILE: io.TextIOBase = None
def init():
global LOG_FILE
LOG_FILE = open("/var/log/taos/spread.log", "wt")
log("init function myspead success")
def log(o):
LOG_FILE.write(str(o) + '\n')
def destroy():
log("close log file: spread.log")
LOG_FILE.close()
def start():
return pickle.dumps((-math.inf, math.inf))
def reduce(block, buf):
max_number, min_number = pickle.loads(buf)
log(f"initial max_number={max_number}, min_number={min_number}")
rows, _ = block.shape()
for i in range(rows):
v = block.data(i, 0)
if v > max_number:
log(f"max_number={v}")
max_number = v
if v < min_number:
log(f"min_number={v}")
min_number = v
return pickle.dumps((max_number, min_number))
def finish(buf):
max_number, min_number = pickle.loads(buf)
return max_number - min_number
在这个示例中咱们不光定义了一个聚合函数,还增加记录执行日志的性能,解说如下:
- init 函数不再是空函数,而是关上了一个文件用于写执行日志
- log 函数是记录日志的工具,主动将传入的对象转成字符串,加换行符输入
- destroy 函数用来在执行完结敞开文件
- start 返回了初始的 buffer,用来存聚合函数的两头后果,咱们把最大值初始化为负无穷大,最小值初始化为正无穷大
- reduce 解决每个数据块并聚合后果
- finish 函数将最终的 buffer 转换成最终的输入
执行上面的 SQL 语句创立对应的 UDF:
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';
这个 SQL 语句与创立标量函数的 SQL 语句有两个重要区别:
- 减少了 aggregate 关键字
- 减少了 bufsize 关键字,用来指定存储两头后果的内存大小,这个数值能够大于理论应用的数值。本例两头后果是两个浮点数组成的 tuple,序列化后理论占用大小只有 32 个字节,但指定的 bufsize 是 128,能够用 python 命令行打印理论占用的字节数
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32
测试这个函数,能够看到 myspread 的输入后果和内置的 spread 函数的输入后果是统一的。
taos> select myspread(v1) from t;
myspread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.013486s)
taos> select spread(v1) from t;
spread(v1) |
============================
5.000000000 |
Query OK, 1 row(s) in set (0.005501s)
最初,查看咱们本人打印的执行日志,从日志能够看出,reduce 函数被执行了 3 次。执行过程中 max 值被更新了 4 次,min 值只被更新 1 次。
root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
max_number=1
min_number=1
initial max_number=1, min_number=1
max_number=2
max_number=3
initial max_number=3, min_number=1
max_number=6
close log file: spread.log
通过这个示例,咱们学会了如何定义聚合函数,并打印自定义的日志信息。
要点总结
- 创立标量函数的语法
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';
OUTPUTTYPE 对应的是 TDengine 的数据类型,如 TIMESTAMP,BIGINT,VARCHAR(64),类型映射关系见官网文档:https://docs.taosdata.com/develop/udf/。
- 创立聚合函数的语法
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
- 更新 UDF 的语法
更新标量函数
CREATE OR REPLACE FUNCTION function_name AS OUTPUTTYPE int LANGUAGE 'Python';
更新聚合函数
CREATE OR REPLACE AGGREGATE FUNCTION function_name AS OUTPUTTYPE BUFSIZE buf_size int LANGUAGE 'Python';
留神:如果加了“AGGREGATE”关键字,更新之后函数将被当作聚合函数,无论之前是什么类型的函数。相同,如果没有加“AGGREGATE”关键字,更新之后的函数将被当作标量函数,无论之前是什么类型的函数。
- 同名的 UDF 每更新一次,版本号会减少 1。用
select * from ins_functions \G;
可查看 UDF 的残缺信息,包含 UDF 的源码。 - 查看和删除已有的 UDF
SHOW functions;
DROP FUNCTION function_name;
- 装置 taospyudf 动静库
sudo pip3 install taospyudf
装置过程会从源码编译出共享库 libtaospyudf.so,因而零碎上要有 cmake 和 gcc,编译后这个库会被装置到 /usr/local/lib。装置完别忘了执行命令 ldconfig 更新零碎动态链接库。 -
调试 Python UDF 的两个重要日志文件
- /var/log/taos/udfdlog.* 这个文件是 UDF 框架的日志。框架负责加载各语言 UDF 的插件,执行 UDF 的生命周期函数
- /var/log/taos/taospyudf.log 这个文件是 libtaospyudf.so 输入的日志,每个文件最大 50M,最多保留 5 个。
- 定义标量函数最重要是要实现 process 函数,同时必须定义 init 和 destroy 函数即便什么都不做。
def init():
pass
def process(block: datablock) -> tuple[output_type]:
rows, cols = block.shape()
result = []
for i in range(rows):
for j in range(cols):
cell_data = block.data(i, j)
# your logic here
return result
def destroy():
pass
- 定义聚合函数最重要是要实现 start, reduce 和 finish,同样必须定义 init 和 destroy 函数。
def init():
def destroy():
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:
start 生成最后后果 buffer,而后输出数据会被分为多个行数据块,对每个数据块 inputs 和以后两头后果 buf 调用 reduce,失去新的两头后果,最初再调用 finish 从两头后果 buf 产生最终输入。
- 应用第三方 python 库。
应用第三方库须要查看这个库是否装置到了 Python UDF 插件默认的库搜寻门路,如果没有须要批改 taos.cfg,增加 UdfdLdLibPath 配置,库门路用冒号分隔。 - UDF 内无奈通过 print 函数输入日志,须要本人写文件或用 python 内置的 logging 库写文件。