关于rust:Rust开发postgres扩展

4次阅读

共计 16774 个字符,预计需要花费 42 分钟才能阅读完成。

前言

Rust 语言是一门通用零碎级编程语言,无 GC 且能保障内存平安、并发平安和高性能而著称。自 2008 年开始由 Graydon Hoare 私人研发,2009 年失去 Mozilla 资助,2010 年首次公布 0.1.0 版本,用于 Servo 引擎的研发,于 2015 年 5 月 15 号公布 1.0 版本。自公布以来,截止到 2021 年的明天,经验六年的倒退,Rust 失去稳步回升,已逐步趋于成熟稳固。至 2016 年开始,截止到 2021 年,Rust 间断五年成为 StackOverflow 语言榜上最受欢迎的语言[1]。2021 年 2 月 9 号,Rust 基金会发表成立。华为、AWS、Google、微软、Mozilla、Facebook 等科技行业领军巨头退出 Rust 基金会,成为白金成员,以致力于在寰球范畴内推广和倒退 Rust 语言。​

官方网如此介绍 Rust : 一门赋予每个人 构建牢靠且高效软件能力的语言。Rust 语言有三大劣势值得大家关注:

  1. 高性能。Rust 速度惊人且内存利用率极高。因为没有运行时和垃圾回收,它可能胜任对性能要求特地高的服务,能够在嵌入式设施上运行,还能轻松和其余语言集成。
  2. 可靠性。Rust 丰盛的类型零碎和所有权模型保障了内存平安和线程平安,让您在编译期就可能打消各种各样的谬误。
  3. 生产力。Rust 领有杰出的文档、敌对的编译器和清晰的谬误提示信息,还集成了一流的工具——包管理器和构建工具,智能地主动补全和类型测验的多编辑器反对,以及主动格式化代码等等。

Rust 和 C 都是硬件间接形象
Rust 和 C 都是间接对硬件的形象,都可看作一种「可移植汇编程序」。Rust 和 C 都能管制数据结构的内存布局、整数大小、栈与堆内存调配、指针间接寻址等,并且个别都能翻译成可了解的机器代码,编译器很少插入 “ 魔法 ”。即使 Rust 比 C 有更高层次的构造,如迭代器、特质(trait)和智能指针,它们也被设计为可预测地优化为简略的机器代码(又称 “ 零老本形象 ”)。Rust 的类型的内存布局很简略,例如,可增长的字符串 String 和 Vec 正好是{byte*, capacity, length}。Rust 没有任何像 Cpp 里的 挪动 或 复制构造函数 这样的概念,所以对象的传递保障不会比传递指针或 memcpy 更简单。

总的来说,Rust 有媲美 C 的高性能,同时又具备高效的开发生产力,同时通过 FFI 能够高效的和其余语言如 C 进行混合编程或相互调用。本篇文章次要介绍如何利用 Rust 开发 postgres extension。​

部署开发环境
配置 RUST 开发环境

参考 https://www.rust-lang.org/too… IDE 举荐 https://code.visualstudio.com/ + https://rls.booyaa.wtf/

配置 postgres 开发环境
ubuntu

sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc

Red hat

sudo yum install -y bison-devel readline-devel zlib-devel openssl-devel wget
sudo yum groupinstall -y 'Development Tools'

cargo pgx 子命令装置
cargo install cargo-pgx

接下来执行命令 cargo pgx init。

