前言

Rust 语言是一门通用零碎级编程语言,无GC且能保障内存平安、并发平安和高性能而著称。自2008年开始由 Graydon Hoare 私人研发,2009年失去 Mozilla 资助,2010年首次公布 0.1.0 版本,用于Servo 引擎的研发,于 2015年5月15号公布 1.0 版本。 自公布以来,截止到2021 年的明天,经验六年的倒退,Rust 失去稳步回升,已逐步趋于成熟稳固。 至 2016 年开始,截止到 2021年,Rust 间断五年成为 StackOverflow 语言榜上最受欢迎的语言[1]。 2021年 2 月 9 号,Rust 基金会发表成立。华为、AWS、Google、微软、Mozilla、Facebook 等科技行业领军巨头退出 Rust 基金会,成为白金成员,以致力于在寰球范畴内推广和倒退 Rust 语言。

官方网如此介绍 Rust : 一门赋予每个人 构建牢靠且高效软件能力的语言。 Rust 语言有三大劣势值得大家关注:

  1. 高性能。Rust 速度惊人且内存利用率极高。因为没有运行时和垃圾回收,它可能胜任对性能要求特地高的服务,能够在嵌入式设施上运行,还能轻松和其余语言集成。
  2. 可靠性。Rust 丰盛的类型零碎和所有权模型保障了内存平安和线程平安,让您在编译期就可能打消各种各样的谬误。
  3. 生产力。Rust 领有杰出的文档、敌对的编译器和清晰的谬误提示信息, 还集成了一流的工具——包管理器和构建工具, 智能地主动补全和类型测验的多编辑器反对, 以及主动格式化代码等等。

Rust 和 C 都是硬件间接形象
Rust 和 C 都是间接对硬件的形象,都可看作一种「可移植汇编程序」。 Rust 和 C 都能管制数据结构的内存布局、整数大小、栈与堆内存调配、指针间接寻址等,并且个别都能翻译成可了解的机器代码,编译器很少插入 "魔法"。 即使 Rust 比 C 有更高层次的构造,如迭代器、特质(trait)和智能指针,它们也被设计为可预测地优化为简略的机器代码(又称 "零老本形象")。 Rust的类型的内存布局很简略,例如,可增长的字符串String 和 Vec 正好是{byte*, capacity, length}。Rust没有任何像 Cpp里的 挪动 或 复制构造函数 这样的概念,所以对象的传递保障不会比传递指针或 memcpy 更简单。

总的来说,Rust有媲美C的高性能,同时又具备高效的开发生产力,同时通过FFI能够高效的和其余语言如C进行混合编程或相互调用。本篇文章次要介绍如何利用Rust开发postgres extension。

部署开发环境
配置RUST开发环境

参考 https://www.rust-lang.org/too... IDE 举荐 https://code.visualstudio.com/ + https://rls.booyaa.wtf/

配置postgres开发环境
ubuntu

sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc

Red hat

sudo yum install -y bison-devel readline-devel zlib-devel openssl-devel wgetsudo yum groupinstall -y 'Development Tools'

cargo pgx 子命令装置
cargo install cargo-pgx

接下来执行命令cargo pgx init。

Discovered Postgres v13.3, v12.7, v11.12, v10.17  Downloading Postgres v12.7 from https://ftp.postgresql.org/pub/source/v12.7/postgresql-12.7.tar.bz2  Downloading Postgres v13.3 from https://ftp.postgresql.org/pub/source/v13.3/postgresql-13.3.tar.bz2  Downloading Postgres v10.17 from https://ftp.postgresql.org/pub/source/v10.17/postgresql-10.17.tar.bz2  Downloading Postgres v11.12 from https://ftp.postgresql.org/pub/source/v11.12/postgresql-11.12.tar.bz2     Removing /home/wdy/.pgx/10.17    Untarring Postgres v10.17 to /home/wdy/.pgx/10.17  Configuring Postgres v10.17     Removing /home/wdy/.pgx/11.12    Untarring Postgres v11.12 to /home/wdy/.pgx/11.12    Untarring Postgres v12.7 to /home/wdy/.pgx/12.7  Configuring Postgres v11.12     Removing /home/wdy/.pgx/13.3    Untarring Postgres v13.3 to /home/wdy/.pgx/13.3  Configuring Postgres v12.7    Compiling Postgres v10.17  Configuring Postgres v13.3    Compiling Postgres v11.12    Compiling Postgres v12.7    Compiling Postgres v13.3   Installing Postgres v10.17 to /home/wdy/.pgx/10.17/pgx-install   Installing Postgres v11.12 to /home/wdy/.pgx/11.12/pgx-install   Installing Postgres v12.7 to /home/wdy/.pgx/12.7/pgx-install   Installing Postgres v13.3 to /home/wdy/.pgx/13.3/pgx-install   Validating /home/wdy/.pgx/10.17/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-10   Validating /home/wdy/.pgx/11.12/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-11   Validating /home/wdy/.pgx/12.7/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-12   Validating /home/wdy/.pgx/13.3/pgx-install/bin/pg_config Initializing data directory at /home/wdy/.pgx/data-13

