乐趣区

关于时序数据库:TDengine-3040-重要特性之-Python-UDF-实战分享

TDengine 3.0.4.0 公布了一个重要个性:反对用 Python 语言编写的自定义函数(UDF)。这个个性极大节俭了 UDF 开发的工夫老本。作为时序大数据处理平台,不反对 Python UDF 显然是不残缺的。UDF 在实现本人业务中特有的逻辑时十分有用,比方量化交易场景计算自研的交易信号。本文内容由浅入深包含 4 个示例程序:

  1. 定义一个只接管一个整数的标量函数:输出 n,输入 ln(n^2 + 1)。
  2. 定义一个接管 n 个整数的标量函数,输出(x1, x2, …, xn), 输入每个值和它们的序号的乘积的和:x1 + 2 x2 + … + n xn。
  3. 定义一个标量函数,输出一个工夫戳,输入间隔这个工夫最近的下一个周日。实现这个函数要用到第三方库 moment。咱们在这个示例中解说应用第三方库的注意事项。
  4. 定义一个聚合函数,计算某一列最大值和最小值的差, 也就是实现 TDengien 内置的 spread 函数。
    同时也蕴含大量实用的 debug 技巧。
    本文假如你用的是 Linux 零碎,且已装置好了 TDengine 3.0.4.0+ 和 Python 3.x。
    示例一:最简略的 UDF
    编写一个只接管一个整数的 UDF 函数:输出 n,输入 ln(n^2 + 1)。
    首先编写一个 Python 文件,存在零碎某个目录,比方 /root/udf/myfun.py 内容如下:
from math import log

def init():
    pass

def destroy():
    pass

def process(block):
    rows, _ = block.shape()
    return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]

这个文件蕴含 3 个函数,init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即便什么都不做也要定义。最要害的是 process 函数,它承受一个数据块,这个数据块对象有两个办法:

  1. shape() 返回数据块的行数和列数
  2. data(i, j) 返回 i 行 j 列的数据
    标量函数的 process 办法传入的数据块有多少行,就须要返回多少个数据。上述代码中咱们疏忽的列数,因为咱们只想对每行的第一个数做计算。
    接下来咱们在时序数据库(Time Series Database)TDengine 中创立对应的 UDF 函数,执行上面语句:
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
 taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)

看起来很顺利,接下来 show 一下零碎中所有的自定义函数,确认创立胜利:

taos> show functions;
              name              |
=================================
 myfun                          |
Query OK, 1 row(s) in set (0.005767s)

接下来就来测试一下这个函数,测试之前先执行上面的 SQL 命令,制作些测试数据:

create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);

测试 myfun 函数:

taos> select myfun(v1, v2) from t;

DB error: udf function execution failure (0.011088s)

可怜的是执行失败了,什么起因呢?
查看 udfd 过程的日志: /var/log/taos/udfd.log 发现以下错误信息:

05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so

谬误很明确:没有加载到 Python 插件 libtaospyudf.so,看官网文档原来是要先装置 taospyudf 这个 Python 包。于是:
pip3 install taospyudf
装置过程会编译 C++ 源码,因而零碎上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件主动会被复制到 /usr/local/lib/ 目录,因而如果是非 root 用户,装置时需加 sudo。装置完能够查看这个目录是否有了这个文件:

root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so

这时再去执行 SQL 测试 UDF,会发现报同样的谬误,起因是新装置的共享库还未失效,还需执行命令:
ldconfig
此时再去测试 UDF,终于胜利了:

taos> select myfun(v1) from t;
         myfun(v1)         |
============================
               0.693147181 |
               1.609437912 |
               2.302585093 |

至此,咱们实现了第一个 UDF 😊,并学会了简略的 debug 办法。
示例一改良:异样解决
下面的 myfun 尽管测试测试通过了,然而有两个毛病:

  1. 这个标量函数只承受 1 列数据作为输出,如果用户传入了多列也不会抛异样。咱们冀望改成:如果用户输出多列,则揭示用户输出谬误,这个函数只接管 1 个参数。
taos> select myfun(v1, v2) from t;
       myfun(v1, v2)       |
============================
               0.693147181 |
               1.609437912 |
               2.302585093 |
  1. 没有解决 null 值,如果用户输出了 null 值则会抛异样终止执行。咱们冀望改成:如果输出是 null,则输入也是 null,不影响后续执行。
    因而 process 函数改良如下:
def process(block):
    rows, cols = block.shape()
    if cols > 1:
        raise Exception(f"require 1 parameter but given {cols}")
    return [None if block.data(i, 0) is None else log(block.data(i, 0) ** 2 + 1) for i in range(rows)]

而后执行上面的语句更新已有的 UDF:

create or replace function myfun as '/root/udf/myfun.py' outputtype double language 'Python';

再传入 myfun 两个参数,就会执行失败了,

taos> select myfun(v1, v2) from t;

DB error: udf function execution failure (0.014643s)

但遗憾的是咱们自定义的异样信息没有展现给用户,而是在插件的日志文件 /var/log/taos/taospyudf.log 中:

2023-05-24 23:21:06.790 ERROR [1666188] [doPyUdfScalarProc@507] call pyUdfScalar proc function. context 0x7faade26d180. error: Exception: require 1 parameter but given 2

At:
  /var/lib/taos//.udf/myfun_3_1884e1281d9.py(12): process

至此,咱们学会了如何更新 UDF,并查看 UDF 输入的谬误日志。
(注:如果 UDF 更新后未失效,能够重启 taosd 试试,TDengine 3.0.5.0 及当前的版本会确保不重启 UDF 更新就能失效)
示例二:接管 n 个参数的 UDF
编写一个 UDF:输出(x1, x2, …, xn), 输入每个值和它们的序号的乘积的和:1 x1 + 2 x2 + … + n * xn。如果 x1 至 xn 中蕴含 null,则后果为 null。
这个示例与示例一的区别是,能够承受任意多列作为输出,且要解决每一列的值。编写 UDF 文件 /root/udf/nsum.py:

def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        total = 0
        for j in range(cols):
            v = block.data(i, j)
            if v is None:
                total = None
                break
            total += (j + 1) * block.data(i, j)
        result.append(total)
    return result

创立 UDF:

create function nsum as '/root/udf/nsum.py' outputtype double language 'Python';

测试:

taos> insert into t values('2023-05-25 09:09:15', 6, null, 8);
Insert OK, 1 row(s) affected (0.003675s)

taos> select ts, v1, v2, v3,  nsum(v1, v2, v3) from t;
           ts            |     v1      |     v2      |     v3      |     nsum(v1, v2, v3)      |
================================================================================================
 2023-05-01 12:13:14.000 |           1 |           2 |           3 |              14.000000000 |
 2023-05-03 08:09:10.000 |           2 |           3 |           4 |              20.000000000 |
 2023-05-10 07:06:05.000 |           3 |           4 |           5 |              26.000000000 |
 2023-05-25 09:09:15.000 |           6 |        NULL |           8 |                      NULL |
Query OK, 4 row(s) in set (0.010653s)

示例三:应用第三方库
编写一个 UDF,输出一个工夫戳,输入间隔这个工夫最近的下一个周日。比方明天是 2023-05-25,则下一个周日是 2023-05-28。
实现这个函数要用到第三方库 momen。先装置这个库:
pip3 install moment
而后编写 UDF 文件 /root/udf/nextsunday.py

import moment


def init():
    pass


def destroy():
    pass


def process(block):
    rows, cols = block.shape()
    if cols > 1:
        raise Exception("require only 1 parameter")
    if not type(block.data(0, 0)) is int:
        raise Exception("type error")
    return [moment.unix(block.data(i, 0)).replace(weekday=7).format('YYYY-MM-DD')
            for i in range(rows)]

UDF 框架会将 TDengine 的 timestamp 类型映射为 Python 的 int 类型,所以这个函数只承受一个示意毫秒数的整数。process 办法先做参数查看,而后用 moment 包替换工夫的星期为星期日,最初格式化输入。输入的字符串长度是固定的 10 个字符长,因而能够这样创立 UDF 函数:

create function nextsunday as '/root/udf/nextsunday.py' outputtype binary(10) language 'Python';

此时测试函数,如果你是用 systemctl 启动的 taosd,必定会遇到谬误:

taos> select ts, nextsunday(ts) from t;

DB error: udf function execution failure (1.123615s)
 tail -20 taospyudf.log  
2023-05-25 11:42:34.541 ERROR [1679419] [PyUdf::PyUdf@217] py udf load module failure. error ModuleNotFoundError: No module named 'moment'