Discovered Postgres v13.3, v12.7, v11.12, v10.17
  Downloading Postgres v12.7 from https://ftp.postgresql.org/pub/source/v12.7/postgresql-12.7.tar.bz2
  Downloading Postgres v13.3 from https://ftp.postgresql.org/pub/source/v13.3/postgresql-13.3.tar.bz2
  Downloading Postgres v10.17 from https://ftp.postgresql.org/pub/source/v10.17/postgresql-10.17.tar.bz2
  Downloading Postgres v11.12 from https://ftp.postgresql.org/pub/source/v11.12/postgresql-11.12.tar.bz2
     Removing /home/wdy/.pgx/10.17
    Untarring Postgres v10.17 to /home/wdy/.pgx/10.17
  Configuring Postgres v10.17
     Removing /home/wdy/.pgx/11.12
    Untarring Postgres v11.12 to /home/wdy/.pgx/11.12
    Untarring Postgres v12.7 to /home/wdy/.pgx/12.7
  Configuring Postgres v11.12
     Removing /home/wdy/.pgx/13.3
    Untarring Postgres v13.3 to /home/wdy/.pgx/13.3
  Configuring Postgres v12.7
    Compiling Postgres v10.17
  Configuring Postgres v13.3
    Compiling Postgres v11.12
    Compiling Postgres v12.7
    Compiling Postgres v13.3
   Installing Postgres v10.17 to /home/wdy/.pgx/10.17/pgx-install
   Installing Postgres v11.12 to /home/wdy/.pgx/11.12/pgx-install
   Installing Postgres v12.7 to /home/wdy/.pgx/12.7/pgx-install
   Installing Postgres v13.3 to /home/wdy/.pgx/13.3/pgx-install
   Validating /home/wdy/.pgx/10.17/pgx-install/bin/pg_config
 Initializing data directory at /home/wdy/.pgx/data-10
   Validating /home/wdy/.pgx/11.12/pgx-install/bin/pg_config
 Initializing data directory at /home/wdy/.pgx/data-11
   Validating /home/wdy/.pgx/12.7/pgx-install/bin/pg_config
 Initializing data directory at /home/wdy/.pgx/data-12
   Validating /home/wdy/.pgx/13.3/pgx-install/bin/pg_config
 Initializing data directory at /home/wdy/.pgx/data-13

这个命令会下载版本 v10,v11,v12,v13 的 postgres 而后编译到目录~/.pgx/ 中。这个下载步骤是必须的,因为后续 pgx 会为每中版本的 postgres 的 header 文件生成对应的 Rust bindings,以及后续 pgx 的测试框架中也会用到。​

开发一个简略的 extension
创立一个 extension 我的项目
应用上面命令创立一个名为 my_extension 的我的项目

cargo pgx new my_extension

命令执行后,会生成如下一个目录构造

├── Cargo.toml
├── my_extension.control
├── sql
│   ├── lib.generated.sql
│   └── load-order.txt
└── src
    └── lib.rs

sql/lib.generated.sql 内容如下

CREATE OR REPLACE FUNCTION "hello_my_extension"() RETURNS text STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'hello_my_extension_wrapper';

src/lib.rs 内容如下

use pgx::*;

pg_module_magic!();

#[pg_extern]
fn hello_my_extension() -> &'static str {"Hello, my_extension"}

#[cfg(any(test, feature = "pg_test"))]
mod tests {
    use pgx::*;

    #[pg_test]
    fn test_hello_my_extension() {assert_eq!("Hello, my_extension", crate::hello_my_extension());
    }

}

#[cfg(test)]
pub mod pg_test {pub fn setup(_options: Vec<&str>) {// perform one-off initialization when the pg_test framework starts}

    pub fn postgresql_conf_options() -> Vec<&'static str> {
        // return any postgresql.conf settings that are required for your tests
        vec![]}
}

能够看到 pgx 曾经默认给出了最简略的扩大实现。#[pg_extern] 宏所润饰的函数就是咱们要实现的 extension 函数,mod tests , pub mod pg_test 是 pgx 曾经为咱们写好了的测试模块,用于编写相干测试代码。​

pgx 默认曾经给咱们写好了名为 hello_my_extension 的 extension,性能很简略,就是返回 “Hello, my_extension” 字符串 ​

运行 extension

cd my_extension 
cargo pgx run pg13  # or pg10 or pg11 or pg12