这个命令会下载版本v10,v11,v12,v13的postgres而后编译到目录~/.pgx/中。这个下载步骤是必须的,因为后续pgx会为每中版本的postgres的header文件生成对应的Rust bindings,以及后续pgx 的测试框架中也会用到。

开发一个简略的extension
创立一个extension我的项目
应用上面命令创立一个名为my_extension的我的项目

cargo pgx new my_extension

命令执行后,会生成如下一个目录构造

├── Cargo.toml├── my_extension.control├── sql│   ├── lib.generated.sql│   └── load-order.txt└── src    └── lib.rs

sql/lib.generated.sql 内容如下

CREATE OR REPLACE FUNCTION "hello_my_extension"() RETURNS text STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'hello_my_extension_wrapper';

src/lib.rs内容如下

use pgx::*;pg_module_magic!();#[pg_extern]fn hello_my_extension() -> &'static str {    "Hello, my_extension"}#[cfg(any(test, feature = "pg_test"))]mod tests {    use pgx::*;    #[pg_test]    fn test_hello_my_extension() {        assert_eq!("Hello, my_extension", crate::hello_my_extension());    }}#[cfg(test)]pub mod pg_test {    pub fn setup(_options: Vec<&str>) {        // perform one-off initialization when the pg_test framework starts    }    pub fn postgresql_conf_options() -> Vec<&'static str> {        // return any postgresql.conf settings that are required for your tests        vec![]    }}

能够看到pgx曾经默认给出了最简略的扩大实现。 #[pg_extern] 宏所润饰的函数就是咱们要实现的extension函数,mod tests , pub mod pg_test 是pgx曾经为咱们写好了的测试模块,用于编写相干测试代码。

pgx默认曾经给咱们写好了名为 hello_my_extension 的extension,性能很简略,就是返回 "Hello, my_extension" 字符串

运行extension

cd my_extension cargo pgx run pg13  # or pg10 or pg11 or pg12

应用cargo pgx run 后跟参数pg13或pg10或pg11或pg12,对应不同的postgres版本,cargo pgx run会把extension编译为一个 .so 共享库文件,复制到对应版本的 ~/.pgx/ 目录中,而后启动Postgres实例,通过psql连贯到和extension同名的数据库上。编译实现后,开发者就会处于psql的shell界面中,能够调用extension进行测试了。

这里咱们执行 cargo pgx run pg12 失去输入如下:

$ cargo pgx run pg12building extension with features `pg12`"cargo" "build" "--features" "pg12" "--no-default-features"    Updating crates.io index  Downloaded generic-array v0.14.4  Downloaded humantime v2.1.0  Downloaded lazycell v1.3.0  Downloaded shlex v1.0.0  Downloaded stable_deref_trait v1.2.0  Downloaded termcolor v1.1.2  Downloaded typenum v1.13.0  Downloaded time-macros v0.1.1  Downloaded which v3.1.1  Downloaded atomic-traits v0.2.0  Downloaded seahash v4.1.0  Downloaded uuid v0.8.2  Downloaded as-slice v0.1.5  Downloaded pgx v0.1.21  Downloaded peeking_take_while v0.1.2  Downloaded proc-macro-hack v0.5.19  Downloaded rustc-hash v1.1.0  Downloaded serde_cbor v0.11.1  Downloaded time-macros-impl v0.1.2  Downloaded bindgen v0.58.1  Downloaded cexpr v0.4.0  Downloaded env_logger v0.8.4  Downloaded standback v0.2.17  Downloaded generic-array v0.12.4  Downloaded getrandom v0.2.3  Downloaded clang-sys v1.2.0  Downloaded glob v0.3.0  Downloaded const_fn v0.4.8  Downloaded generic-array v0.13.3  Downloaded half v1.7.1  Downloaded hash32 v0.1.1  Downloaded enum-primitive-derive v0.2.1  Downloaded heapless v0.6.1  Downloaded build-deps v0.1.4  Downloaded libloading v0.7.0  Downloaded nom v5.1.2  Downloaded time v0.2.27  Downloaded pgx-macros v0.1.21  Downloaded pgx-pg-sys v0.1.21  Downloaded 39 crates (2.0 MB) in 2.74s   Compiling version_check v0.9.3   Compiling libc v0.2.98   Compiling cfg-if v1.0.0   Compiling autocfg v1.0.1   Compiling proc-macro2 v1.0.27   Compiling unicode-xid v0.2.2   Compiling syn v1.0.73   Compiling memchr v2.4.0   Compiling lazy_static v1.4.0   Compiling serde_derive v1.0.126   Compiling tinyvec_macros v0.1.0   Compiling getrandom v0.1.16   Compiling matches v0.1.8   Compiling serde v1.0.126   Compiling log v0.4.14   Compiling crossbeam-utils v0.8.5   Compiling byteorder v1.4.3   Compiling glob v0.3.0   Compiling percent-encoding v2.1.0   Compiling crc32fast v1.2.1   Compiling ryu v1.0.5   Compiling adler v1.0.2   Compiling ppv-lite86 v0.2.10   Compiling crossbeam-epoch v0.9.5   Compiling regex-syntax v0.6.25   Compiling bitflags v1.2.1   Compiling mime v0.3.16   Compiling typenum v1.13.0   Compiling rayon-core v1.9.1   Compiling scopeguard v1.1.0   Compiling serde_json v1.0.64   Compiling unicode-width v0.1.8   Compiling itoa v0.4.7   Compiling base64 v0.11.0   Compiling httpdate v0.3.2   Compiling xml-rs v0.8.3   Compiling proc-macro-hack v0.5.19   Compiling humantime v2.1.0   Compiling vec_map v0.8.2   Compiling semver-parser v0.7.0   Compiling ansi_term v0.11.0   Compiling unescape v0.1.0   Compiling bindgen v0.58.1   Compiling termcolor v1.1.2   Compiling strsim v0.8.0   Compiling either v1.6.1   Compiling shlex v1.0.0   Compiling lazycell v1.3.0   Compiling peeking_take_while v0.1.2   Compiling rustc-hash v1.1.0   Compiling const_fn v0.4.8   Compiling stable_deref_trait v1.2.0   Compiling heapless v0.6.1   Compiling half v1.7.1   Compiling cfg-if v0.1.10   Compiling once_cell v1.8.0   Compiling seahash v4.1.0   Compiling libloading v0.7.0   Compiling tinyvec v1.2.0   Compiling unicode-bidi v0.3.5   Compiling unicase v2.6.0   Compiling nom v5.1.2   Compiling standback v0.2.17   Compiling generic-array v0.14.4   Compiling time v0.2.27   Compiling memoffset v0.6.4   Compiling miniz_oxide v0.4.4   Compiling rayon v1.5.1   Compiling num-traits v0.2.14   Compiling form_urlencoded v1.0.1   Compiling hash32 v0.1.1   Compiling build-deps v0.1.4   Compiling clang-sys v1.2.0   Compiling textwrap v0.11.0   Compiling semver v0.9.0   Compiling unicode-normalization v0.1.19   Compiling rustc_version v0.2.3   Compiling aho-corasick v0.7.18   Compiling quote v1.0.9   Compiling atty v0.2.14   Compiling dirs-sys v0.3.6   Compiling socks v0.3.3   Compiling num_cpus v1.13.0   Compiling which v3.1.1   Compiling getrandom v0.2.3   Compiling crossbeam-channel v0.5.1   Compiling idna v0.2.3   Compiling mime_guess v2.0.3   Compiling atomic-traits v0.2.0   Compiling generic-array v0.13.3   Compiling generic-array v0.12.4   Compiling colored v2.0.0   Compiling clap v2.33.3   Compiling rand_core v0.5.1   Compiling dirs v3.0.2   Compiling regex v1.5.4   Compiling uuid v0.8.2   Compiling flate2 v1.0.20   Compiling url v2.2.2   Compiling cexpr v0.4.0   Compiling as-slice v0.1.5   Compiling rand_chacha v0.2.2   Compiling crossbeam-deque v0.8.0   Compiling env_proxy v0.4.1   Compiling env_logger v0.8.4   Compiling rand v0.7.3   Compiling rttp_client v0.1.0   Compiling thiserror-impl v1.0.26   Compiling time-macros-impl v0.1.2   Compiling enum-primitive-derive v0.2.1   Compiling time-macros v0.1.1   Compiling thiserror v1.0.26   Compiling serde-xml-rs v0.4.1   Compiling toml v0.5.8   Compiling serde_cbor v0.11.1   Compiling pgx-utils v0.1.21   Compiling pgx-pg-sys v0.1.21   Compiling pgx-macros v0.1.21   Compiling pgx v0.1.21   Compiling my_extension v0.0.0 (/home/wdy/gitlab/valid-my-idea/rust-dig/pgextionsion/my_extension)    Finished dev [unoptimized + debuginfo] target(s) in 1m 28sinstalling extension      Copying control file to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension.control`      Copying shared library to `/home/wdy/.pgx/12.7/pgx-install/lib/postgresql/my_extension.so`      Writing extension schema to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension--1.0.sql`     Finished installing my_extension     Starting Postgres v12 on port 28812     Creating database my_extensionpsql (12.7)Type "help" for help.my_extension=#