这是因为“moment”所在位置不在 python udf 插件默认的库搜寻门路中。怎么确认这一点呢?通过以下命令搜寻 taospyudf.log:

grep 'sys path' taospyudf.log  | tail -1
2023-05-25 10:58:48.554 INFO  [1679419] [doPyOpen@592] python sys path: ['','/lib/python38.zip','/lib/python3.8','/lib/python3.8/lib-dynload','/lib/python3/dist-packages','/var/lib/taos//.udf']

发现 python udf 插件默认搜寻的第三方库装置门路是:/lib/python3/dist-packages,而 moment 默认装置到了 /usr/local/lib/python3.8/dist-packages。上面咱们批改 python udf 插件默认的库搜寻门路,把以后 python 解释器默认应用的库门路全副加进去。
先关上 python3 命令行,查看以后的 sys.path

>>> import sys
>>> ":".join(sys.path)
'/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages'

复制下面脚本的输入的字符串,而后编辑 /var/taos/taos.cfg 退出以下配置:

UdfdLdLibPath /usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/dist-packages:/usr/lib/python3/dist-packages

保留后执行 systemctl restart taosd, 再测试就不报错了:

taos> select ts, nextsunday(ts) from t;
           ts            | nextsunday(ts) |
===========================================
 2023-05-01 12:13:14.000 | 2023-05-07     |
 2023-05-03 08:09:10.000 | 2023-05-07     |
 2023-05-10 07:06:05.000 | 2023-05-14     |
 2023-05-25 09:09:15.000 | 2023-05-28     |
Query OK, 4 row(s) in set (1.011474s)

示例四:定义聚合函数
编写一个聚合函数,计算某一列最大值和最小值的差。
聚合函数与标量函数的区别是:标量函数是多行输出对应多个输入,聚合函数是多行输出对应一个输入。聚合函数的执行过程有点像经典的 map-reduce 框架的执行过程,框架把数据分成若干块,每个 mapper 解决一个块,reducer 再把 mapper 的后果做聚合。不一样的中央在于,对于 TDengine Python UDF 中的 reduce 函数既有 map 的性能又有 reduce 的性能。reduce 函数承受两个参数:一个是本人要解决的数据,一个是别的工作执行 reduce 函数的处理结果。如上面的示例 /root/udf/myspread.py:

import io
import math
import pickle

LOG_FILE: io.TextIOBase = None


def init():
    global LOG_FILE
    LOG_FILE = open("/var/log/taos/spread.log", "wt")
    log("init function myspead success")


def log(o):
    LOG_FILE.write(str(o) + '\n')


def destroy():
    log("close log file: spread.log")
    LOG_FILE.close()


def start():
    return pickle.dumps((-math.inf, math.inf))


def reduce(block, buf):
    max_number, min_number = pickle.loads(buf)
    log(f"initial max_number={max_number}, min_number={min_number}")
    rows, _ = block.shape()
    for i in range(rows):
        v = block.data(i, 0)
        if v > max_number:
            log(f"max_number={v}")
            max_number = v
        if v < min_number:
            log(f"min_number={v}")
            min_number = v
    return pickle.dumps((max_number, min_number))


def finish(buf):
    max_number, min_number = pickle.loads(buf)
    return max_number - min_number

在这个示例中咱们不光定义了一个聚合函数,还增加记录执行日志的性能,解说如下:

  1. init 函数不再是空函数,而是关上了一个文件用于写执行日志
  2. log 函数是记录日志的工具,主动将传入的对象转成字符串,加换行符输入
  3. destroy 函数用来在执行完结敞开文件
  4. start 返回了初始的 buffer,用来存聚合函数的两头后果,咱们把最大值初始化为负无穷大,最小值初始化为正无穷大
  5. reduce 解决每个数据块并聚合后果
  6. finish 函数将最终的 buffer 转换成最终的输入
    执行上面的 SQL 语句创立对应的 UDF:
create or replace aggregate function myspread as '/root/udf/myspread.py' outputtype double bufsize 128 language 'Python';

这个 SQL 语句与创立标量函数的 SQL 语句有两个重要区别:

  1. 减少了 aggregate 关键字
  2. 减少了 bufsize 关键字,用来指定存储两头后果的内存大小,这个数值能够大于理论应用的数值。本例两头后果是两个浮点数组成的 tuple,序列化后理论占用大小只有 32 个字节,但指定的 bufsize 是 128,能够用 python 命令行打印理论占用的字节数