应用 cargo pgx run 后跟参数 pg13 或 pg10 或 pg11 或 pg12,对应不同的 postgres 版本,cargo pgx run 会把 extension 编译为一个 .so 共享库文件,复制到对应版本的 ~/.pgx/ 目录中,而后启动 Postgres 实例,通过 psql 连贯到和 extension 同名的数据库上。编译实现后,开发者就会处于 psql 的 shell 界面中,能够调用 extension 进行测试了。​

这里咱们执行 cargo pgx run pg12 失去输入如下:

$ cargo pgx run pg12
building extension with features `pg12`
"cargo" "build" "--features" "pg12" "--no-default-features"
    Updating crates.io index
  Downloaded generic-array v0.14.4
  Downloaded humantime v2.1.0
  Downloaded lazycell v1.3.0
  Downloaded shlex v1.0.0
  Downloaded stable_deref_trait v1.2.0
  Downloaded termcolor v1.1.2
  Downloaded typenum v1.13.0
  Downloaded time-macros v0.1.1
  Downloaded which v3.1.1
  Downloaded atomic-traits v0.2.0
  Downloaded seahash v4.1.0
  Downloaded uuid v0.8.2
  Downloaded as-slice v0.1.5
  Downloaded pgx v0.1.21
  Downloaded peeking_take_while v0.1.2
  Downloaded proc-macro-hack v0.5.19
  Downloaded rustc-hash v1.1.0
  Downloaded serde_cbor v0.11.1
  Downloaded time-macros-impl v0.1.2
  Downloaded bindgen v0.58.1
  Downloaded cexpr v0.4.0
  Downloaded env_logger v0.8.4
  Downloaded standback v0.2.17
  Downloaded generic-array v0.12.4
  Downloaded getrandom v0.2.3
  Downloaded clang-sys v1.2.0
  Downloaded glob v0.3.0
  Downloaded const_fn v0.4.8
  Downloaded generic-array v0.13.3
  Downloaded half v1.7.1
  Downloaded hash32 v0.1.1
  Downloaded enum-primitive-derive v0.2.1
  Downloaded heapless v0.6.1
  Downloaded build-deps v0.1.4
  Downloaded libloading v0.7.0
  Downloaded nom v5.1.2
  Downloaded time v0.2.27
  Downloaded pgx-macros v0.1.21
  Downloaded pgx-pg-sys v0.1.21
  Downloaded 39 crates (2.0 MB) in 2.74s
   Compiling version_check v0.9.3
   Compiling libc v0.2.98
   Compiling cfg-if v1.0.0
   Compiling autocfg v1.0.1
   Compiling proc-macro2 v1.0.27
   Compiling unicode-xid v0.2.2
   Compiling syn v1.0.73
   Compiling memchr v2.4.0
   Compiling lazy_static v1.4.0
   Compiling serde_derive v1.0.126
   Compiling tinyvec_macros v0.1.0
   Compiling getrandom v0.1.16
   Compiling matches v0.1.8
   Compiling serde v1.0.126
   Compiling log v0.4.14
   Compiling crossbeam-utils v0.8.5
   Compiling byteorder v1.4.3
   Compiling glob v0.3.0
   Compiling percent-encoding v2.1.0
   Compiling crc32fast v1.2.1
   Compiling ryu v1.0.5
   Compiling adler v1.0.2
   Compiling ppv-lite86 v0.2.10
   Compiling crossbeam-epoch v0.9.5
   Compiling regex-syntax v0.6.25
   Compiling bitflags v1.2.1
   Compiling mime v0.3.16
   Compiling typenum v1.13.0
   Compiling rayon-core v1.9.1
   Compiling scopeguard v1.1.0
   Compiling serde_json v1.0.64
   Compiling unicode-width v0.1.8
   Compiling itoa v0.4.7
   Compiling base64 v0.11.0
   Compiling httpdate v0.3.2
   Compiling xml-rs v0.8.3
   Compiling proc-macro-hack v0.5.19
   Compiling humantime v2.1.0
   Compiling vec_map v0.8.2
   Compiling semver-parser v0.7.0
   Compiling ansi_term v0.11.0
   Compiling unescape v0.1.0
   Compiling bindgen v0.58.1
   Compiling termcolor v1.1.2
   Compiling strsim v0.8.0
   Compiling either v1.6.1
   Compiling shlex v1.0.0
   Compiling lazycell v1.3.0
   Compiling peeking_take_while v0.1.2
   Compiling rustc-hash v1.1.0
   Compiling const_fn v0.4.8
   Compiling stable_deref_trait v1.2.0
   Compiling heapless v0.6.1
   Compiling half v1.7.1
   Compiling cfg-if v0.1.10
   Compiling once_cell v1.8.0
   Compiling seahash v4.1.0
   Compiling libloading v0.7.0
   Compiling tinyvec v1.2.0
   Compiling unicode-bidi v0.3.5
   Compiling unicase v2.6.0
   Compiling nom v5.1.2
   Compiling standback v0.2.17
   Compiling generic-array v0.14.4
   Compiling time v0.2.27
   Compiling memoffset v0.6.4
   Compiling miniz_oxide v0.4.4
   Compiling rayon v1.5.1
   Compiling num-traits v0.2.14
   Compiling form_urlencoded v1.0.1
   Compiling hash32 v0.1.1
   Compiling build-deps v0.1.4
   Compiling clang-sys v1.2.0
   Compiling textwrap v0.11.0
   Compiling semver v0.9.0
   Compiling unicode-normalization v0.1.19
   Compiling rustc_version v0.2.3
   Compiling aho-corasick v0.7.18
   Compiling quote v1.0.9
   Compiling atty v0.2.14
   Compiling dirs-sys v0.3.6
   Compiling socks v0.3.3
   Compiling num_cpus v1.13.0
   Compiling which v3.1.1
   Compiling getrandom v0.2.3
   Compiling crossbeam-channel v0.5.1
   Compiling idna v0.2.3
   Compiling mime_guess v2.0.3
   Compiling atomic-traits v0.2.0
   Compiling generic-array v0.13.3
   Compiling generic-array v0.12.4
   Compiling colored v2.0.0
   Compiling clap v2.33.3
   Compiling rand_core v0.5.1
   Compiling dirs v3.0.2
   Compiling regex v1.5.4
   Compiling uuid v0.8.2
   Compiling flate2 v1.0.20
   Compiling url v2.2.2
   Compiling cexpr v0.4.0
   Compiling as-slice v0.1.5
   Compiling rand_chacha v0.2.2
   Compiling crossbeam-deque v0.8.0
   Compiling env_proxy v0.4.1
   Compiling env_logger v0.8.4
   Compiling rand v0.7.3
   Compiling rttp_client v0.1.0
   Compiling thiserror-impl v1.0.26
   Compiling time-macros-impl v0.1.2
   Compiling enum-primitive-derive v0.2.1
   Compiling time-macros v0.1.1
   Compiling thiserror v1.0.26
   Compiling serde-xml-rs v0.4.1
   Compiling toml v0.5.8
   Compiling serde_cbor v0.11.1
   Compiling pgx-utils v0.1.21
   Compiling pgx-pg-sys v0.1.21
   Compiling pgx-macros v0.1.21
   Compiling pgx v0.1.21
   Compiling my_extension v0.0.0 (/home/wdy/gitlab/valid-my-idea/rust-dig/pgextionsion/my_extension)
    Finished dev [unoptimized + debuginfo] target(s) in 1m 28s