能够看到最初进入了名为my_extension的psql界面中。 接下来创立EXTENSION并调用,能够看到失常输入字符串

my_extension=# create extension my_extension;CREATE EXTENSIONmy_extension=# select hello_my_extension(); hello_my_extension  --------------------- Hello, my_extension(1 row)

Array
Rust的Vec类型对应 postgrs中的ARRAY[]:: ,如果某个扩大函数要返回多个雷同类型的value,能够应用Vec进行返回,如下例中的 static_names 和 static_names_2d,最终在psql终端调用后返回的是单行单列。 如果某个扩大返回的是一个Iterator,那么返回的是一个多行单列数据,参见上面例子中的 static_names_iter 和 static_names_2d_iter。

代码

#[pg_extern]fn static_names() -> Vec<Option<&'static str>> {    vec![Some("King"), Some("Eastern"), None, Some("Sun")]}#[pg_extern]fn static_names_iter() -> impl std::iter::Iterator<Item = Option<&'static str>> {    vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")].into_iter()}#[pg_extern]fn static_names_2d() -> Vec<Vec<Option<&'static str>>> {    vec![        vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],        vec![Some("Eric"), Some("David")],        vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],    ]}#[pg_extern]fn static_names_2d_iter() -> impl std::iter::Iterator<Item = Vec<Option<&'static str>>> {    vec![        vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],        vec![Some("Eric"), Some("David")],        vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],    ]    .into_iter()}

验证

执行 cargo pgx run pg13

installing extension      Copying control file to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension.control`      Copying shared library to `/home/wdy/.pgx/13.3/pgx-install/lib/postgresql/my_extension.so`      Writing extension schema to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension--1.0.sql`     Finished installing my_extension     Starting Postgres v13 on port 28813     Re-using existing database my_extensionpsql (13.3)Type "help" for help.my_extension=# drop extension my_extension;create extension my_extension;DROP EXTENSIONCREATE EXTENSIONmy_extension=# select static_names();        static_names         ----------------------------- {King,Eastern,NULL,Sun}(1 row)my_extension=# select static_names_iter(); static_names_iter ------------------- Brandy Sally Anchovy(4 rows)my_extension=# select static_names_2d();                                   static_names_2d                                   ------------------------------------------------------------------------------------- {"{Brandy,Sally,NULL,Anchovy}","{Eric,David}","{ZomboDB,PostgreSQL,Elasticsearch}"}(1 row)my_extension=# select static_names_2d_iter();        static_names_2d_iter        ------------------------------------ {Brandy,Sally,NULL,Anchovy} {Eric,David} {ZomboDB,PostgreSQL,Elasticsearch}(3 rows)

残缺代码
残缺代码

