前言
Rust 语言是一门通用零碎级编程语言,无 GC 且能保障内存平安、并发平安和高性能而著称。自 2008 年开始由 Graydon Hoare 私人研发,2009 年失去 Mozilla 资助,2010 年首次公布 0.1.0 版本,用于 Servo 引擎的研发,于 2015 年 5 月 15 号公布 1.0 版本。自公布以来,截止到 2021 年的明天,经验六年的倒退,Rust 失去稳步回升,已逐步趋于成熟稳固。至 2016 年开始,截止到 2021 年,Rust 间断五年成为 StackOverflow 语言榜上最受欢迎的语言[1]。2021 年 2 月 9 号,Rust 基金会发表成立。华为、AWS、Google、微软、Mozilla、Facebook 等科技行业领军巨头退出 Rust 基金会,成为白金成员,以致力于在寰球范畴内推广和倒退 Rust 语言。
官方网如此介绍 Rust : 一门赋予每个人 构建牢靠且高效软件能力的语言。Rust 语言有三大劣势值得大家关注:
- 高性能。Rust 速度惊人且内存利用率极高。因为没有运行时和垃圾回收,它可能胜任对性能要求特地高的服务,能够在嵌入式设施上运行,还能轻松和其余语言集成。
- 可靠性。Rust 丰盛的类型零碎和所有权模型保障了内存平安和线程平安,让您在编译期就可能打消各种各样的谬误。
- 生产力。Rust 领有杰出的文档、敌对的编译器和清晰的谬误提示信息,还集成了一流的工具——包管理器和构建工具,智能地主动补全和类型测验的多编辑器反对,以及主动格式化代码等等。
Rust 和 C 都是硬件间接形象
Rust 和 C 都是间接对硬件的形象,都可看作一种「可移植汇编程序」。Rust 和 C 都能管制数据结构的内存布局、整数大小、栈与堆内存调配、指针间接寻址等,并且个别都能翻译成可了解的机器代码,编译器很少插入 “ 魔法 ”。即使 Rust 比 C 有更高层次的构造,如迭代器、特质(trait)和智能指针,它们也被设计为可预测地优化为简略的机器代码(又称 “ 零老本形象 ”)。Rust 的类型的内存布局很简略,例如,可增长的字符串 String 和 Vec 正好是{byte*, capacity, length}。Rust 没有任何像 Cpp 里的 挪动 或 复制构造函数 这样的概念,所以对象的传递保障不会比传递指针或 memcpy 更简单。
总的来说,Rust 有媲美 C 的高性能,同时又具备高效的开发生产力,同时通过 FFI 能够高效的和其余语言如 C 进行混合编程或相互调用。本篇文章次要介绍如何利用 Rust 开发 postgres extension。
部署开发环境
配置 RUST 开发环境
参考 https://www.rust-lang.org/too… IDE 举荐 https://code.visualstudio.com/ + https://rls.booyaa.wtf/
配置 postgres 开发环境
ubuntu
sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc
Red hat
sudo yum install -y bison-devel readline-devel zlib-devel openssl-devel wget
sudo yum groupinstall -y 'Development Tools'
cargo pgx 子命令装置cargo install cargo-pgx
接下来执行命令 cargo pgx init。
Discovered Postgres v13.3, v12.7, v11.12, v10.17
Downloading Postgres v12.7 from https://ftp.postgresql.org/pub/source/v12.7/postgresql-12.7.tar.bz2
Downloading Postgres v13.3 from https://ftp.postgresql.org/pub/source/v13.3/postgresql-13.3.tar.bz2
Downloading Postgres v10.17 from https://ftp.postgresql.org/pub/source/v10.17/postgresql-10.17.tar.bz2
Downloading Postgres v11.12 from https://ftp.postgresql.org/pub/source/v11.12/postgresql-11.12.tar.bz2
Removing /home/wdy/.pgx/10.17
Untarring Postgres v10.17 to /home/wdy/.pgx/10.17
Configuring Postgres v10.17
Removing /home/wdy/.pgx/11.12
Untarring Postgres v11.12 to /home/wdy/.pgx/11.12
Untarring Postgres v12.7 to /home/wdy/.pgx/12.7
Configuring Postgres v11.12
Removing /home/wdy/.pgx/13.3
Untarring Postgres v13.3 to /home/wdy/.pgx/13.3
Configuring Postgres v12.7
Compiling Postgres v10.17
Configuring Postgres v13.3
Compiling Postgres v11.12
Compiling Postgres v12.7
Compiling Postgres v13.3
Installing Postgres v10.17 to /home/wdy/.pgx/10.17/pgx-install
Installing Postgres v11.12 to /home/wdy/.pgx/11.12/pgx-install
Installing Postgres v12.7 to /home/wdy/.pgx/12.7/pgx-install
Installing Postgres v13.3 to /home/wdy/.pgx/13.3/pgx-install
Validating /home/wdy/.pgx/10.17/pgx-install/bin/pg_config
Initializing data directory at /home/wdy/.pgx/data-10
Validating /home/wdy/.pgx/11.12/pgx-install/bin/pg_config
Initializing data directory at /home/wdy/.pgx/data-11
Validating /home/wdy/.pgx/12.7/pgx-install/bin/pg_config
Initializing data directory at /home/wdy/.pgx/data-12
Validating /home/wdy/.pgx/13.3/pgx-install/bin/pg_config
Initializing data directory at /home/wdy/.pgx/data-13
这个命令会下载版本 v10,v11,v12,v13 的 postgres 而后编译到目录~/.pgx/ 中。这个下载步骤是必须的,因为后续 pgx 会为每中版本的 postgres 的 header 文件生成对应的 Rust bindings,以及后续 pgx 的测试框架中也会用到。
开发一个简略的 extension
创立一个 extension 我的项目
应用上面命令创立一个名为 my_extension 的我的项目
cargo pgx new my_extension
命令执行后,会生成如下一个目录构造
├── Cargo.toml
├── my_extension.control
├── sql
│ ├── lib.generated.sql
│ └── load-order.txt
└── src
└── lib.rs
sql/lib.generated.sql 内容如下
CREATE OR REPLACE FUNCTION "hello_my_extension"() RETURNS text STRICT LANGUAGE c AS 'MODULE_PATHNAME', 'hello_my_extension_wrapper';
src/lib.rs 内容如下
use pgx::*;
pg_module_magic!();
#[pg_extern]
fn hello_my_extension() -> &'static str {"Hello, my_extension"}
#[cfg(any(test, feature = "pg_test"))]
mod tests {
use pgx::*;
#[pg_test]
fn test_hello_my_extension() {assert_eq!("Hello, my_extension", crate::hello_my_extension());
}
}
#[cfg(test)]
pub mod pg_test {pub fn setup(_options: Vec<&str>) {// perform one-off initialization when the pg_test framework starts}
pub fn postgresql_conf_options() -> Vec<&'static str> {
// return any postgresql.conf settings that are required for your tests
vec![]}
}
能够看到 pgx 曾经默认给出了最简略的扩大实现。#[pg_extern] 宏所润饰的函数就是咱们要实现的 extension 函数,mod tests , pub mod pg_test 是 pgx 曾经为咱们写好了的测试模块,用于编写相干测试代码。
pgx 默认曾经给咱们写好了名为 hello_my_extension 的 extension,性能很简略,就是返回 “Hello, my_extension” 字符串
运行 extension
cd my_extension
cargo pgx run pg13 # or pg10 or pg11 or pg12
应用 cargo pgx run 后跟参数 pg13 或 pg10 或 pg11 或 pg12,对应不同的 postgres 版本,cargo pgx run 会把 extension 编译为一个 .so 共享库文件,复制到对应版本的 ~/.pgx/ 目录中,而后启动 Postgres 实例,通过 psql 连贯到和 extension 同名的数据库上。编译实现后,开发者就会处于 psql 的 shell 界面中,能够调用 extension 进行测试了。
这里咱们执行 cargo pgx run pg12 失去输入如下:
$ cargo pgx run pg12
building extension with features `pg12`
"cargo" "build" "--features" "pg12" "--no-default-features"
Updating crates.io index
Downloaded generic-array v0.14.4
Downloaded humantime v2.1.0
Downloaded lazycell v1.3.0
Downloaded shlex v1.0.0
Downloaded stable_deref_trait v1.2.0
Downloaded termcolor v1.1.2
Downloaded typenum v1.13.0
Downloaded time-macros v0.1.1
Downloaded which v3.1.1
Downloaded atomic-traits v0.2.0
Downloaded seahash v4.1.0
Downloaded uuid v0.8.2
Downloaded as-slice v0.1.5
Downloaded pgx v0.1.21
Downloaded peeking_take_while v0.1.2
Downloaded proc-macro-hack v0.5.19
Downloaded rustc-hash v1.1.0
Downloaded serde_cbor v0.11.1
Downloaded time-macros-impl v0.1.2
Downloaded bindgen v0.58.1
Downloaded cexpr v0.4.0
Downloaded env_logger v0.8.4
Downloaded standback v0.2.17
Downloaded generic-array v0.12.4
Downloaded getrandom v0.2.3
Downloaded clang-sys v1.2.0
Downloaded glob v0.3.0
Downloaded const_fn v0.4.8
Downloaded generic-array v0.13.3
Downloaded half v1.7.1
Downloaded hash32 v0.1.1
Downloaded enum-primitive-derive v0.2.1
Downloaded heapless v0.6.1
Downloaded build-deps v0.1.4
Downloaded libloading v0.7.0
Downloaded nom v5.1.2
Downloaded time v0.2.27
Downloaded pgx-macros v0.1.21
Downloaded pgx-pg-sys v0.1.21
Downloaded 39 crates (2.0 MB) in 2.74s
Compiling version_check v0.9.3
Compiling libc v0.2.98
Compiling cfg-if v1.0.0
Compiling autocfg v1.0.1
Compiling proc-macro2 v1.0.27
Compiling unicode-xid v0.2.2
Compiling syn v1.0.73
Compiling memchr v2.4.0
Compiling lazy_static v1.4.0
Compiling serde_derive v1.0.126
Compiling tinyvec_macros v0.1.0
Compiling getrandom v0.1.16
Compiling matches v0.1.8
Compiling serde v1.0.126
Compiling log v0.4.14
Compiling crossbeam-utils v0.8.5
Compiling byteorder v1.4.3
Compiling glob v0.3.0
Compiling percent-encoding v2.1.0
Compiling crc32fast v1.2.1
Compiling ryu v1.0.5
Compiling adler v1.0.2
Compiling ppv-lite86 v0.2.10
Compiling crossbeam-epoch v0.9.5
Compiling regex-syntax v0.6.25
Compiling bitflags v1.2.1
Compiling mime v0.3.16
Compiling typenum v1.13.0
Compiling rayon-core v1.9.1
Compiling scopeguard v1.1.0
Compiling serde_json v1.0.64
Compiling unicode-width v0.1.8
Compiling itoa v0.4.7
Compiling base64 v0.11.0
Compiling httpdate v0.3.2
Compiling xml-rs v0.8.3
Compiling proc-macro-hack v0.5.19
Compiling humantime v2.1.0
Compiling vec_map v0.8.2
Compiling semver-parser v0.7.0
Compiling ansi_term v0.11.0
Compiling unescape v0.1.0
Compiling bindgen v0.58.1
Compiling termcolor v1.1.2
Compiling strsim v0.8.0
Compiling either v1.6.1
Compiling shlex v1.0.0
Compiling lazycell v1.3.0
Compiling peeking_take_while v0.1.2
Compiling rustc-hash v1.1.0
Compiling const_fn v0.4.8
Compiling stable_deref_trait v1.2.0
Compiling heapless v0.6.1
Compiling half v1.7.1
Compiling cfg-if v0.1.10
Compiling once_cell v1.8.0
Compiling seahash v4.1.0
Compiling libloading v0.7.0
Compiling tinyvec v1.2.0
Compiling unicode-bidi v0.3.5
Compiling unicase v2.6.0
Compiling nom v5.1.2
Compiling standback v0.2.17
Compiling generic-array v0.14.4
Compiling time v0.2.27
Compiling memoffset v0.6.4
Compiling miniz_oxide v0.4.4
Compiling rayon v1.5.1
Compiling num-traits v0.2.14
Compiling form_urlencoded v1.0.1
Compiling hash32 v0.1.1
Compiling build-deps v0.1.4
Compiling clang-sys v1.2.0
Compiling textwrap v0.11.0
Compiling semver v0.9.0
Compiling unicode-normalization v0.1.19
Compiling rustc_version v0.2.3
Compiling aho-corasick v0.7.18
Compiling quote v1.0.9
Compiling atty v0.2.14
Compiling dirs-sys v0.3.6
Compiling socks v0.3.3
Compiling num_cpus v1.13.0
Compiling which v3.1.1
Compiling getrandom v0.2.3
Compiling crossbeam-channel v0.5.1
Compiling idna v0.2.3
Compiling mime_guess v2.0.3
Compiling atomic-traits v0.2.0
Compiling generic-array v0.13.3
Compiling generic-array v0.12.4
Compiling colored v2.0.0
Compiling clap v2.33.3
Compiling rand_core v0.5.1
Compiling dirs v3.0.2
Compiling regex v1.5.4
Compiling uuid v0.8.2
Compiling flate2 v1.0.20
Compiling url v2.2.2
Compiling cexpr v0.4.0
Compiling as-slice v0.1.5
Compiling rand_chacha v0.2.2
Compiling crossbeam-deque v0.8.0
Compiling env_proxy v0.4.1
Compiling env_logger v0.8.4
Compiling rand v0.7.3
Compiling rttp_client v0.1.0
Compiling thiserror-impl v1.0.26
Compiling time-macros-impl v0.1.2
Compiling enum-primitive-derive v0.2.1
Compiling time-macros v0.1.1
Compiling thiserror v1.0.26
Compiling serde-xml-rs v0.4.1
Compiling toml v0.5.8
Compiling serde_cbor v0.11.1
Compiling pgx-utils v0.1.21
Compiling pgx-pg-sys v0.1.21
Compiling pgx-macros v0.1.21
Compiling pgx v0.1.21
Compiling my_extension v0.0.0 (/home/wdy/gitlab/valid-my-idea/rust-dig/pgextionsion/my_extension)
Finished dev [unoptimized + debuginfo] target(s) in 1m 28s
installing extension
Copying control file to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension.control`
Copying shared library to `/home/wdy/.pgx/12.7/pgx-install/lib/postgresql/my_extension.so`
Writing extension schema to `/home/wdy/.pgx/12.7/pgx-install/share/postgresql/extension/my_extension--1.0.sql`
Finished installing my_extension
Starting Postgres v12 on port 28812
Creating database my_extension
psql (12.7)
Type "help" for help.
my_extension=#
能够看到最初进入了名为 my_extension 的 psql 界面中。接下来创立 EXTENSION 并调用,能够看到失常输入字符串
my_extension=# create extension my_extension;
CREATE EXTENSION
my_extension=# select hello_my_extension();
hello_my_extension
---------------------
Hello, my_extension
(1 row)
Array
Rust 的 Vec 类型对应 postgrs 中的 ARRAY[]::,如果某个扩大函数要返回多个雷同类型的 value,能够应用 Vec 进行返回,如下例中的 static_names 和 static_names_2d,最终在 psql 终端调用后返回的是单行单列。如果某个扩大返回的是一个 Iterator,那么返回的是一个多行单列数据,参见上面例子中的 static_names_iter 和 static_names_2d_iter。
代码
#[pg_extern]
fn static_names() -> Vec<Option<&'static str>> {vec![Some("King"), Some("Eastern"), None, Some("Sun")]
}
#[pg_extern]
fn static_names_iter() -> impl std::iter::Iterator<Item = Option<&'static str>> {vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")].into_iter()}
#[pg_extern]
fn static_names_2d() -> Vec<Vec<Option<&'static str>>> {
vec![vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],
vec![Some("Eric"), Some("David")],
vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],
]
}
#[pg_extern]
fn static_names_2d_iter() -> impl std::iter::Iterator<Item = Vec<Option<&'static str>>> {
vec![vec![Some("Brandy"), Some("Sally"), None, Some("Anchovy")],
vec![Some("Eric"), Some("David")],
vec![Some("ZomboDB"), Some("PostgreSQL"), Some("Elasticsearch")],
]
.into_iter()}
验证
执行 cargo pgx run pg13
installing extension
Copying control file to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension.control`
Copying shared library to `/home/wdy/.pgx/13.3/pgx-install/lib/postgresql/my_extension.so`
Writing extension schema to `/home/wdy/.pgx/13.3/pgx-install/share/postgresql/extension/my_extension--1.0.sql`
Finished installing my_extension
Starting Postgres v13 on port 28813
Re-using existing database my_extension
psql (13.3)
Type "help" for help.
my_extension=# drop extension my_extension;create extension my_extension;
DROP EXTENSION
CREATE EXTENSION
my_extension=# select static_names();
static_names
-----------------------------
{King,Eastern,NULL,Sun}
(1 row)
my_extension=# select static_names_iter();
static_names_iter
-------------------
Brandy
Sally
Anchovy
(4 rows)
my_extension=# select static_names_2d();
static_names_2d
-------------------------------------------------------------------------------------
{"{Brandy,Sally,NULL,Anchovy}","{Eric,David}","{ZomboDB,PostgreSQL,Elasticsearch}"}
(1 row)
my_extension=# select static_names_2d_iter();
static_names_2d_iter
------------------------------------
{Brandy,Sally,NULL,Anchovy}
{Eric,David}
{ZomboDB,PostgreSQL,Elasticsearch}
(3 rows)
残缺代码
残缺代码
AGGREGATE 扩大
Postgres 能够通过 CREATE AGGREGATE 定义一个新的汇集函数。一个简略的汇集函数蕴含一个或两个一般的函数:
- 状态转移函数 sfunc
- 可选的最终计算函数 ffunc
sfunc(internal-state, next-data-values) ---> next-internal-state
ffunc(internal-state) ---> aggregate-value
- 可能还须要定义汇集函数外部状态的数据类型 stype
- 外部函数状态初始值 initcond , 是一个类型 text 的值。
那么咱们只需参考下面的样例 开发对应的 sfunc, ffunc ,stype,而后再应用 CREATE AGGREGATE 创立汇集函数。
创立扩大
执行上面命令cargo pgx new my_agg
定义外部状态类型 stype
类型 IntegerAvgState 用于汇集函数计算过程中存储中间状态数据
#[derive(Serialize, Deserialize, PostgresType)]
pub struct IntegerAvgState {
sum: i32,
n: i32,
}
impl Default for IntegerAvgState {fn default() -> Self {Self { sum: 0, n: 0}
}
}
impl IntegerAvgState {fn acc(&self, v: i32) -> Self {
Self {
sum: self.sum + v,
n: self.n + 1,
}
}
fn finalize(&self) -> i32 {self.sum / self.n}
}
状态转移函数 sfunc
#[pg_extern]
fn integer_avg_state_func(
internal_state: IntegerAvgState,
next_data_value: i32,
) -> IntegerAvgState {internal_state.acc(next_data_value)
}
最终计算函数 ffunc
#[pg_extern]
fn integer_avg_final_func(internal_state: IntegerAvgState) -> i32 {internal_state.finalize()
}
创立汇集函数
extension_sql!(
r#"
CREATE AGGREGATE MYAVG (integer)
(
sfunc = integer_avg_state_func,
stype = IntegerAvgState,
finalfunc = integer_avg_final_func,
initcond = '{"sum": 0,"n": 0}'
);
"#
);
残缺代码
验证汇集函数
执行 cargo pgx run pg12 进入到 postgres 的 psql 命令行界面,而后执行上面操作
my_agg=# DROP EXTENSION my_agg; CREATE EXTENSION my_agg;
DROP EXTENSION
CREATE EXTENSION
my_agg=# create table t (c integer);
CREATE TABLE
my_agg=# insert into t (c) values (1), (2), (3);
INSERT 0 3
my_agg=# select MYAVG(c) from t;
myavg
-------
2
(1 row)
my_agg=# drop table t;
DROP TABLE
my_agg=# create table t (c integer,b text);
CREATE TABLE
my_agg=# insert into t (c,b) values (1,'king'), (2,'eastern'), (3,'sun');
INSERT 0 3
my_agg=# select MYAVG(c) from t;
myavg
-------
2
(1 row)
残缺代码
残缺代码
TOPN AGGREGATE 扩大
当初咱们实现一个略微简单一些的汇集函数,求表中某列 value 最大的 10 个数值,经典的办法就是采纳最小堆实现。Rust 规范库中 BinaryHeap 默认是最大堆,通过应用 Reverse 进行封装,就能够失去一个最小堆。
创立扩大
cargo pgx new my_topn
定义外部状态类型 stype
#[derive(Serialize, Deserialize, PostgresType)]
pub struct TopState {
min_heap: BinaryHeap<Reverse<i32>>,
n:usize,
}
impl Default for TopState {fn default() -> Self {Self { min_heap:BinaryHeap::new(),n:10 }
}
}
为 TopState 实现 acc 办法,用于状态转移,更新最小堆
impl TopState {fn acc(& mut self, v: i32) {if self.min_heap.len()<self.n{self.min_heap.push(Reverse(v));
return
}
// 取出以后最小堆上的最小值
let top = self.min_heap.peek().unwrap().0;
// 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃
if v<=top{return}
// 插入到最小堆中,而后移除堆中的最小值
self.min_heap.push(Reverse(v));
self.min_heap.pop().unwrap();
return
}
}
状态转移函数 sfunc
#[pg_extern]
fn integer_topn_state_func(
mut internal_state: TopState,
next_data_value: i32,
) -> TopState {internal_state.acc(next_data_value);
internal_state
}
最终计算函数 ffunc
#[pg_extern]
fn integer_topn_final_func(internal_state: TopState) -> Vec<i32> {internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect()}
创立汇集函数 MYMAXN
extension_sql!(
r#"
CREATE AGGREGATE MYMAXN (integer)
(
sfunc = integer_topn_state_func,
stype = TopState,
finalfunc = integer_topn_final_func,
initcond = '{"min_heap":[],"n":10}'
);
"#
);
验证汇集函数 MYMAXN
执行 cargo pgx run pg13 进入到 postgres 的 psql 命令行界面 执行上面 sql 命令创立测试数据
create table people
(
id integer,
name varchar(32),
age integer,
grade numeric(4, 2),
birthday date,
logintime timestamp
);
insert into people
select generate_series(1,20) as id,
md5(random()::text) as name,
(random()*100)::integer as age,
(random()*99)::numeric(4,2) as grade,
now() - ((random()*1000)::integer||'day')::interval as birthday,
clock_timestamp() as logintime;
失去测试数据如下
my_topn=# select age from people ;
age
-----
28
62
3
20
46
23
74
19
46
26
70
90
22
45
30
46
43
70
78
96
(20 rows)
执行汇集函数 MYMAXN
my_topn=# select MYMAXN(age) from people;
mymaxn
---------------------------------
{96,90,78,74,70,70,62,46,46,46}
(1 row)
残缺代码
留神
如果没有指定 initcond,则在创立 extension 的时候会报如下的谬误提醒
ERROR: must not omit initial value when transition function is strict and transition type is not compatible with input type
升级版 TOPN
同时返回表中中指定列最大的 n 个值和最小的 n 个值。在原有的数据结构中新增一个最大堆即可。
定义外部状态类型 stype
#[derive(Serialize, Deserialize, PostgresType)]
pub struct TopState {
min_heap: BinaryHeap<Reverse<i32>>,
max_heap: BinaryHeap<i32>,
n:usize,
}
impl Default for TopState {fn default() -> Self {Self { min_heap:BinaryHeap::new(),max_heap:BinaryHeap::new(),n:10}
}
}
为 TopState 实现 acc 办法,用于状态转移,更新最小堆
impl TopState {fn acc(& mut self, v: i32) {if self.min_heap.len()<self.n{self.min_heap.push(Reverse(v));
return
}
// 取出以后最小堆上的最小值
let top = self.min_heap.peek().unwrap().0;
// 如果比最小值还小 , 必定不会是要求的最大的 10 个数值, 间接抛弃
if v<=top{return}
// 插入到最小堆中,而后移除堆中的最小值
self.min_heap.push(Reverse(v));
self.min_heap.pop().unwrap();
return
}
fn acc_max(& mut self, v: i32) {if self.max_heap.len()<self.n{self.max_heap.push(v);
return
}
// 取出以后最大堆上的最大值
let top = self.max_heap.peek().unwrap();
// 如果比最大值还大 , 必定不会是要求的最小的 10 个数值, 间接抛弃
if v>=*top{return}
// 插入到最大堆中,而后移除堆中的最大值
self.max_heap.push(v);
self.max_heap.pop().unwrap();
return
}
}
状态转移函数 sfunc
#[pg_extern]
fn integer_topn_state_func(
mut internal_state: TopState,
next_data_value: i32,
) -> TopState {internal_state.acc(next_data_value);
internal_state.acc_max(next_data_value);
internal_state
}
最终计算函数 ffunc
#[pg_extern]
fn integer_topn_final_func(internal_state: TopState) -> Vec<Vec<i32>> {
vec![internal_state.min_heap.into_sorted_vec().iter().map(|x|x.0).collect(),
internal_state.max_heap.into_sorted_vec(),]
}
创立汇集函数 MYMAXN
extension_sql!(
r#"
CREATE AGGREGATE MYMAXN (integer)
(
sfunc = integer_topn_state_func,
stype = TopState,
finalfunc = integer_topn_final_func,
initcond = '{"min_heap":[],"max_heap":[],"n":10}'
);
"#
);
验证汇集函数 MYMAXN
my_topn=# select MYMAXN(age) from people;
mymaxn
----------------------------------------------------------------------
{"{96,90,78,74,70,70,62,46,46,46}","{3,19,20,22,23,26,28,30,43,45}"}
(1 row)
残缺代码
参考文献
- https://mp.weixin.qq.com/s/9r…
- Postgres Compile and Install from source code https://wiki.postgresql.org/w…
- cargo pgx 命令介绍 https://github.com/zombodb/pg…
- https://blog.timescale.com/bl…
- https://www.postgresql.org/do…