installing extension
      Copying control file to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension.control`
      Copying shared library to `/home/wdy/.pgx/12.7/pgx-install/lib/postgresql/my_extension.so`
      Writing extension schema to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension--1.0.sql`
     Finished installing my_extension
     Starting Postgres v12 on port 28812
     Creating database my_extension
psql (12.7)
Type "help" for help.

my_extension=#

能够看到最初进入了名为 my_extension 的 psql 界面中。接下来创立 EXTENSION 并调用,能够看到失常输入字符串

my_extension=# create extension my_extension;
CREATE EXTENSION
my_extension=# select hello_my_extension();
 hello_my_extension  
---------------------
 Hello, my_extension
(1 row)

Array
Rust 的 Vec 类型对应 postgrs 中的 ARRAY[]::,如果某个扩大函数要返回多个雷同类型的 value,能够应用 Vec 进行返回,如下例中的 static_names 和 static_names_2d,最终在 psql 终端调用后返回的是单行单列。如果某个扩大返回的是一个 Iterator,那么返回的是一个多行单列数据,参见上面例子中的 static_names_iter 和 static_names_2d_iter。​

代码

#[pg_extern]
fn static_names() -> Vec<Option<&'static str>> {vec![Some("King"), Some("Eastern"), None, Some("Sun")]
}