AGGREGATE扩大
Postgres能够通过 CREATE AGGREGATE 定义一个新的汇集函数。一个简略的汇集函数蕴含一个或两个一般的函数:

  1. 状态转移函数 sfunc
  2. 可选的最终计算函数 ffunc
sfunc( internal-state, next-data-values ) ---> next-internal-stateffunc( internal-state ) ---> aggregate-value
  1. 可能还须要定义汇集函数外部状态的数据类型 stype
  2. 外部函数状态初始值 initcond ,是一个类型text的值。

那么咱们只需参考下面的样例 开发对应的 sfunc, ffunc ,stype,而后再应用 CREATE AGGREGATE 创立汇集函数。

创立扩大
执行上面命令
cargo pgx new my_agg

定义外部状态类型 stype

类型 IntegerAvgState 用于汇集函数计算过程中存储中间状态数据

#[derive(Serialize, Deserialize, PostgresType)]pub struct IntegerAvgState {    sum: i32,    n: i32,}impl Default for IntegerAvgState {    fn default() -> Self {        Self { sum: 0, n: 0 }    }}impl IntegerAvgState {    fn acc(&self, v: i32) -> Self {        Self {            sum: self.sum + v,            n: self.n + 1,        }    }    fn finalize(&self) -> i32 {        self.sum / self.n    }}

状态转移函数 sfunc

#[pg_extern]fn integer_avg_state_func(    internal_state: IntegerAvgState,    next_data_value: i32,) -> IntegerAvgState {    internal_state.acc(next_data_value)}

最终计算函数 ffunc

#[pg_extern]fn integer_avg_final_func(internal_state: IntegerAvgState) -> i32 {    internal_state.finalize()}

创立汇集函数