>>> len(pickle.dumps((12345.6789, 23456789.9877)))
32

测试这个函数,能够看到 myspread 的输入后果和内置的 spread 函数的输入后果是统一的。

taos> select myspread(v1) from t;
       myspread(v1)        |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.013486s)

taos> select spread(v1) from t;
        spread(v1)         |
============================
               5.000000000 |
Query OK, 1 row(s) in set (0.005501s)

最初,查看咱们本人打印的执行日志,从日志能够看出,reduce 函数被执行了 3 次。执行过程中 max 值被更新了 4 次,min 值只被更新 1 次。

root@slave11 /var/log/taos $ cat spread.log
init function myspead success
initial max_number=-inf, min_number=inf
max_number=1
min_number=1
initial max_number=1, min_number=1
max_number=2
max_number=3
initial max_number=3, min_number=1
max_number=6
close log file: spread.log

通过这个示例,咱们学会了如何定义聚合函数,并打印自定义的日志信息。
要点总结

  1. 创立标量函数的语法
CREATE FUNCTION function_name AS library_path OUTPUTTYPE output_type LANGUAGE 'Python';

OUTPUTTYPE 对应的是 TDengine 的数据类型,如 TIMESTAMP,BIGINT,VARCHAR(64),类型映射关系见官网文档:https://docs.taosdata.com/develop/udf/。

  1. 创立聚合函数的语法
CREATE AGGREGATE FUNCTION function_name library_path OUTPUTTYPE output_type LANGUAGE 'Python';
  1. 更新 UDF 的语法
    更新标量函数
CREATE OR REPLACE FUNCTION function_name AS OUTPUTTYPE int LANGUAGE 'Python';

更新聚合函数

CREATE OR REPLACE AGGREGATE FUNCTION function_name AS OUTPUTTYPE BUFSIZE buf_size int LANGUAGE 'Python';

留神:如果加了“AGGREGATE”关键字,更新之后函数将被当作聚合函数,无论之前是什么类型的函数。相同,如果没有加“AGGREGATE”关键字,更新之后的函数将被当作标量函数,无论之前是什么类型的函数。

  1. 同名的 UDF 每更新一次,版本号会减少 1。用
    select * from ins_functions \G;
    可查看 UDF 的残缺信息,包含 UDF 的源码。
  2. 查看和删除已有的 UDF
SHOW functions;
DROP FUNCTION function_name;
  1. 装置 taospyudf 动静库
    sudo pip3 install taospyudf
    装置过程会从源码编译出共享库 libtaospyudf.so,因而零碎上要有 cmake 和 gcc,编译后这个库会被装置到 /usr/local/lib。装置完别忘了执行命令 ldconfig 更新零碎动态链接库。
  2. 调试 Python UDF 的两个重要日志文件

    1. /var/log/taos/udfdlog.* 这个文件是 UDF 框架的日志。框架负责加载各语言 UDF 的插件,执行 UDF 的生命周期函数
    2. /var/log/taos/taospyudf.log 这个文件是 libtaospyudf.so 输入的日志,每个文件最大 50M,最多保留 5 个。
  3. 定义标量函数最重要是要实现 process 函数,同时必须定义 init 和 destroy 函数即便什么都不做。
def init():
  pass
  
def process(block: datablock) -> tuple[output_type]:
    rows, cols = block.shape()
    result = []
    for i in range(rows):
        for j in range(cols):
            cell_data = block.data(i, j)
            # your logic here
    return result

def destroy():
  pass
  1. 定义聚合函数最重要是要实现 start, reduce 和 finish,同样必须定义 init 和 destroy 函数。
def init():
def destroy():
def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
def finish(buf: bytes) -> output_type:

start 生成最后后果 buffer,而后输出数据会被分为多个行数据块,对每个数据块 inputs 和以后两头后果 buf 调用 reduce,失去新的两头后果,最初再调用 finish 从两头后果 buf 产生最终输入。

  1. 应用第三方 python 库。
    应用第三方库须要查看这个库是否装置到了 Python UDF 插件默认的库搜寻门路,如果没有须要批改 taos.cfg,增加 UdfdLdLibPath 配置,库门路用冒号分隔。
  2. UDF 内无奈通过 print 函数输入日志,须要本人写文件或用 python 内置的 logging 库写文件。
退出移动版