#[pg_extern]
fn static_names_iter() -> impl std::iter::Iterator<Item = Option<&'static str>> {vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")].into_iter()}

#[pg_extern]
fn static_names_2d() -> Vec<Vec<Option<&'static str>>> {
    vec![vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],
        vec![Some("Eric"), Some("David")],
        vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],
    ]
}

#[pg_extern]
fn static_names_2d_iter() -> impl std::iter::Iterator<Item = Vec<Option<&'static str>>> {
    vec![vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],
        vec![Some("Eric"), Some("David")],
        vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],
    ]
    .into_iter()}

验证

执行 cargo pgx run pg13

installing extension
      Copying control file to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension.control`
      Copying shared library to `/home/wdy/.pgx/13.3/pgx-install/lib/postgresql/my_extension.so`
      Writing extension schema to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension--1.0.sql`
     Finished installing my_extension
     Starting Postgres v13 on port 28813
     Re-using existing database my_extension
psql (13.3)
Type "help" for help.

my_extension=# drop extension my_extension;create extension my_extension;
DROP EXTENSION
CREATE EXTENSION
my_extension=# select static_names();
        static_names         
-----------------------------
 {King,Eastern,NULL,Sun}
(1 row)

my_extension=# select static_names_iter();
 static_names_iter 
-------------------
 Brandy
 Sally

 Anchovy
(4 rows)

my_extension=# select static_names_2d();
                                   static_names_2d                                   
-------------------------------------------------------------------------------------
 {"{Brandy,Sally,NULL,Anchovy}","{Eric,David}","{ZomboDB,PostgreSQL,Elasticsearch}"}
(1 row)

my_extension=# select static_names_2d_iter();
        static_names_2d_iter        
------------------------------------
 {Brandy,Sally,NULL,Anchovy}
 {Eric,David}
 {ZomboDB,PostgreSQL,Elasticsearch}
(3 rows)

残缺代码
残缺代码

AGGREGATE 扩大
Postgres 能够通过 CREATE AGGREGATE 定义一个新的汇集函数。一个简略的汇集函数蕴含一个或两个一般的函数:

  1. 状态转移函数 sfunc
  2. 可选的最终计算函数 ffunc
sfunc(internal-state, next-data-values) ---> next-internal-state
ffunc(internal-state) ---> aggregate-value
  1. 可能还须要定义汇集函数外部状态的数据类型 stype
  2. 外部函数状态初始值 initcond , 是一个类型 text 的值。

那么咱们只需参考下面的样例 开发对应的 sfunc, ffunc ,stype,而后再应用 CREATE AGGREGATE 创立汇集函数。​

创立扩大
执行上面命令
cargo pgx new my_agg

定义外部状态类型 stype

类型 IntegerAvgState 用于汇集函数计算过程中存储中间状态数据

#[derive(Serialize, Deserialize, PostgresType)]
pub struct IntegerAvgState {
    sum: i32,
    n: i32,
}
impl Default for IntegerAvgState {fn default() -> Self {Self { sum: 0, n: 0}
    }
}
impl IntegerAvgState {fn acc(&self, v: i32) -> Self {
        Self {
            sum: self.sum + v,
            n: self.n + 1,
        }
    }
    fn finalize(&self) -> i32 {self.sum / self.n}
}