extension_sql!(    r#"    CREATE AGGREGATE MYAVG (integer)    (        sfunc = integer_avg_state_func,        stype = IntegerAvgState,        finalfunc = integer_avg_final_func,        initcond = '{"sum": 0, "n": 0}'    );    "#);

残缺代码

验证汇集函数
执行 cargo pgx run pg12 进入到postgres的psql命令行界面,而后执行上面操作

my_agg=# DROP EXTENSION my_agg; CREATE EXTENSION my_agg;DROP EXTENSIONCREATE EXTENSIONmy_agg=#  create table t (c integer);CREATE TABLEmy_agg=# insert into t (c) values (1), (2), (3);INSERT 0 3my_agg=# select MYAVG(c) from t; myavg -------     2(1 row)my_agg=# drop table t;DROP TABLEmy_agg=#  create table t (c integer,b text);CREATE TABLEmy_agg=# insert into t (c,b) values (1,'king'), (2,'eastern'), (3,'sun');INSERT 0 3my_agg=# select MYAVG(c) from t; myavg -------     2(1 row)

残缺代码
残缺代码

TOPN AGGREGATE扩大
当初咱们实现一个略微简单一些的汇集函数,求表中某列value最大的10个数值,经典的办法就是采纳最小堆实现。 Rust规范库中 BinaryHeap 默认是最大堆,通过应用 Reverse 进行封装,就能够失去一个最小堆。

创立扩大

cargo pgx new my_topn

定义外部状态类型 stype

#[derive(Serialize, Deserialize, PostgresType)]pub struct TopState {    min_heap:  BinaryHeap<Reverse<i32>>,    n:usize,}impl Default for TopState {    fn default() -> Self {        Self { min_heap:BinaryHeap::new(),n:10 }    }}

为 TopState 实现 acc办法,用于状态转移,更新最小堆

impl TopState {    fn acc(& mut self, v: i32)  {        if self.min_heap.len()<self.n{            self.min_heap.push(Reverse(v));            return         }        // 取出以后最小堆上的最小值        let top = self.min_heap.peek().unwrap().0;        // 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃        if v<=top{            return         }        // 插入到最小堆中,而后移除堆中的最小值        self.min_heap.push(Reverse(v));        self.min_heap.pop().unwrap();        return     }}

状态转移函数 sfunc

#[pg_extern]fn integer_topn_state_func(   mut internal_state: TopState,    next_data_value: i32,) -> TopState {    internal_state.acc(next_data_value);    internal_state}

最终计算函数 ffunc

#[pg_extern]fn integer_topn_final_func(internal_state: TopState) -> Vec<i32> {    internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect()}

创立汇集函数 MYMAXN

extension_sql!(    r#"    CREATE AGGREGATE MYMAXN (integer)    (        sfunc = integer_topn_state_func,        stype = TopState,        finalfunc = integer_topn_final_func,        initcond = '{"min_heap":[],"n":10}'    );    "#);

验证汇集函数 MYMAXN

执行 cargo pgx run pg13 进入到postgres的psql命令行界面 执行上面sql命令创立测试数据

create table people(    id        integer,    name      varchar(32),    age       integer,    grade     numeric(4, 2),    birthday  date,    logintime timestamp);insert into peopleselect generate_series(1,20) as id,md5(random()::text) as name,(random()*100)::integer as age,(random()*99)::numeric(4,2) as grade,now() - ((random()*1000)::integer||' day')::interval as birthday,clock_timestamp() as logintime;

失去测试数据如下

my_topn=# select age from people ; age -----  28  62   3  20  46  23  74  19  46  26  70  90  22  45  30  46  43  70  78  96(20 rows)

执行汇集函数MYMAXN

my_topn=# select MYMAXN(age) from people;             mymaxn              --------------------------------- {96,90,78,74,70,70,62,46,46,46}(1 row)

残缺代码

留神
如果没有指定 initcond,则在创立extension的时候会报如下的谬误提醒

ERROR:  must not omit initial value when transition function is strict and transition type is not compatible with input type

升级版TOPN

同时返回表中中指定列最大的n个值和最小的n个值。在原有的数据结构中新增一个最大堆即可。

定义外部状态类型 stype

#[derive(Serialize, Deserialize, PostgresType)]pub struct TopState {    min_heap:  BinaryHeap<Reverse<i32>>,    max_heap:  BinaryHeap<i32>,    n:usize,}impl Default for TopState {    fn default() -> Self {        Self { min_heap:BinaryHeap::new(),max_heap:BinaryHeap::new(),n:10 }    }}

为 TopState 实现 acc办法,用于状态转移,更新最小堆

impl TopState {    fn acc(& mut self, v: i32)  {        if self.min_heap.len()<self.n{            self.min_heap.push(Reverse(v));            return         }        // 取出以后最小堆上的最小值        let top = self.min_heap.peek().unwrap().0;        // 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃        if v<=top{            return         }        // 插入到最小堆中,而后移除堆中的最小值        self.min_heap.push(Reverse(v));        self.min_heap.pop().unwrap();        return     }    fn acc_max(& mut self, v: i32)  {        if self.max_heap.len()<self.n{            self.max_heap.push(v);            return         }        // 取出以后最大堆上的最大值        let top = self.max_heap.peek().unwrap();        // 如果比最大值还大 , 必定不会是要求的最小的 10 个数值, 间接抛弃        if v>=*top{            return         }        // 插入到最大堆中,而后移除堆中的最大值        self.max_heap.push(v);        self.max_heap.pop().unwrap();        return     }}

状态转移函数 sfunc

#[pg_extern]fn integer_topn_state_func(   mut internal_state: TopState,    next_data_value: i32,) -> TopState {    internal_state.acc(next_data_value);    internal_state.acc_max(next_data_value);    internal_state}

最终计算函数 ffunc

#[pg_extern]fn integer_topn_final_func(internal_state: TopState) -> Vec<Vec<i32>> {    vec![    internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect(),    internal_state.max_heap.into_sorted_vec(),    ]}

创立汇集函数 MYMAXN

extension_sql!(    r#"    CREATE AGGREGATE MYMAXN (integer)    (        sfunc = integer_topn_state_func,        stype = TopState,        finalfunc = integer_topn_final_func,        initcond = '{"min_heap":[],"max_heap":[],"n":10}'    );    "#);

验证汇集函数 MYMAXN

my_topn=# select MYMAXN(age) from people;                                mymaxn                                ---------------------------------------------------------------------- {"{96,90,78,74,70,70,62,46,46,46}","{3,19,20,22,23,26,28,30,43,45}"}(1 row)

残缺代码

参考文献

  1. https://mp.weixin.qq.com/s/9r...
  2. Postgres Compile and Install from source code https://wiki.postgresql.org/w...
  3. cargo pgx 命令介绍 https://github.com/zombodb/pg...
  4. https://blog.timescale.com/bl...
  5. https://www.postgresql.org/do...