状态转移函数 sfunc

#[pg_extern]
fn integer_avg_state_func(
    internal_state: IntegerAvgState,
    next_data_value: i32,
) -> IntegerAvgState {internal_state.acc(next_data_value)
}

最终计算函数 ffunc

#[pg_extern]
fn integer_avg_final_func(internal_state: IntegerAvgState) -> i32 {internal_state.finalize()
}

创立汇集函数

extension_sql!(
    r#"
    CREATE AGGREGATE MYAVG (integer)
    (
        sfunc = integer_avg_state_func,
        stype = IntegerAvgState,
        finalfunc = integer_avg_final_func,
        initcond = '{"sum": 0,"n": 0}'
    );
    "#
);

残缺代码 ​

验证汇集函数
执行 cargo pgx run pg12 进入到 postgres 的 psql 命令行界面,而后执行上面操作

my_agg=# DROP EXTENSION my_agg; CREATE EXTENSION my_agg;
DROP EXTENSION
CREATE EXTENSION
my_agg=#  create table t (c integer);
CREATE TABLE
my_agg=# insert into t (c) values (1), (2), (3);
INSERT 0 3
my_agg=# select MYAVG(c) from t;
 myavg 
-------
     2
(1 row)

my_agg=# drop table t;
DROP TABLE
my_agg=#  create table t (c integer,b text);
CREATE TABLE
my_agg=# insert into t (c,b) values (1,'king'), (2,'eastern'), (3,'sun');
INSERT 0 3
my_agg=# select MYAVG(c) from t;
 myavg 
-------
     2
(1 row)

残缺代码
残缺代码

TOPN AGGREGATE 扩大
当初咱们实现一个略微简单一些的汇集函数,求表中某列 value 最大的 10 个数值,经典的办法就是采纳最小堆实现。Rust 规范库中 BinaryHeap 默认是最大堆,通过应用 Reverse 进行封装,就能够失去一个最小堆。​

创立扩大

cargo pgx new my_topn

定义外部状态类型 stype

#[derive(Serialize, Deserialize, PostgresType)]
pub struct TopState {
    min_heap:  BinaryHeap<Reverse<i32>>,
    n:usize,
}
impl Default for TopState {fn default() -> Self {Self { min_heap:BinaryHeap::new(),n:10 }
    }
}

为 TopState 实现 acc 办法,用于状态转移,更新最小堆

impl TopState {fn acc(& mut self, v: i32)  {if self.min_heap.len()<self.n{self.min_heap.push(Reverse(v));
            return 
        }

        // 取出以后最小堆上的最小值
        let top = self.min_heap.peek().unwrap().0;
        // 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃
        if v<=top{return}

        // 插入到最小堆中,而后移除堆中的最小值
        self.min_heap.push(Reverse(v));
        self.min_heap.pop().unwrap();
        return 

    }

}

状态转移函数 sfunc

#[pg_extern]
fn integer_topn_state_func(
   mut internal_state: TopState,
    next_data_value: i32,
) -> TopState {internal_state.acc(next_data_value);
    internal_state
}

最终计算函数 ffunc

#[pg_extern]
fn integer_topn_final_func(internal_state: TopState) -> Vec<i32> {internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect()}

创立汇集函数 MYMAXN

extension_sql!(
    r#"
    CREATE AGGREGATE MYMAXN (integer)
    (
        sfunc = integer_topn_state_func,
        stype = TopState,
        finalfunc = integer_topn_final_func,
        initcond = '{"min_heap":[],"n":10}'
    );
    "#
);

验证汇集函数 MYMAXN

执行 cargo pgx run pg13 进入到 postgres 的 psql 命令行界面 执行上面 sql 命令创立测试数据

create table people
(
    id        integer,
    name      varchar(32),
    age       integer,
    grade     numeric(4, 2),
    birthday  date,
    logintime timestamp
);



insert into people
select generate_series(1,20) as id,
md5(random()::text) as name,
(random()*100)::integer as age,
(random()*99)::numeric(4,2) as grade,
now() - ((random()*1000)::integer||'day')::interval as birthday,
clock_timestamp() as logintime;

失去测试数据如下

my_topn=# select age from people ;
 age 
-----
  28
  62
   3
  20
  46
  23
  74
  19
  46
  26
  70
  90
  22
  45
  30
  46
  43
  70
  78
  96
(20 rows)

执行汇集函数 MYMAXN

my_topn=# select MYMAXN(age) from people;
             mymaxn              
---------------------------------
 {96,90,78,74,70,70,62,46,46,46}
(1 row)

残缺代码

留神
如果没有指定 initcond,则在创立 extension 的时候会报如下的谬误提醒

ERROR:  must not omit initial value when transition function is strict and transition type is not compatible with input type

升级版 TOPN

同时返回表中中指定列最大的 n 个值和最小的 n 个值。在原有的数据结构中新增一个最大堆即可。​

定义外部状态类型 stype

#[derive(Serialize, Deserialize, PostgresType)]
pub struct TopState {
    min_heap:  BinaryHeap<Reverse<i32>>,
    max_heap:  BinaryHeap<i32>,
    n:usize,
}
impl Default for TopState {fn default() -> Self {Self { min_heap:BinaryHeap::new(),max_heap:BinaryHeap::new(),n:10}
    }
}

为 TopState 实现 acc 办法,用于状态转移,更新最小堆

impl TopState {fn acc(& mut self, v: i32)  {if self.min_heap.len()<self.n{self.min_heap.push(Reverse(v));
            return 
        }

        // 取出以后最小堆上的最小值
        let top = self.min_heap.peek().unwrap().0;
        // 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃
        if v<=top{return}

        // 插入到最小堆中,而后移除堆中的最小值
        self.min_heap.push(Reverse(v));
        self.min_heap.pop().unwrap();
        return 

    }
    fn acc_max(& mut self, v: i32)  {if self.max_heap.len()<self.n{self.max_heap.push(v);
            return 
        }

        // 取出以后最大堆上的最大值
        let top = self.max_heap.peek().unwrap();
        // 如果比最大值还大 , 必定不会是要求的最小的 10 个数值, 间接抛弃
        if v>=*top{return}

        // 插入到最大堆中,而后移除堆中的最大值
        self.max_heap.push(v);
        self.max_heap.pop().unwrap();
        return 

    }

}

状态转移函数 sfunc

#[pg_extern]
fn integer_topn_state_func(
   mut internal_state: TopState,
    next_data_value: i32,
) -> TopState {internal_state.acc(next_data_value);
    internal_state.acc_max(next_data_value);
    internal_state
}

最终计算函数 ffunc

#[pg_extern]
fn integer_topn_final_func(internal_state: TopState) -> Vec<Vec<i32>> {
    vec![internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect(),
    internal_state.max_heap.into_sorted_vec(),]
}

创立汇集函数 MYMAXN

extension_sql!(
    r#"
    CREATE AGGREGATE MYMAXN (integer)
    (
        sfunc = integer_topn_state_func,
        stype = TopState,
        finalfunc = integer_topn_final_func,
        initcond = '{"min_heap":[],"max_heap":[],"n":10}'
    );
    "#
);

验证汇集函数 MYMAXN

my_topn=# select MYMAXN(age) from people;
                                mymaxn                                
----------------------------------------------------------------------
 {"{96,90,78,74,70,70,62,46,46,46}","{3,19,20,22,23,26,28,30,43,45}"}
(1 row)

残缺代码

参考文献

  1. https://mp.weixin.qq.com/s/9r…
  2. Postgres Compile and Install from source code https://wiki.postgresql.org/w…
  3. cargo pgx 命令介绍 https://github.com/zombodb/pg…
  4. https://blog.timescale.com/bl…
  5. https://www.postgresql.org/do…

正文完
 0