概述

本文档领导和帮忙KunlunBase用户评测和验证KunlunBase的各项重要性能。用户遵循本文档操作即可装置好KunlunBase集群并且体验和应用到KunlunBase的各次要性能,把本文档作为应用KunlunBase 的疾速入门手册。KunlunBase的残缺的文档请拜访 doc.kunlunbase.com
用户也可依据本文档来评测和验证KunlunBase各项性能工作失常。KunlunBase团队在进行KunlunBase产品研发过程中,会继续对其各项性能开发测试程序,并且应用自动化测试零碎每天运行所有测试程序,以便及时发现和解决问题,确保KunlunBase各项性能工作失常。同时,咱们会应用mysql和PostgreSQL 的测试集来针对 KunlunBase的存储节点和计算节点进行功能测试。对于KunlunBase的自动化测试零碎的日常测试后果,请拜访www.kunlunbase.com 查看。

KunlunBase 集群架构

KunlunBase 是一个分布式关系数据库管理系统,面向TB和PB级别海量数据处理,反对高吞吐量和低延时的极致性能来解决海量数据的高并发读写申请。它反对程度弹性扩容,提供强壮的事务ACID保障,高效易用的分布式查询处理,高可扩展性,高可用性和通明的分库分表数据处理性能,业务层和终端用户无感知的程度扩大能力,是典型的 NewSQL分布式数据库系统。

KunlunBase的上述技术能力确保用户能够在业务持续增长时,随时不停服减少服务器节点来扩充数据存储和解决&计算能力,并且在服务器节点产生故障或者断电时,不失落数据,并且不进行数据读写服务。确保用户业务继续以最高品质运行。

应用软件开发者只须要依照应用单节点关系数据库雷同的办法应用昆仑数据库,就能够失去所有上述NewSQL数据库的长处,齐全不须要思考数据的存储和治理的任何细节。用户能够应用PostgreSQL和MySQL两种连贯协定连贯到KunlunBase集群的计算节点并执行DDL和DML SQL语句。KunlunBase反对规范SQL,PostgreSQL的DDL 语法,以及PostgreSQL和MySQL 的公有DML 语法。因而,本来应用PostgreSQL和MySQL的应用程序不须要批改就能够应用KunlunBase。

KunlunBase反对 所有SQL 数据分析性能,能够执行TPC-H 和TPC-DS的所有 OLAP 剖析SQL语句,因而,用户能够把各类业务零碎中的数据继续流入KunlunBase集群,并且应用KunlunBase 集群对这些继续更新的数据做OLAP 剖析和数据挖掘,以便迅速发现变动和趋势,抓住转瞬即逝的商机,及时应答突发状况,取得微小的竞争劣势。

即便用户的数据量只有若干GB,也能够应用KunlunBase而获益。KunlunBase的fullsync性能比MySQL 自带的semisync(半同步)和group replication具备更高的可靠性 --- 这两种技术如果主备节点同时断电那么用户数据可能失落或者不统一,而fullsync 确保这种状况下不会失落用户数据。此时只部署一个存储集群(例如 一主两备)即可。当用户数据和业务负载持续上升时,能够随时按需减少更多的存储集群和计算节点,从而扩充数据存储和查问能力。

同时,用户能够通过读写拆散扩大KunlunBase的解决能力。通过应用读写拆散技术从备机查问数据,并安顿专属的计算节点用于OLAP剖析查问,能够确保OLAP剖析工作齐全不会影响OLTP 业务的性能。

应用KunlunBase

为KunlunBase 初始化服务器(BootStrap)

通过初始化(bootstrap)的服务器才能够装置KunlunBase集群。初始化服务器实现后,用户就能够应用XPanel集群管理工具装置KunlunBase集群。

本节介绍如何初始化计算机服务器。

应用脚本初始化服务器

  1. 下载脚本
    git clone https://gitee.com/zettadb/clo...
    cd cloudnative/cluster
  2. 填写配置文件,具体参数阐明参考http://www.kunlunbase.com:818...
  3. 填写完配置文件后,开始初始化服务器

     python setup_cluster_manager.py --config=cluster_and_node_mgr.json --action=install  bash -e clustermgr/install.sh

创立KunlunBase集群

以下操作皆须要在初始化服务器后进行

XPanel

  1. 进入xpanel web端:http://host:17000/KunlunXPanel

    a. host就是下载docker镜像并运行的服务器对应ip,能够通过ifconfig查看

    b. 示例中的17000就是在运行docker镜像时映射的商品

  2. 首次进入,账号和明码都是super_dba, 元数据节点用主的ip和端口就行

  1. 随后会须要批改明码,批改完明码后,进入到xpanel主界面

  1. 减少计算机列表
    a. 查看须要减少的计算机是否存在列表中,不存在则进行该步骤,存在则跳过
    b. 目前只能一个一个增加,该列表里的对应的服务器必须装置有node_mgr,且配置正确
    c. 系统管理 -- 用户治理 -- 新增 -- 填写对应的参数 -- 确认

  1. 在新增完计算机后,就能够创立集群了

    a. 集群治理 -- 集群列表 -- 新增

    b. 依据需要填写对应的参数值

    c. 等集群装置好后,能够在 集群展现 界面查看对应的节点信息和主备关系
    集群治理 -- 集群列表 -- 点击对应的集群

应用KunlunBase做数据读写

各编程语言筹备应用KunlunBase

以后 KunlunBase 同时反对 postgres 和 mysql 协定。

计算节点 mysql 协定端口号能够通过进入计算节点 pg 端口,应用show mysql_port; 查看

命令行连贯
  • 连贯 Kunlunbase pg协定,能够通过tcp和url的形式进行连贯

    • Tcp:psql -h host -p port -U username -d databaseName
    • url: psql postgres://userName:[email protected]:port/databaseName
  • 连贯KunlunBase mysql协定,能够通过tcp和socket的形式进行连贯

    • TCP:mysql -h host -P port -ppassword -u userName
    • 留神:-p和明码之间不能有空格
各语言connector下载方式
  • go 和 rust 都能够间接通过自带的包管理器下载所需依赖,不须要额定装置

    • go举荐1.17.5版本或者更高版本
    • rust举荐1.59.0版本或者更高版本

java

在maven仓库抉择好对应的jar包版本后,点击 jar 就能够下载对应jar包

  • mysql connector/j

    • https://dev.mysql.com/downloa... mysql官网下载
    • https://mvnrepository.com/art... maven仓库
  • Postgresql JDBC Driver

    • https://mvnrepository.com/art... maven仓库
  • mariadb connector/j - 3.x版本

    • https://mariadb.org/connector... mariadb官网,官网只有最新的3.x版本
    • https://mvnrepository.com/art... maven仓库
  • mariadb connector/j - 2.x版本

    • http://zettatech.tpddns.cn:14... 泽拓科技官网
    • 因为官网2.0版本在连贯计算节点会有一些bug,所以不倡议间接应用mariadb官网的2.x版本connector

python

  • Psycopg2

    • pip install psycopg2 通过pip装置
    • pip install psycopg2==$version 通过pip装置
    • https://github.com/psycopg/ps... 通过setup.pg build
  • mysql-connector/python

    • https://downloads.mysql.com/a... mysql官网
    • pip install mysql-connector-python 通过pip装置8.x版本mysql connector/python
    • https://github.com/mysql/mysq... 通过setup.pg build
  • pymysql

    • pip install pymysql 通过pip装置
    • https://github.com/PyMySQL/Py... 通过setup.pg build

nodejs

  • pg

    • npm install pg 通过npm工具装置pg connector
  • mysql

    • npm install mysql 通过npm工具装置mysql connector
    • https://downloads.mysql.com/a... mysql官网下载

php

  • mysql

    • https://downloads.mysql.com/a... mysql官网下载

c

  • postgresql

    • sudo apt-get install libpq++-dev -y ubuntu装置pg依赖
  • mysql

    • sudo apt-get install libmysql++-dev -y ubuntu装置mysql依赖

c#

  • postgresql

    • dotnet add package Npgsql
  • mysql

    • dotnet add package MySql.Data

c++

  • postgresql

    sudo apt install libpq-devgit clone https://github.com/jtv/libpqxx.gitcd libpqxx./configuremakesudo make install
  • mysql

    • sudo apt-get install libmysqlclient-dev 装置c++连贯mysql的依赖
    • https://downloads.mysql.com/a... mysql官网connector下载
应用各编程语言对KunlunBase 执行增删改查DML语句

以下所有示例能够在https://gitee.com/zettadb/clo...这里下载

  • pg相干示例在cloudnative/smoke目录下
  • mysql相干示例在cloudnative/smoke/somkeTest-mysql目录下

java

  • Postgresql-JDBC-Driver
package kunlun.test;/* * Copyright (c) 2019 ZettaDB inc. All rights reserved. * This source code is licensed under Apache 2.0 License, * combined with Common Clause Condition 1.0, as detailed in the NOTICE file. */import java.io.BufferedReader;import java.io.FileReader;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.Statement;import java.util.ArrayList;import java.util.LinkedList;import java.util.List;import java.util.Properties;public class SmokeTest {    static {        try {            Class.forName("org.postgresql.Driver");            //Class.forName("com.mysql.cj.jdbc.Driver");        } catch (Exception ex) {        }    }    public static Connection getConnection(String user,                                           String password,                                           String host,                                           int port,                                           String dbname) {        String proto = "postgresql";        Properties props = new Properties();        props.setProperty("user", user);        props.setProperty("password", password);        String url = "jdbc:" + proto+"://" + host + ":" + port + "/" + dbname;        try {            return DriverManager.getConnection(url, props);        } catch (Exception ex) {            ex.printStackTrace();            return null;        }    }    public static void smokeTest(Connection conn) throws Exception{        boolean autocommit = conn.getAutoCommit();        System.out.println("default autocommit: " + autocommit);        conn.setAutoCommit(true);        Statement st =conn.createStatement();        st.execute("SET client_min_messages TO 'warning';");        st.execute("drop table if exists t1;");        st.execute("RESET client_min_messages;");        String createSql = "create table t1(id integer primary key, " +                           "info text, wt integer);";        st.execute(createSql);        st.execute("insert into t1(id,info,wt) values(1, 'record1', 1);");        st.execute("insert into t1(id,info,wt) values(2, 'record2', 2);");        st.execute("update t1 set wt = 12 where id = 1;");        ResultSet res1 = st.executeQuery("select * from t1;");        System.out.printf("res1:%s%n", showResults(res1).toString());        res1.close();        st.close();        String pstr = "select * from t1 where id=?";        PreparedStatement ps = conn.prepareStatement(pstr);        ps.setInt(1, 1);        ResultSet pres = ps.executeQuery();        System.out.printf("pres1:%s%n", showResults(pres).toString());        ps.setInt(1, 2);        pres = ps.executeQuery();        System.out.printf("pres2:%s%n", showResults(pres).toString());        ps.close();        pstr = "update t1 set info=? , wt=? where id=?";        ps = conn.prepareStatement(pstr);        ps.setString(1, "Rec1");        ps.setInt(2, 2);        ps.setInt(3, 1);        ps.execute();        ps.setString(1, "Rec2");        ps.setInt(2, 3);        ps.setInt(3, 2);        ps.execute();        ps.close();        st =conn.createStatement();        ResultSet res2 = st.executeQuery("select * from t1;");        System.out.printf("res2:%s%n", showResults(res2).toString());        res2.close();        st.execute("delete from t1 where id = 1;");        ResultSet res3 = st.executeQuery("select * from t1;");        System.out.printf("res3:%s%n", showResults(res3).toString());        res3.close();        st.execute("drop table t1;");        st.close();        conn.setAutoCommit(autocommit);    }    /*     * We do the following actions:     * 1 Create the able     * 2 Insert two records     * 3 Update the first record.     * 4 Query the records(res1).     * 5 Delete the second record.     * 6 Query the records again(res2).     * 7 Drop the table.     */    public static void smokeTestFile(Connection conn, String cmdfile) throws Exception{        boolean autocommit = conn.getAutoCommit();        System.out.println("default autocommit: " + autocommit);        conn.setAutoCommit(true);        Statement st =conn.createStatement();        BufferedReader br = new BufferedReader(new FileReader(cmdfile));        String cmd = null;        do {            cmd = br.readLine();            if (cmd == null) {                break;            }            if (cmd.toUpperCase().startsWith("SELECT")) {                ResultSet res = st.executeQuery(cmd);                System.out.printf("sql:%s, res:%s%n", cmd,                                  showResults(res).toString());                res.close();            } else {                st.execute(cmd);            }        } while (cmd != null);        br.close();        st.close();        conn.setAutoCommit(autocommit);    }    private static List<List<String>> showResults(ResultSet res)        throws Exception {        LinkedList<List<String>> results = new LinkedList<>();        int cols = res.getMetaData().getColumnCount();        while (res.next()) {            List<String> row = new ArrayList<>(cols);            for (int i = 0; i < cols; i++) {                row.add(res.getString(i + 1));            }            results.addLast(row);        }        return results;    }    public static void test1(String[] args) throws Exception{        String host = args[0];        int port = Integer.valueOf(args[1]);        String user = "abc";        String password = "abc";        String database = "postgres";        Connection conn = getConnection(user, password, host, port, database);        smokeTest(conn);        conn.close();    }    public static void main(String[] args) throws Exception {        test1(args);    }}
  • mysql connector/j

    • 编译:javac test.java
    • 运行:java -cp .:./mysql-connector-java-$version.jar test
import java.sql.DriverManager;import java.util.ArrayList;import java.sql.*;import java.util.*;public class mysql {    // MySQL 8.0 以下版本 - JDBC 驱动名及数据库 URL    static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";    // MySQL 8.0 以上版本 - JDBC 驱动名及数据库 URL    //static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";    // 数据库的用户名与明码,须要依据本人的设置    static final String USER = "user";    static final String PASS = "pwd";    public static void main(String[] args) {        Connection conn = null;        Statement stmt = null;        //解决传入的参数        String host = Arrays.toString(args);        String urls1 = "jdbc:mysql://" + host + "/postgres";        String urls2 = urls1.replace("[","");        String urls = urls2.replace("]","");        try{            // 注册 JDBC 驱动            Class.forName(JDBC_DRIVER);            // 关上链接            System.out.println("连贯数据库" + host + "...");            conn = DriverManager.getConnection(urls,USER,PASS);            // 执行查问            System.out.println(" 实例化Statement对象...");            stmt = conn.createStatement();            ArrayList sqls = new ArrayList();            sqls.add("drop table if exists myjdbc_sm;");            sqls.add("create table myjdbc_sm(a int primary key, b text);");            sqls.add("insert into myjdbc_sm values(1, 'abc'),(2, 'bcd'),(3, 'cde');");            sqls.add("select * from myjdbc_sm;");            sqls.add("update myjdbc_sm set b = 'def' where a = 1;");            sqls.add("select * from myjdbc_sm;");            sqls.add("delete from myjdbc_sm where a = 3;");            sqls.add("select * from myjdbc_sm;");            for (int i = 0; i <= 7; ++i){                String sql = (String) sqls.get(i);                System.out.println(sql);                if (sql == "select * from myjdbc_sm;"){                    ResultSet rs = stmt.executeQuery(sql);                    while(rs.next()) {                        int a = rs.getInt("a");                        String b = rs.getString("b");                        System.out.print(" a: " + a + " b: " + b + "\n");                    }                    rs.close();                }                else {                    stmt.execute(sql);                }            }            stmt.close();            conn.close();        }catch(SQLException se){            // 解决 JDBC 谬误            se.printStackTrace();        }catch(Exception e){            // 解决 Class.forName 谬误            e.printStackTrace();        }finally{            // 敞开资源            try{                if(stmt!=null) stmt.close();            }catch(SQLException se2){            }// 什么都不做            try{                if(conn!=null) conn.close();            }catch(SQLException se){                se.printStackTrace();            }        }        System.out.println("Goodbye!");    }}

python

  • Psycopg2
import psycopg2import sysdef test(hoststr, portstr):        intport = int(portstr)        conn = psycopg2.connect(host=hoststr, port=intport, user='abc', password='abc', database='postgres')        conn.autocommit = True        cur = conn.cursor()        sqls=["SET client_min_messages TO 'warning';",                "drop table if exists t1111",                "RESET client_min_messages;",                "create table t1111(id int primary key, info text, wt int)",                "insert into t1111(id,info,wt) values(1, 'record1', 1)",                "insert into t1111(id,info,wt) values(2, 'record2', 2)",                "update t1111 set wt = 12 where id = 1", "select * from t1111",                "delete from t1111 where id = 1", "select * from t1111",                "prepare q1(int) as select*from t1111 where id=$1",                "begin",                "execute q1(1)",                "execute q1(2)",                "prepare q2(text,int, int) as update t1111 set info=$1 , wt=$2 where id=$3",                "execute q2('Rec1',2,1)",                "commit",                "execute q2('Rec2',3,2)",                "drop table t1111"]        for sql in sqls:                res = cur.execute(sql+";")                print "command:%s, res:%s" % (sql, str(res))if __name__ == '__main__':        test(sys.argv[1], sys.argv[2])
  • mysql connector/python
import argparsefrom time import sleepfrom mysql import connectordef test(sql, host, port, user, pwd, db):    conn = connector.connect(buffered=True, host=host, port=port, user=user, passwd=pwd, database=db, ssl_disabled=True)    cur = conn.cursor()    print(sql)    if sql == 'select * from mysql_connector_python;':        cur.execute(sql)        rs = cur.fetchall()        srs = str(rs)        srs = srs.replace('[(', '')        srs = srs.replace(')]', '')        srs = srs.replace('), (', '\n------\n')        srs = srs.replace(',', ' |')        srs = srs.replace('\'', '')        print('--------\na | b\n------\n' + srs + '\n--------')    else:        cur.execute(sql)    conn.commit()    cur.close()    conn.close()def execSql(host, port, user, pwd, db):    sql = ['drop table if exists mysql_connector_python;',            'create table mysql_connector_python(a int primary key, b text);',            "insert into mysql_connector_python values(1,'a'),(2,'b'),(3,'c');",            'select * from mysql_connector_python;',            "update mysql_connector_python set b = 'abc' where a = 3;",            'select * from mysql_connector_python;',            'delete from mysql_connector_python where a = 3;',            'select * from mysql_connector_python;']    for sqls in sql:        test(sqls, host, port, user, pwd, db)if __name__ == '__main__':    parser = argparse.ArgumentParser(description = 'this script is use to test ddl replication!')    parser.add_argument('--host', help='host')    parser.add_argument('--port', default=3306, help='port')    parser.add_argument('--db', default='mysql', help='database name')    parser.add_argument('--pwd', default='root', help='password')    parser.add_argument('--user', default='root', help='user name')    args = parser.parse_args()    host = args.host    port = args.port    db   = args.db    pwd  = args.pwd    user = args.user    print(host, str(port))    execSql(host, port, user, pwd, db)

nodejs

  • postgresql
const { findSourceMap } = require('module');const { CLIENT_RENEG_LIMIT } = require('tls');const pg=require('./node_modules/pg');var arguments = process.argv.splice(2);console.log('host:', arguments[0], 'port: ', arguments[1]);var conString = ('postgres://abc:[email protected]'+arguments[0]+':'+arguments[1]+'/postgres');var client = new pg.Client(conString);client.connect(function(err){    if(err){        return console.error('数据库连贯出错',err);    }    console.log("")    console.log("=========== JS Driver ==============");    client.query('drop table if exists smoketesttable_js;',function(err,data){        if(err){            return console.error('step 1 : droped table failed!',err);        }else{            console.log('step 1 : drop table success!')        }    })  client.query('drop table if exists smoketesttable_js1;');//再运行一次的起因是因为如果失败了就只有一个failed!提醒,没有报错信息。所以再运行一次让这个报错信息显示进去    client.query('create table smoketesttable_js(id int primary key,name text,gender text);',function(err,data){        if(err){            return console.error('step 2 : create failed!',err);        }else{            console.log('step 2 : create table success!')        }    })  client.query('create table smoketesttable_js1(id int primary key,name text,gender text);')    client.query("insert into smoketesttable_js values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');",function(err,data){        if(err){            return console.error('step 3 : insert failed!',err);        }else{            console.log('step 3 : insert data success!')        }    })  client.query("insert into smoketesttable_js1 values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');")    client.query("delete from smoketesttable_js where id = 1;",function(err){        if(err){            return console.error('step 4 : delete failed!')        }else{            console.log("step 4 : delete data success!")        }    })  client.query("delete from smoketesttable_js1 where id = 1;")    client.query("update smoketesttable_js set gender = 'male' where id = 2;",function(err){        if(err){            return console.error("step 5 : update failed!")        }else{            console.log('step 5 : update gender success!')        }    })  client.query("update smoketesttable_js1 set gender = 'male' where id = 2;")    client.query("select * from smoketesttable_js;",function(err){        if(err){            return console.error("select failed!")            client.query("step 6 : select * from smoktesttable_js;")        }else{            console.log('step 6 : select table success!')        }    })    client.query("select * from smoketesttable_js1;")    client.query("commit",function(err){        if(err){            return console.error("select failed!")        }else{            console.log('step 6 : commit success!')        }        client.end();        console.log("====================================");  console.log("")    })})
  • mysql
// 应用process获取命令行传的参数var arguments = process.argv.splice(2);var hosts = arguments[0];var ports  = arguments[1];var mysql  = require('mysql');var connection = mysql.createConnection({    host     : hosts,    user     : 'abc',    password : 'abc',    port: ports,    database: 'postgres'});//console.log(connection)connection.connect();var sql1 = 'drop table if exists myjs_sm';var sql2 = 'create table if not exists myjs_sm(a int primary key,b text);'var sql3 = 'insert into myjs_sm values(1,"abc")';var sqls = 'select * from myjs_sm';var sql4 = 'update myjs_sm set b = "asd" where a = 1';var sql5 = 'delete from myjs_sm where a = 1';var  execsql=function(arg1){//查  connection.query(arg1, function (err, result) {                if(err){                console.log(err.message);                return;        }        console.log('-----------------------[ "' + arg1 + '" ]');        console.log(result);        console.log('------------------------------------------------------------\n\n');  });  //connection.end();}execsql(sql1);execsql(sql2);execsql(sql3);execsql(sqls);execsql(sql4);execsql(sqls);execsql(sql5);connection.end();

php

  • postgresql

    • php smoketest-pg.php host port
<?php/* * Copyright (c) 2019 ZettaDB inc. All rights reserved. * This source code is licensed under Apache 2.0 License, * combined with Common Clause Condition 1.0, as detailed in the NOTICE file. */function showresults($ret) {  echo "=== select results ===\n";  while ($row = pg_fetch_row($ret)) {        $str="";        foreach ($row as $item){                $str .= $item."  ";        }        echo "row: $str\n";  }}function checkret($db, $cmd, $ret) {  if (!$ret) {        echo $cmd.pg_last_error($db)."\n";        exit(1);  } else {        echo $cmd.":success\n";  }}$host = "host=$argv[1]";$port = "port=$argv[2]";$dbname = "dbname=postgres";$credentials = "user=abc password=abc";$connstr="$host $port $dbname $credentials";echo "conn string: $connstr\n";$db = pg_connect($connstr);if(!$db){  echo "Error : Unable to open database\n";  exit(1);} else {  echo "Opened database successfully\n";}$sql = "SET client_min_messages TO 'warning';";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "drop table if exists t1;";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "RESET client_min_messages;";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "create table t1(id integer primary key, info text, wt integer);";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "insert into t1(id,info,wt) values(1, 'record1', 1);";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "insert into t1(id,info,wt) values(2, 'record2', 2);";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "update t1 set wt = 12 where id = 1;";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "select info from t1;";$ret = pg_query($db, $sql);if ($ret) {  showresults($ret);}$sql = 'select * from t1 where id=$1';$ret = pg_prepare($db, "prep", $sql);checkret($db,  "prep:".$sql, $ret);$ret = pg_execute($db, "prep", array(1));showresults($ret);$ret = pg_execute($db, "prep", array(2));showresults($ret);$sql = 'update t1 set info=$1 , wt=$2 where id=$3';$ret = pg_prepare($db, "prep2", $sql);checkret($db, "prep:".$sql, $ret);$ret = pg_execute($db, "prep2", array('Rec1', 2, 1));checkret($db, "prep2:{Rec1,2,1}", $ret);$ret = pg_execute($db, "prep2", array('Rec2', 3, 2));checkret($db, "prep2:{Rec2,3,2}", $ret);$sql = "select * from t1;";$ret = pg_query($db, $sql);if ($ret) {  showresults($ret);}$sql = "delete from t1 where id = 1;";$ret = pg_query($db, $sql);checkret($db, $sql, $ret);$sql = "select * from t1;";$ret = pg_query($db, $sql);if ($ret) {  showresults($ret);}pg_close($db)?>
  • mysql

    • php smoketest-my.php host port
<?php$host = "$argv[1]";$port = "$argv[2]";$dbname = "postgres";$user = "abc";$pwd = "abc";$conn = mysqli_connect($host, $user, $pwd, $dbname, $port) or die("数据库连贯谬误!");$sql1 = "drop table if  exists myphp_sm \n";$sql2 = "create table if not exists myphp_sm(a int primary key, b text)\n";$sql3 = "insert into myphp_sm values (1,'abc')\n";$sql4 = "select * from myphp_sm\n";$sql5 = "update myphp_sm set b = 'asd' where a = 1\n";$sql6 = "select * from myphp_sm\n";$sql7 = "delete from myphp_sm where a = 1\n";$rs = mysqli_query($conn, $sql1);echo "$sql1 success\n";$rs = mysqli_query($conn, $sql2);echo "$sql2 success\n";$rs = mysqli_query($conn, $sql3);echo "$sql3 success\n";$rs = mysqli_query($conn, $sql4);echo "$sql4 success\n";$row = mysqli_fetch_array($rs);var_dump($row);$rs = mysqli_query($conn, $sql5);echo "$sql5 success\n";$rs = mysqli_query($conn, $sql6);echo "$sql6 success\n";$row = mysqli_fetch_array($rs);var_dump($row);$rs = mysqli_query($conn, $sql7);echo "$sql7 success\n";?>

go

  • mysql

    • 环境设置及编译
go env -w GO111MODULE=ongo env -w GOPROXY=https://goproxy.cn,directgo mod init smoketest_my #初始化脚本go mod tidy #该步go的包管理器会把依赖库主动解决好go build # build
  • ./smoketest_my -h host -p port
package mainimport (    "fmt"    _ "github.com/go-sql-driver/mysql"    "github.com/jmoiron/sqlx"    "flag")func checkError(err error) {        if err != nil {                panic(err)        }}func main() {        var User string        var Pwd string        var Host string        var Port int        var Dbname string        flag.StringVar(&Host,"h","","默认为空")        flag.IntVar(&Port,"p",5001,"默认为5001")        flag.StringVar(&Pwd,"pwd","abc","默认为abc")        flag.StringVar(&Dbname,"d","postgres","默认为postgres")        flag.StringVar(&User,"u","abc","默认为abc")        flag.Parse()        fmt.Println("============= Golang-mysql ============")        // Initialize connection string.        var connectionString string = fmt.Sprintf("%s:%[email protected](%s:%d)/%s?charset=utf8", User, Pwd, Host, Port, Dbname)        // Initialize connection object.        db, err := sqlx.Open("mysql", connectionString)        checkError(err)        err = db.Ping()        checkError(err)        fmt.Println("Successfully connection to database!!!")        // Drop table        _, err = db.Exec("drop table if exists mygo_sm;")        checkError(err)        fmt.Println("Successfully drop   table")        // Create table.        _, err = db.Exec("create table mygo_sm(id int primary key,name text,gender text);")        checkError(err)        fmt.Println("Successfully create table")        // Insert//        sql_statement := "insert into mygo_sm values ($1, $2, $3);"        _, err = db.Exec("insert into mygo_sm values( 1, 'banana', 'male')")        checkError(err)        _, err = db.Exec("insert into mygo_sm values(2, 'orange', 'female')")        checkError(err)        _, err = db.Exec("insert into mygo_sm values(3, 'apple', 'male')")        checkError(err)        fmt.Println("Successfully insert table")        _, err = db.Exec("delete from mygo_sm where id = 2")        checkError(err)        fmt.Println("Successfully delete table")        _, err = db.Exec("update mygo_sm set name = 'update' where id = 3")        checkError(err)        fmt.Println("Successfully update table")        _, err = db.Exec("select * from mygo_sm")        checkError(err)        fmt.Println("Successfully select table")        fmt.Println("=================================")}
  • postgresql

    • 环境设置及编译,参考上条
    • ./smoketest -h host -p port
package mainimport (  "database/sql"  "fmt"  "flag"  _ "github.com/lib/pq")const (  // Initialize connection constants.  //HOST     = "mydemoserver.postgres.database.azure.com"  //DATABASE = "postgres"  //USER     = "abc"  //PASSWORD = "abc")func checkError(err error) {  if err != nil {        panic(err)  }}func main() {        var User string        var Pwd string        var Host string        var Port int  var Db string  flag.StringVar(&Host,"h","","默认为空")  flag.IntVar(&Port,"p",5001,"默认为5001")  flag.StringVar(&Pwd,"pwd","abc","默认为abc")  flag.StringVar(&Db,"d","postgres","默认为postgres")  flag.StringVar(&User,"u","abc","默认为abc")  flag.Parse()  fmt.Println("============= Golang ============")  // Initialize connection string.  var connectionString string = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", Host, Port,User,Pwd,Db)  // Initialize connection object.  db, err := sql.Open("postgres", connectionString)  checkError(err)  err = db.Ping()  checkError(err)        fmt.Println("Successfully connection to database!!!")  // Drop previous table of same name if one exists.  _, err = db.Exec("drop table if exists SmokeTestTable_go;")  checkError(err)  fmt.Println("Successfully drop   table")  // Create table.  _, err = db.Exec("create table SmokeTestTable_go(id int primary key,name text,gender text);")  checkError(err)  fmt.Println("Successfully create table")  // Insert some data into table.  sql_statement := "insert into SmokeTestTable_go values ($1, $2, $3);"  _, err = db.Exec(sql_statement, 1, "banana", "male")  checkError(err)  _, err = db.Exec(sql_statement, 2, "orange", "female")  checkError(err)  _, err = db.Exec(sql_statement, 3, "apple", "male")  checkError(err)  fmt.Println("Successfully insert table")  _, err = db.Exec("delete from Smoketesttable_go where id = 2")        checkError(err)        fmt.Println("Successfully delete table")        _, err = db.Exec("update SmokeTestTable_go set name = 'update' where id = 3")        checkError(err)        fmt.Println("Successfully update table")  _, err = db.Exec("select * from SmokeTestTable_go")        checkError(err)        fmt.Println("Successfully select table")  fmt.Println("=================================")}

c

  • postgresql

    • gcc -o smokeTest smokeTest.c -I/path/postgresql-11.5-rel/include -L/path/postgresql-11.5-rel/lib -lpq
    • ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=user password=pwd"
/* * Copyright (c) 2019 ZettaDB inc. All rights reserved. * This source code is licensed under Apache 2.0 License, * combined with Common Clause Condition 1.0, as detailed in the NOTICE file. * * gcc -o smokeTest smokeTest.c -I/path/postgresql-11.5-rel/include -L/path/postgresql-11.5-rel/lib -lpq * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc" * * Test the C version of libpq, the PostgreSQL frontend library. */#include <stdio.h>#include <stdlib.h>#include "libpq-fe.h"static voidexit_nicely(PGconn *conn){        PQfinish(conn);        exit(1);}intmain(int argc, char **argv){        const char *conninfo;        PGconn     *conn;        PGresult   *res;        int                     nFields;        int                     i,                                j;        /*         * If the user supplies a parameter on the command line, use it as the         * conninfo string; otherwise default to setting dbname=postgres and using         * environment variables or defaults for all other connection parameters.         */        if (argc > 1)                conninfo = argv[1];        else                conninfo = "dbname = postgres host=192.168.0.104 port=6401 user=abc password=abc";        /* Make a connection to the database */        conn = PQconnectdb(conninfo);        /* Check to see that the backend connection was successfully made */        if (PQstatus(conn) != CONNECTION_OK)        {                fprintf(stderr, "Connection to database failed: %s",                                PQerrorMessage(conn));                exit_nicely(conn);        }        /* Set always-secure search path, so malicous users can't take control. */        res = PQexec(conn, "SET client_min_messages TO 'warning';");        PQclear(res);        res = PQexec(conn, "drop table if exists t1");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "drop table ok\n");        }        PQclear(res);        res = PQexec(conn, "RESET client_min_messages;");        PQclear(res);        /*         * Our test case here involves using a cursor, for which we must be inside         * a transaction block.  We could do the whole thing with a single         * PQexec() of "select * from pg_database", but that's too trivial to make         * a good example.         */        /* Start a transaction block */        res = PQexec(conn, "create table t1(id integer primary key,info text, wt integer)");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "create table command failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "create table ok\n");        }        PQclear(res);        /*         * Fetch rows from pg_database, the system catalog of databases         */        res = PQexec(conn, "insert into t1(id,info,wt) values(1, 'record1', 1)");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "insert record 1 failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "insert record 1 ok\n");        }        PQclear(res);        res = PQexec(conn, "insert into t1(id,info,wt) values(2, 'record2', 2)");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "insert record 2 failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "insert record 2 ok\n");        }        PQclear(res);        res = PQexec(conn, "update t1 set wt = 12 where id = 1");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "update record failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "update record ok\n");        }        PQclear(res);        res = PQexec(conn, "select * from t1");        if (PQresultStatus(res) != PGRES_TUPLES_OK)        {                fprintf(stderr, "select failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                int nFields = PQnfields(res);                for(i=0;i<nFields;i++)                {                        fprintf(stderr, "%s\t\t",PQfname(res,i));                }                fprintf(stderr, "\n");                for(i=0;i<PQntuples(res);i++)                {                        for(j=0;j<nFields;j++)                        {                                fprintf(stderr, "%s\t\t",PQgetvalue(res,i,j));                        }                        fprintf(stderr, "\n");                }        }        PQclear(res);        res = PQexec(conn, "delete from t1 where id = 1");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "delete record failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "delete record ok\n");        }        PQclear(res);        res = PQexec(conn, "drop table t1");        if (PQresultStatus(res) != PGRES_COMMAND_OK)        {                fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));                PQclear(res);                exit_nicely(conn);        } else {                fprintf(stderr, "drop table ok\n");        }        PQclear(res);        /* close the connection to the database and cleanup */        PQfinish(conn);        return 0;}
  • mysql

    • gcc -I/usr/include/mysql -L/usr/lib/mysql mysql.c -lmysqlclient -o mysql
    • ./mysql host port
#include <stdio.h>#include <mysql.h>int main(int argc,char** argv){  printf("connect to %s:%s\n",argv[1], argv[2]);  printf("version: %s\n", mysql_get_client_info());  MYSQL* my = mysql_init(NULL);  int port = atoi(argv[2]);  if(!mysql_real_connect(my, ("%s", argv[1]), "abc", "abc", "postgres", port, NULL, 0)){        printf("connect error !\n");        mysql_close(my);  }  printf("drop table if exists myc_sm;\n");  mysql_query(my, "drop table if exists myc_sm;");  printf("create table myc_sm;\n");  mysql_query(my, "create table myc_sm(a int primary key, b text);");  printf("insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')\n");  mysql_query(my, "insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')");  void select(void)  {        printf("\n\nselect * from myc_sm;\n");        int res = mysql_query(my, "select * from myc_sm;");        MYSQL_RES* a = mysql_store_result(my);        int rows = mysql_num_rows(a);        int cols = mysql_num_fields(a);        printf("rows: %d, cols: %d\n", rows, cols);        MYSQL_FIELD *field = mysql_fetch_fields(a);        for(int i = 0; i < cols; i++)        {                printf("%-10s\t", field[i].name);        }        puts("");                MYSQL_ROW line;        for(int i = 0; i < rows; i++)                {                        line =  mysql_fetch_row(a);                        for(int j = 0; j < cols; j++)                        {                                printf("%-10s\t", line[j]);                        }                        puts("");        }  }  select();  printf("update myc_sm set b = 'def' where a = 1;");  mysql_query(my, "update myc_sm set b = 'def' where a = 1;");  select();  printf("delete from myc_sm where a = 3;");  mysql_query(my, "delete from myc_sm where a = 3;");  select();  mysql_close(my);}

c#

  • postgresql
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;using Npgsql;namespace SmokeTest{    class Program    {        static void Main(string[] args)        {            Console.WriteLine("\n========== C# Driver ============");            string connSting = "Host=localhost;Port=5401;Username=abc;Password=abc;Database=postgres";            var conn = new NpgsqlConnection(connSting);            NpgsqlDataAdapter DA = new NpgsqlDataAdapter();            //NpgsqlCommand cmd_select = new NpgsqlCommand("select * from SmokeTestTable_csharp");            //DA.SelectCommand = cmd_select;            string drop1 = "drop table if exists SmokeTestTable_csharp;";            string create = "create table SmokeTestTable_csharp(id int primary key,name text,gender text);";            string insert = "insert into SmokeTestTable_csharp values(1,'li','male'),(2,'delete_me','male'),(3,'update_me','female');";            string create2 = "create table testfordelete(id int primary key);";            string droptab = "drop table testfordelete;";            string delete = "delete from Smoketesttable_csharp where id = 2";            string update = "update SmokeTestTable_csharp set name = 'update' where id = 3";            string select = "select * from SmokeTestTable_csharp";            string drodb = "drop database if exists smoketestdb;";            string commit = "commit;";            string credb = "create database smoketestdb;";            string swdb = "use smoketestdb";            string dropdb = "drop database smoketestdb;";            conn.Open();            using (var com1 = new NpgsqlCommand(drop1, conn))            using (var com2 = new NpgsqlCommand(create, conn))            using (var com3 = new NpgsqlCommand(insert, conn))            using (var com4 = new NpgsqlCommand(create2, conn))            using (var com5 = new NpgsqlCommand(droptab, conn))            using (var com6 = new NpgsqlCommand(delete, conn))            using (var com7 = new NpgsqlCommand(update, conn))            using (var com8 = new NpgsqlCommand(select, conn))            using (var drobd1 = new NpgsqlCommand(drodb,conn))            using (var credb1 = new NpgsqlCommand(credb, conn))            using (var swdb1 = new NpgsqlCommand(swdb, conn))            using (var dropdb1 = new NpgsqlCommand(dropdb, conn))            using (var comm = new NpgsqlCommand(commit,conn))            {                //drobd1.ExecuteNonQuery();                //Console.WriteLine("drop table success!");                //credb1.ExecuteNonQuery();                //Console.WriteLine("create database success!");                //comm.ExecuteNonQuery();                //Console.WriteLine("commit success!");                //swdb1.ExecuteNonQuery();                //Console.WriteLine("switch database success!");                com1.ExecuteNonQuery();                Console.WriteLine("drop   table success!");                com2.ExecuteNonQuery();                Console.WriteLine("create table success!");                com3.ExecuteNonQuery();                Console.WriteLine("insert table success!");                com4.ExecuteNonQuery();                Console.WriteLine("create table success!");                com5.ExecuteNonQuery();                Console.WriteLine("drop   table success!");                com6.ExecuteNonQuery();                Console.WriteLine("delete table success!");                com7.ExecuteNonQuery();                Console.WriteLine("update table success!");                com8.ExecuteNonQuery();                Console.WriteLine("select table success!");                comm.ExecuteNonQuery();                Console.WriteLine("commit table success!");                //dropdb1.ExecuteNonQuery();                //Console.WriteLine("drop database success!");            }            conn.Close();            Console.WriteLine("=================================\n");        }    }}
  • mysql
using System;using System.Collections.Generic;using System.Data;using MySql.Data.MySqlClient;using System.Text;namespace mysql{    class Program    {        static void Main(string[] args)        {        //server=127.0.0.1;port=3306;user=root;password=root; database=minecraftdb;                string cs = "server=" + args[0] + ";user=abc;password=abc;port=" + args[1] + ";database=postgres";                Console.WriteLine("testing mysql: " + cs);        MySqlConnection conn = null;        conn = new MySqlConnection(cs);        //conn.Open();        //Console.WriteLine("drop table if exists mycs_sm;");        //MySqlCommand cmd = new MySqlCommand("drop table if exists mycs_sm;", conn);        //int n = cmd.ExecuteNonQuery();        List<string> sqlList = new List<string>();        sqlList.Add("drop table if exists mycs_sm;");        sqlList.Add("create table mycs_sm(a int primary key, b text);");        sqlList.Add("insert into mycs_sm values(1,'abc'),(2,'bcd'),(3,'cde');");        sqlList.Add("select * from mycs_sm;");        sqlList.Add("update mycs_sm set b = 'def' where a = 1;");        sqlList.Add("select * from mycs_sm;");        sqlList.Add("delete from mycs_sm where a = 3;");        sqlList.Add("select * from mycs_sm;");        foreach (string i in sqlList){                Console.WriteLine(i);                List<string> list = new List<string>();                if (i == "select * from mycs_sm;"){                        conn.Open();                        MySqlCommand cmd = new MySqlCommand(i, conn);                        MySqlDataReader reader = cmd.ExecuteReader();                        while (reader.Read()){                                string id = reader.GetString("a");                                string name = reader.GetString("b");                                Console.WriteLine(id + " : " + name);                        }                        conn.Close();                }                else {                        conn.Open();                        MySqlCommand cmd = new MySqlCommand(i, conn);                        cmd.ExecuteNonQuery();                        conn.Close();                }        }        //conn.Close();        }    }}

c++

  • postgresql

    • 编译

      • g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
/* * Copyright (c) 2019 ZettaDB inc. All rights reserved. * This source code is licensed under Apache 2.0 License, * * sudo apt install libpq-dev * git clone https://github.com/jtv/libpqxx.git * cd libpqxx * ./configure * make * sudo make install * * g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17 * ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc" * * Test the C++ version of libpqxx, the PostgreSQL frontend library. */#include <iostream>#include <pqxx/pqxx>using namespace std;using namespace pqxx;intmain(int argc, char **argv){  const char *conninfo;  if (argc > 1)        conninfo = argv[1];  else        conninfo = "dbname = postgres user=abc password=abc hostaddr=127.0.0.1 port=5401";  try{        pqxx::connection db(conninfo);        if (db.is_open()) {                cout << "Opened database successfully: " << db.dbname() << endl;        } else {                cout << "Can't open database" << endl;                return 1;        }        pqxx::nontransaction txn1{db};        txn1.exec("drop table if exists t1");        txn1.exec("create table t1(id integer primary key, info text, wt integer)");        txn1.commit();        pqxx::work txn2{db};        txn2.exec("insert into t1(id,info,wt) values(1, 'record1', 1)");        txn2.exec("insert into t1(id,info,wt) values(2, 'record2', 2)");        txn2.exec("insert into t1(id,info,wt) values(3, 'record3', 3)");        txn2.exec("update t1 set wt = 12 where id = 1");        txn2.exec("delete from t1 where id = 2");        pqxx::result r2{txn2.exec("select * from t1")};        for (auto row: r2)                std::cout << row[0] << " " << row[1] << " " << row[2] << std::endl;        txn2.commit();        pqxx::nontransaction txn3{db};        txn3.exec("drop table t1");        txn3.commit();        db.close();  }catch (const std::exception &e){        cerr << e.what() << std::endl;        return 1;  }  return 0;}
  • mysql

    • 编译

      • g++ mysql.cpp -lmysqlcppconn -o mysql
// g++ mysql.cpp -lmysqlcppconn -o mysqltest// ./mysqltest "tcp://192.168.0.113:5661"#include "mysql_connection.h"#include <stdlib.h>#include <iostream>#include <cppconn/driver.h>#include <cppconn/exception.h>#include <cppconn/resultset.h>#include <cppconn/statement.h>using namespace std;int main(int argc,char* argv[]){  sql::Driver *driver;        sql::Connection *con;        sql::Statement *stmt;        sql::ResultSet *res;  /* Create a connection */        driver = get_driver_instance();  //string infos = sprintf("\"tcp://" , argv[1] , "\"");  //string in1 = "\"tcp://";  //string in2 = "\"";  //string infos = in1 + argv[1] + in2;  string infos = argv[1];        con = driver->connect(infos, "abc", "abc");  con->setSchema("postgres");  stmt = con->createStatement();        stmt->execute("drop table if exists mycpp_sm;");  cout<<"drop table if exists mycpp_sm;"<<endl;  stmt->execute("create table mycpp_sm(a int primary key, b text)");  cout<<"create table mycpp_sm(a int primary key, b text)"<<endl;  stmt->execute("insert into mycpp_sm values(1, 'abc'),(2,'bcd'),(3,'cde')");  cout<<"insert into mycpp_sm(1, 'abc'),(2,'bcd'),(3, 'cde')"<<endl;  stmt->executeQuery("select * from mycpp_sm");  cout<<"select * from mycpp_sm"<<endl;  stmt->execute("update mycpp_sm set b = 'qwer' where a = 2");  cout<<"update mycpp_sm set b = 'qwer' where a = 2"<<endl;  stmt->executeQuery("select * from mycpp_sm");  cout<<"select * from mycpp_sm"<<endl;        stmt->execute("delete from mycpp_sm where a = 3");  cout<<"delete from mycpp_sm where a = 3"<<endl;  stmt->executeQuery("select * from mycpp_sm");  cout<<"select * from mycpp_sm"<<endl;  delete stmt;  delete con;}

rust

  • postgresql

    • 编译

      • cargo build(该步会把所有依赖下载并装置)
use clap::{App, Arg};use postgres::{Client, NoTls};const DEFAULT_HOST: &str = "localhost";const DEFAULT_PORT: &str = "7999";const DEFAULT_USER: &str = "abc";const DEFAULT_PASS: &str = "abc";const DEFAULT_DB: &str = "postgres";struct ConnectionArgs {    pub host: String,    pub port: String,    pub user: String,    pub pass: String,    pub db: String,}fn parse_args() -> ConnectionArgs {    let matches = App::new("Execute SQL in Postgres")        .arg(            Arg::with_name("host")                .short("h")                .long("host")                .value_name("HOSTNAME")                .default_value(DEFAULT_HOST)                .help("Sets the host name of PostgreSQL service")                .takes_value(true),        )        .arg(            Arg::with_name("port")                .short("p")                .long("port")                .value_name("PORT")                .default_value(DEFAULT_PORT)                .help("Sets the port of PostgreSQL service")                .takes_value(true),        )        .arg(            Arg::with_name("user")                .short("u")                .long("user")                .value_name("USERNAME")                .default_value(DEFAULT_USER)                .help("Sets the user to log into PostgreSQL")                .takes_value(true),        )        .arg(            Arg::with_name("pass")                .long("pass")                .value_name("PASSWORD")                .default_value(DEFAULT_PASS)                .help("Sets the user's password")                .takes_value(true),        )        .arg(            Arg::with_name("db")                .short("d")                .long("database")                .value_name("DATABASE")                .default_value(DEFAULT_DB)                .help("Sets the database to use")                .takes_value(true),        )        .get_matches();    ConnectionArgs {        host: matches.value_of("host").unwrap().to_string(),        port: matches.value_of("port").unwrap().to_string(),        user: matches.value_of("user").unwrap().to_string(),        pass: matches.value_of("pass").unwrap().to_string(),        db: matches.value_of("db").unwrap().to_string(),    }}fn main() {    let args = parse_args();    let sqls = [        "drop table if exists t1;",        "create table t1(id integer primary key, info text, wt integer);",        "insert into t1(id,info,wt) values(1, 'record1', 1);",        "insert into t1(id,info,wt) values(2, 'record2', 2);",        "update t1 set wt = 12 where id = 1;",        "select * from t1;",        "delete from t1 where id = 1;",        "select * from t1;",    ];    let mut client = Client::connect(        &format!(            "postgres://{}:{}@{}:{}/{}",            args.user, args.pass, args.host, args.port, args.db        ),        NoTls,    )    .unwrap();    for sql in sqls {        // 或者应用 let result = client.query(sql, &[]);        // query 会输入后果,execute 只返回影响的行数        let result = client.execute(sql, &[]);        println!("command: {}, res: {:?}", sql, result);    }}
  • mysql

    • 编译

      • cargo build(该步会把所有依赖下载并装置)
use mysql::*;use mysql::prelude::*;//use std::env;use clap::{App, Arg};fn main() {    let matches = App::new("Execute SQL in Postgres")        .arg(            Arg::with_name("host")                .short("h")                .long("host")                .value_name("HOSTNAME")                .help("Sets the host name of PostgreSQL service")                .takes_value(true),        )        .arg(           Arg::with_name("port")                .short("p")                .long("port")                .value_name("PORT")                .help("Sets the port of PostgreSQL service")                .takes_value(true),        ).get_matches();    let host = matches.value_of("host").unwrap_or("192.168.0.113").to_string();    let port = matches.value_of("port").unwrap_or("5662").to_string();    let front = "mysql://pgx:[email protected]".to_string();    let hosts = front + &host + ":" + &port + "/mysql";    let url = hosts.to_string();    // let mut hosts = front + &host + ":".to_string() + &port + "/mysql".to_string();//    let mut url = format!("mysql://pgx:[email protected]{:?}:{:?}/mysql", host, port);    let urls = "mysql://abc:[email protected]:5662/postgres";    println!("{}", url);    println!("{}", urls);    let pool = Pool::new(urls).unwrap(); // 获取连接池    let mut conn = pool.get_conn().unwrap();// 获取链接    let sql = [        "drop table if exists myrs_sm;",        "create table myrs_sm(a int primary key, b text);",        "insert into myrs_sm values(1, 'abc'),(2,'bcd'),(3,'cde');",        "select * from myrs_sm;",        "update myrs_sm set b = 'def' where a = 1",        "select * from myrs_sm;",        "delete from myrs_sm where a = 3",        "select * from myrs_sm;"    ];    let selects = "select * from myrs_sm;";    for sqls in sql{        conn.query_iter(sqls).unwrap();        println!();        println!("{:?}", sqls);        if sqls == "select * from myrs_sm;"{            conn.query_iter(selects).unwrap()                .for_each(|row|{                let r:(i32,String)=from_row(row.unwrap());                println!("result: a={},b='{}'",r.0,r.1);            });        }    }}

根本DDL

Database

create database
CREATE DATABASE name    [ [ WITH ] [ OWNER [=] user_name ]           [ TEMPLATE [=] template ]           [ ENCODING [=] encoding ]           [ LOCALE [=] locale ]           [ LC_COLLATE [=] lc_collate ]           [ LC_CTYPE [=] lc_ctype ]           [ TABLESPACE [=] tablespace_name ]           [ ALLOW_CONNECTIONS [=] allowconn ]           [ CONNECTION LIMIT [=] connlimit ]           [ IS_TEMPLATE [=] istemplate ] ]

创立一个新的数据库

create database kunlun;

要在一个默认表空间pg_default中创立一个被用户 abc 有的新数据库 kunlun:

CREATE DATABASE kunlun OWNER abc TABLESPACE pg_default;

更多的参考链接
create database

alter database
ALTER DATABASE name [ [ WITH ] option [ ... ] ]这里 option 能够是:    ALLOW_CONNECTIONS allowconn    CONNECTION LIMIT connlimit    IS_TEMPLATE istemplateALTER DATABASE name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }ALTER DATABASE name SET TABLESPACE new_tablespaceALTER DATABASE name SET configuration_parameter { TO | = } { value | DEFAULT }ALTER DATABASE name SET configuration_parameter FROM CURRENTALTER DATABASE name RESET configuration_parameterALTER DATABASE name RESET ALL

批改数据库的所有者

ALTER DATABASE kunlun OWNER TO abc;

在数据库 kunlun 中禁用索引扫描

ALTER DATABASE kunlun SET enable_indexscan TO off;

批改数据库的最大链接数:

alter database kunlun connection limit 10;

更多的参考链接
alter database

drop database
DROP DATABASE [ IF EXISTS ] name

更多的参考链接
drop database

Schema

create schema
CREATE SCHEMA schema_name [ AUTHORIZATION role_specification ] [ schema_element [ ... ] ]CREATE SCHEMA AUTHORIZATION role_specification [ schema_element [ ... ] ]CREATE SCHEMA IF NOT EXISTS schema_name [ AUTHORIZATION role_specification ]CREATE SCHEMA IF NOT EXISTS AUTHORIZATION role_specification其中 role_specification 能够是:    user_name  | CURRENT_USER  | SESSION_USER

创立一个新的模式

create schema myschema;

为用户创立 kunlun 创立一个模式,此模式也同时命名为 kunlun :

CREATE SCHEMA AUTHORIZATION kunlun;

更多的参考链接
create schema

alter schema
ALTER SCHEMA name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }

更改模式的所有者

 alter schema kunlun owner to vito;

更多的参考链接
alter schema

drop schema
DROP SCHEMA [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除名为 kunlun 的 schema:

drop schema kunlun;

更多的参考链接
drop schema

Table

create table
CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name ( [  { column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]    | table_constraint    | LIKE source_table [ like_option ... ] }    [, ... ]] )[ INHERITS ( parent_table [, ... ] ) ][ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ][ USING method ][ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ][ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ][ TABLESPACE tablespace_name ]CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name    OF type_name [ (  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]    | table_constraint }    [, ... ]) ][ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ][ USING method ][ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ][ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ][ TABLESPACE tablespace_name ]CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXISTS ] table_name    PARTITION OF parent_table [ (  { column_name [ WITH OPTIONS ] [ column_constraint [ ... ] ]    | table_constraint }    [, ... ]) ] { FOR VALUES partition_bound_spec | DEFAULT }[ PARTITION BY { RANGE | LIST | HASH } ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [, ... ] ) ][ USING method ][ WITH ( storage_parameter [= value] [, ... ] ) | WITHOUT OIDS ][ ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP } ][ TABLESPACE tablespace_name ]其中 column_constraint 是:[ CONSTRAINT constraint_name ]{ NOT NULL |  NULL |  DEFAULT default_expr |  GENERATED ALWAYS AS ( generation_expr ) STORED |  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |  UNIQUE index_parameters |  PRIMARY KEY index_parameters |  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]table_constraint 是:[ CONSTRAINT constraint_name ]{   UNIQUE ( column_name [, ... ] ) index_parameters |  PRIMARY KEY ( column_name [, ... ] ) index_parameters |like_option 是:{ INCLUDING | EXCLUDING } { COMMENTS | CONSTRAINTS | DEFAULTS | GENERATED | IDENTITY | INDEXES | STATISTICS | STORAGE | ALL }partition_bound_spec 是:IN ( partition_bound_expr [, ...] ) |FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )UNIQUE、PRIMARY KEY以及EXCLUDE束缚中的index_parameters是:[ INCLUDE ( column_name [, ... ] ) ][ WITH ( storage_parameter [= value] [, ... ] ) ][ USING INDEX TABLESPACE tablespace_name ]一个EXCLUDE束缚中的exclude_element是:{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

创立表 t1:

create table t1(a int , b int);

创立长期表 v1:

create temp table v1(a int, b text);

建设 hash 分区表:

CREATE TABLE t2 (    order_id     bigint not null,    cust_id      bigint not null,    status       text) PARTITION BY HASH (order_id);

创立一个分区表:

CREATE TABLE t3 (    logdate         date not null,    peaktemp        int,    unitsales       int) PARTITION BY RANGE (logdate);

更多的参考链接

create table

alter table
ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]    action [, ... ]ALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]    RENAME [ COLUMN ] column_name TO new_column_nameALTER TABLE [ IF EXISTS ] [ ONLY ] name [ * ]    RENAME CONSTRAINT constraint_name TO new_constraint_nameALTER TABLE [ IF EXISTS ] name    RENAME TO new_nameALTER TABLE [ IF EXISTS ] name    SET SCHEMA new_schemaALTER TABLE ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]    SET TABLESPACE new_tablespace [ NOWAIT ]ALTER TABLE [ IF EXISTS ] name    ATTACH PARTITION partition_name { FOR VALUES partition_bound_spec | DEFAULT }ALTER TABLE [ IF EXISTS ] name    DETACH PARTITION partition_name其中action 是以下之一:    ADD [ COLUMN ] [ IF NOT EXISTS ] column_name data_type [ COLLATE collation ] [ column_constraint [ ... ] ]    DROP [ COLUMN ] [ IF EXISTS ] column_name [ RESTRICT | CASCADE ]    ALTER [ COLUMN ] column_name [ SET DATA ] TYPE data_type [ COLLATE collation ] [ USING expression ]    ALTER [ COLUMN ] column_name SET DEFAULT expression    ALTER [ COLUMN ] column_name DROP DEFAULT    ALTER [ COLUMN ] column_name { SET | DROP } NOT NULL    ALTER [ COLUMN ] column_name DROP EXPRESSION [ IF EXISTS ]    ALTER [ COLUMN ] column_name ADD GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ]    ALTER [ COLUMN ] column_name { SET GENERATED { ALWAYS | BY DEFAULT } | SET sequence_option | RESTART [ [ WITH ] restart ] } [...]    ALTER [ COLUMN ] column_name DROP IDENTITY [ IF EXISTS ]    ALTER [ COLUMN ] column_name SET STATISTICS integer    ALTER [ COLUMN ] column_name SET ( attribute_option = value [, ... ] )    ALTER [ COLUMN ] column_name RESET ( attribute_option [, ... ] )    ALTER [ COLUMN ] column_name SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }    ADD table_constraint [ NOT VALID ]    ADD table_constraint_using_index    ALTER CONSTRAINT constraint_name [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]    VALIDATE CONSTRAINT constraint_name    DROP CONSTRAINT [ IF EXISTS ]  constraint_name [ RESTRICT | CASCADE ]    ENABLE REPLICA TRIGGER trigger_name    ENABLE ALWAYS TRIGGER trigger_name    DISABLE RULE rewrite_rule_name    ENABLE RULE rewrite_rule_name    ENABLE REPLICA RULE rewrite_rule_name    ENABLE ALWAYS RULE rewrite_rule_name    DISABLE ROW LEVEL SECURITY    ENABLE ROW LEVEL SECURITY    FORCE ROW LEVEL SECURITY    NO FORCE ROW LEVEL SECURITY    SET TABLESPACE new_tablespace    SET ( storage_parameter [= value] [, ... ] )    RESET ( storage_parameter [, ... ] )    INHERIT parent_table    NO INHERIT parent_table    OF type_name    NOT OF    OWNER TO { new_owner | CURRENT_USER | SESSION_USER }    REPLICA IDENTITY { DEFAULT | USING INDEX index_name | FULL | NOTHING }and partition_bound_spec is:IN ( partition_bound_expr [, ...] ) |FROM ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] )  TO ( { partition_bound_expr | MINVALUE | MAXVALUE } [, ...] ) |WITH ( MODULUS numeric_literal, REMAINDER numeric_literal )and column_constraint is:[ CONSTRAINT constraint_name ]{ NOT NULL |  NULL |  DEFAULT default_expr |  GENERATED ALWAYS AS ( generation_expr ) STORED |  GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] |  UNIQUE index_parameters |  PRIMARY KEY index_parameters |  REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]    [ ON DELETE referential_action ] [ ON UPDATE referential_action ] }[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]而table_constraint是:[ CONSTRAINT constraint_name ]{  UNIQUE ( column_name [, ... ] ) index_parameters |  PRIMARY KEY ( column_name [, ... ] ) index_parameters |  EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] |    [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] }[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]并且 table_constraint_using_index 是:    [ CONSTRAINT constraint_name ]    { UNIQUE | PRIMARY KEY } USING INDEX index_name    [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]UNIQUE、PRIMARY KEY以及EXCLUDE束缚中的index_parameters是:[ INCLUDE ( column_name [, ... ] ) ][ WITH ( storage_parameter [= value] [, ... ] ) ][ USING INDEX TABLESPACE tablespace_name ]exclude_element in an EXCLUDE constraint is:{ column_name | ( expression ) } [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ]

重命名表 kunlun 的名字:

create table kunlun(a int not null);alter table kunlun rename to t1;

向 t1 表中减少一列:

alter table t1 add column bb text;

bb 字段重命名为 b:

alter table t1 rename column bb to b;

批改表中 b 字段的数据类型:

ALTER TABLE t1 ALTER COLUMN b type varchar(10);

向 b 字段减少惟一束缚:

alter table t1 add constraint unique_t1_b unique (b);

为 b 列减少一个非空束缚:

ALTER TABLE t1 ALTER COLUMN b SET NOT NULL;

移除 b 列的非空束缚:

ALTER TABLE t1 ALTER COLUMN b drop NOT NULL;

把 t1 表挪动至另一个模式中:

create schema kunlun;ALTER TABLE public.t1 SET SCHEMA kunlun;

更多的参考链接
alter table

drop table
DROP TABLE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除表 t1:

drop table t1;

更多的参考链接
drop table

Index

create index
CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] name ] ON [ ONLY ] table_name [ USING method ]    ( { column_name | ( expression ) } [ COLLATE collation ] [ opclass ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )    [ INCLUDE ( column_name [, ...] ) ]    [ WITH ( storage_parameter = value [, ... ] ) ]    [ TABLESPACE tablespace_name ]    [ WHERE predicate ]

更多的参考链接
create index

alter index
ALTER INDEX [ IF EXISTS ] name RENAME TO new_nameALTER INDEX [ IF EXISTS ] name SET TABLESPACE tablespace_nameALTER INDEX name ATTACH PARTITION index_nameALTER INDEX name DEPENDS ON EXTENSION extension_nameALTER INDEX [ IF EXISTS ] name SET ( storage_parameter = value [, ... ] )ALTER INDEX [ IF EXISTS ] name RESET ( storage_parameter [, ... ] )ALTER INDEX [ IF EXISTS ] name ALTER [ COLUMN ] column_number    SET STATISTICS integerALTER INDEX ALL IN TABLESPACE name [ OWNED BY role_name [, ... ] ]    SET TABLESPACE new_tablespace [ NOWAIT ]

重命名一个现有的索引:

ALTER INDEX distributors RENAME TO suppliers;

更多的参考链接
alter index

drop index
DROP INDEX [ CONCURRENTLY ] [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除一个存在的索引:

drop index suppliers;

更多的参考链接
drop index

Sequence

create sequence
CREATE [ TEMPORARY | TEMP ] SEQUENCE [ IF NOT EXISTS ] name [ INCREMENT [ BY ] increment ]    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]    [ START [ WITH ] start ] [ CACHE cache ] [ [ NO ] CYCLE ]    [ OWNED BY { table_name.column_name | NONE } ]

创立一个名为 kunlun 的序列从 100 开始:

CREATE SEQUENCE kunlun START 100;SELECT nextval('kunlun'); nextval---------     100SELECT nextval('kunlun'); nextval---------     101

更多的参考链接
create sequence

alter sequence
ALTER SEQUENCE [ IF EXISTS ] name    [ AS data_type ]    [ INCREMENT [ BY ] increment ]    [ MINVALUE minvalue | NO MINVALUE ] [ MAXVALUE maxvalue | NO MAXVALUE ]    [ START [ WITH ] start ]    [ RESTART [ [ WITH ] restart ] ]    [ CACHE cache ] [ [ NO ] CYCLE ]    [ OWNED BY { table_name.column_name | NONE } ]ALTER SEQUENCE [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }ALTER SEQUENCE [ IF EXISTS ] name RENAME TO new_nameALTER SEQUENCE [ IF EXISTS ] name SET SCHEMA new_schema

更改 kunlun 序列从 200 开始:

ALTER SEQUENCE kunlun RESTART WITH 200;SELECT nextval('kunlun'); nextval---------     200SELECT nextval('kunlun'); nextval---------     201

更多的参考链接
alter sequence

drop sequence
DROP SEQUENCE [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除 kunlun 序列:

DROP SEQUENCE kunlun;

更多的参考链接
drop sequence

View

create view
CREATE [ OR REPLACE ] [ TEMP | TEMPORARY ] [ RECURSIVE ] VIEW name [ ( column_name [, ...] ) ]    [ WITH ( view_option_name [= view_option_value] [, ... ] ) ]    AS query    [ WITH [ CASCADED | LOCAL ] CHECK OPTION ]

创立一个为 v1 的视图:

create table t1(id int, a int);insert into t1(id,a) values (1,2),(2,4),(3,8),(6,0);CREATE VIEW v1 AS    SELECT *    FROM t1    WHERE id<5;

更多的参考链接
create view

alter view
ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name SET DEFAULT expressionALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name DROP DEFAULTALTER VIEW [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_USER | SESSION_USER }ALTER VIEW [ IF EXISTS ] name RENAME TO new_nameALTER VIEW [ IF EXISTS ] name SET SCHEMA new_schemaALTER VIEW [ IF EXISTS ] name SET ( view_option_name [= view_option_value] [, ... ] )ALTER VIEW [ IF EXISTS ] name RESET ( view_option_name [, ... ] )

更改 v1 视图的名称:

alter view v1 rename to vv1;

更多的参考链接
alter view

drop view
DROP VIEW [ IF EXISTS ] name [, ...] [ CASCADE | RESTRICT ]

删除 vv1 视图:

drop view vv1;

更多的参考链接
drop view

存储过程

http://192.168.0.104/pgdocs/h...

用户和权限治理

create role
CREATE ROLE name [ [ WITH ] option [ ... ] ]where option能够是:      SUPERUSER | NOSUPERUSER    | CREATEDB | NOCREATEDB    | CREATEROLE | NOCREATEROLE    | INHERIT | NOINHERIT    | LOGIN | NOLOGIN    | REPLICATION | NOREPLICATION    | BYPASSRLS | NOBYPASSRLS    | CONNECTION LIMIT connlimit    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL    | VALID UNTIL 'timestamp'    | IN ROLE role_name [, ...]    | IN GROUP role_name [, ...]    | ROLE role_name [, ...]    | ADMIN role_name [, ...]    | USER role_name [, ...]    | SYSID uid

创立一个能够登录的角色 kunlun,它没有明码:

create role kunlun LOGIN;

创立一个能够登陆且有明码的用户:kunlun1:
CREATE USERCREATE ROLE完全相同,除了它带有LOGIN;

CREATE USER kunlun1 WITH PASSWORD '12345678';

创立用户 kunlun2,明码有效期到 2024年1月1日:

CREATE ROLE kunlun2 WITH LOGIN PASSWORD '12345678' VALID UNTIL '2024-01-01';

创立一个能够创立数据库和治理角色的角色 kunlun3:

CREATE ROLE kunlun3 WITH CREATEDB CREATEROLE;

更多的参考链接
create role

alter role
ALTER ROLE role_specification [ WITH ] option [ ... ]其中option能够是:      SUPERUSER | NOSUPERUSER    | CREATEDB | NOCREATEDB    | CREATEROLE | NOCREATEROLE    | INHERIT | NOINHERIT    | LOGIN | NOLOGIN    | REPLICATION | NOREPLICATION    | BYPASSRLS | NOBYPASSRLS    | CONNECTION LIMIT connlimit    | [ ENCRYPTED ] PASSWORD 'password' | PASSWORD NULL    | VALID UNTIL 'timestamp'ALTER ROLE name RENAME TO new_nameALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter { TO | = } { value | DEFAULT }ALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] SET configuration_parameter FROM CURRENTALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET configuration_parameterALTER ROLE { role_specification | ALL } [ IN DATABASE database_name ] RESET ALL其中role_specification能够是:    role_name  | CURRENT_USER  | SESSION_USER

更改 kunlun1 的明码:

ALTER ROLE kunlun1 WITH PASSWORD '87654321';

移除 kunlun2 的明码:

ALTER ROLE kunlun2 WITH PASSWORD NULL;

更多的参考链接
alter role

drop role
DROP ROLE [ IF EXISTS ] name [, ...]

删除创立的角色 kunlun:

drop role kunlun;

更多的参考链接
drop role

grant
GRANT { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }    [, ...] | ALL [ PRIVILEGES ] }    ON { [ TABLE ] table_name [, ...]         | ALL TABLES IN SCHEMA schema_name [, ...] }    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }    ON [ TABLE ] table_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { { USAGE | SELECT | UPDATE }    [, ...] | ALL [ PRIVILEGES ] }    ON { SEQUENCE sequence_name [, ...]         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }    ON DATABASE database_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { USAGE | ALL [ PRIVILEGES ] }    ON DOMAIN domain_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { USAGE | ALL [ PRIVILEGES ] }    ON FOREIGN DATA WRAPPER fdw_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { USAGE | ALL [ PRIVILEGES ] }    ON FOREIGN SERVER server_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { EXECUTE | ALL [ PRIVILEGES ] }    ON { { FUNCTION | PROCEDURE | ROUTINE } routine_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { USAGE | ALL [ PRIVILEGES ] }    ON LANGUAGE lang_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }    ON LARGE OBJECT loid [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }    ON SCHEMA schema_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { CREATE | ALL [ PRIVILEGES ] }    ON TABLESPACE tablespace_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]GRANT { USAGE | ALL [ PRIVILEGES ] }    ON TYPE type_name [, ...]    TO role_specification [, ...] [ WITH GRANT OPTION ]其中role_specification能够是:    [ GROUP ] role_name  | PUBLIC  | CURRENT_USER  | SESSION_USERGRANT role_name [, ...] TO role_name [, ...] [ WITH ADMIN OPTION ]

把表 t1 上的插入特权授予给所有用户:

GRANT INSERT ON t1 TO PUBLIC;

更多的参考链接
grant

revoke
REVOKE [ GRANT OPTION FOR ]    { { SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER }    [, ...] | ALL [ PRIVILEGES ] }    ON { [ TABLE ] table_name [, ...]         | ALL TABLES IN SCHEMA schema_name [, ...] }    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }    ON [ TABLE ] table_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { { USAGE | SELECT | UPDATE }    [, ...] | ALL [ PRIVILEGES ] }    ON { SEQUENCE sequence_name [, ...]         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }    ON DATABASE database_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { USAGE | ALL [ PRIVILEGES ] }    ON DOMAIN domain_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { USAGE | ALL [ PRIVILEGES ] }    ON FOREIGN DATA WRAPPER fdw_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { USAGE | ALL [ PRIVILEGES ] }    ON FOREIGN SERVER server_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { EXECUTE | ALL [ PRIVILEGES ] }    ON { { FUNCTION | PROCEDURE | ROUTINE } function_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { USAGE | ALL [ PRIVILEGES ] }    ON LANGUAGE lang_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }    ON LARGE OBJECT loid [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }    ON SCHEMA schema_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { CREATE | ALL [ PRIVILEGES ] }    ON TABLESPACE tablespace_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ GRANT OPTION FOR ]    { USAGE | ALL [ PRIVILEGES ] }    ON TYPE type_name [, ...]    FROM { [ GROUP ] role_name | PUBLIC } [, ...]    [ CASCADE | RESTRICT ]REVOKE [ ADMIN OPTION FOR ]    role_name [, ...] FROM role_name [, ...]    [ CASCADE | RESTRICT ]

从 public 发出表t1上的插入特权:

REVOKE INSERT ON t1 FROM PUBLIC;

更多的参考链接
revoke

其余DDL

truncate

TRUNCATE [ TABLE ] [ ONLY ] name [ * ] [, ... ]    [ RESTART IDENTITY | CONTINUE IDENTITY ] [ CASCADE | RESTRICT ]

更多的参考链接
truncate

根本DML

insert

[ WITH [ RECURSIVE ] with_query [, ...] ]INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]    [ OVERRIDING { SYSTEM | USER } VALUE ]    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }    [ ON CONFLICT [ conflict_target ] conflict_action ]    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]其中 conflict_target 能够是以下之一:    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]    ON CONSTRAINT constraint_name并且 conflict_action 是以下之一:    DO NOTHING    DO UPDATE SET { column_name = { expression | DEFAULT } |                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |                    ( column_name [, ...] ) = ( sub-SELECT )                  } [, ...]              [ WHERE condition ]

向表 t1 中插入数据:

create table t1(id int, a int);insert into t1(id,a) values (1,2);insert into t1(id,a) values (2,4),(3,8),(6,0);

更多的参考链接
insert

update

[ WITH [ RECURSIVE ] with_query [, ...] ]UPDATE [ ONLY ] table_name [ * ] [ [ AS ] alias ]    SET { column_name = { expression | DEFAULT } |          ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |          ( column_name [, ...] ) = ( sub-SELECT )        } [, ...]    [ FROM from_list ]    [ WHERE condition | WHERE CURRENT OF cursor_name ]    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

在 t1 表中 a 列中的 4 改为 3:

UPDATE t1 SET  a='3' WHERE id=2;

更多的参考链接
update

delete

[ WITH [ RECURSIVE ] with_query [, ...] ]DELETE FROM [ ONLY ] table_name [ * ] [ [ AS ] alias ]    [ USING from_item [, ...] ]    [ WHERE condition | WHERE CURRENT OF cursor_name ]    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

在 t1 表中删除 id 等于 6 的行:

DELETE FROM t1 WHERE id=6;

更多的参考链接
delete

select

[ WITH [ RECURSIVE ] with_query [, ...] ]SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]    [ * | expression [ [ AS ] output_name ] [, ...] ]    [ FROM from_item [, ...] ]    [ WHERE condition ]    [ GROUP BY grouping_element [, ...] ]    [ HAVING condition ]    [ WINDOW window_name AS ( window_definition ) [, ...] ]    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]    [ LIMIT { count | ALL } ]    [ OFFSET start [ ROW | ROWS ] ]    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } { ONLY | WITH TIES } ]    [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...] ]其中 from_item 能够是以下之一:    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]                [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]    [ LATERAL ] function_name ( [ argument [, ...] ] )                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )                [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) ]并且 grouping_element 能够是以下之一:    ( )    expression    ( expression [, ...] )    ROLLUP ( { expression | ( expression [, ...] ) } [, ...] )    CUBE ( { expression | ( expression [, ...] ) } [, ...] )    GROUPING SETS ( grouping_element [, ...] )并且 with_query 是:    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )TABLE [ ONLY ] table_name [ * ]

进行查看 t1 表,

select * from t1;

等价于

table t1;

更多的参考链接
select

MySQL 特有的DML

insert ignore

如果数据库没有内容,就插入新的数据,如果有数据的话就跳过这条数据:

drop table if exists t1;create table t1 (a int PRIMARY KEY , b int not null,CONSTRAINT t1_b_key UNIQUE (b));insert ignore into t1(a,b) values (4,4);#反复进行insert,因为曾经存在而进行跳过insert ignore into t1(a,b) values (4,4);
replace into

查看t1外面存在的内容:

insert into t1(a,b) values (2,3);table t1; a | b---+--- 2 | 3 4 | 4(2 rows)

如果存在抵触,则先删除其余抵触的元组,而后再进行插入:

replace into t1 values(1,1),(1,2);table t1; a | b---+--- 1 | 2 2 | 3 4 | 4(3 rows)

Prepared Statement

PostgreSQL语法和示例

(应用client api调用)

PREPARE name [ ( data_type [, ...] ) ] AS statement
MySQL语法和示例

(应用client api调用)

事务处理

KunlunBase反对与MySQL雷同的事务处理性能并且与MySQL连贯协定或者PostgreSQL连贯协定无关,在这两类连贯中具备雷同的行为。

autocommit事务

在一个客户端连贯有一个autocommit变量,默认值是 true,用户能够在连贯中动静批改autocommit 的值。当autocommit为true时,用户发送的任何DML语句都会作为独立的事务来执行,语句执行完结时KunlunBase就主动提交这个事务。

DDL语句永远作为一个独立的事务来执行,无论autocommit是true还是false。 如果执行DDL时本连贯中已有运行中的事务,那么kunlunbase会先提交这个事务,而后再执行这个DDL。

显式事务

咱们把不是autocommit的事务称为显式事务。一个显式事务能够显式或者隐式地启动和提交。

显式地启动和提交/回滚事务
  1. 用户执行begin或者start transaction 显式开启一个事务
  2. 执行若干条DML语句
  3. 执行commit 显式地提交这个事务;或者执行rollback 显式地回滚这个事务
隐式开启和隐式提交事务

如果用户设置autocommit为false,那么之后本客户端连贯 C 中收到的第一条DML语句后,会先主动(隐式)启动一个事务 T,在这个事务T 中执行这条语句。后续在这个连贯 C 中收到更多DML语句时,KunlunBase会持续在事务T 中执行这些语句。

一个显示事务还能够被隐式地提交,当在显式事务T 中 KunlunBase 收到 set autocommit=true 或者任何一个DDL语句时,KunlunBase会(隐式)提交事务T。

如果在显式事务中执行某个DML语句出错,那么事务 T 在Kunlunbase外部会被主动回滚,之后用户在连贯C中 的事务T中 发送任何DML语句都会被KunlunBase疏忽,直到在连贯 C 中收到commit或者rollback后,KunlunBase会回滚事务 T 。

事务处理性能示例

autocommit事务示例

在autocommit开启状态时,用户发送的任何DML语句都会作为独立的事务来执行,语句执行完结时KunlunBase就主动提交这个事务。

set autocommit=true;show autocommit;drop table if exists t1;create table t1(a serial primary key,  b int);insert into t1(b) values(11);insert into t1(b) values(12);insert into t1(b) values(13);insert into t1(b) values(14);drop table if exists t2;

在autocommit敞开状态时,须要进行手动提交或者回滚

set autocommit=off;show autocommit;drop table if exists t1;create table t1(a serial primary key,  b int);insert into t1(b) values(11);

关上另一个会话查看t1表

进行commit提交之后才会胜利过插入

在向事务中插入一条数据,进行手动回滚

DDL语句永远作为一个独立的事务来执行

ddl语句曾经进行提交了,commit提醒没有检测到事务

set autocommit=true;show autocommit;begin;create table t1(a serial primary key,  b int);commit;

将autocommit设置为false,也是一样的

set autocommit=false;show autocommit;begin;drop table t1;commit;

显性事务示例

先敞开隐式提交,在显示中进行事务

show autocommit;set autocommit=off;begin;create table t1(a serial primary key,  b int);insert into t1(b) values(21);insert into t1(b) values(22);insert into t1(b) values(23);insert into t1(b) values(24);    

关上另一个窗口,查看t1表,因为还没进行commit;表t1中是没有数据的

咱们持续进行commit;提交

胜利提交为t1表中插入数据

无论autocommit是true还是false。 如果执行DDL时本连贯中已有运行中的事务,那么kunlunbase会先提交这个事务,而后再执行这个DDL

show autocommit;set autocommit=off;begin;drop table if exists t1;create table t1(a serial primary key,  b int);insert into t1(b) values(21);insert into t1(b) values(22);insert into t1(b) values(23);insert into t1(b) values(24);

t1目前是没有数据的

如果执行ddl语句会提交后面事务

当初曾经提交了事务,再输出commit;也是提醒没有事务须要提交

commit;

在显式事务中执行某个DML语句出错,那么事务 T 在Kunlunbase外部会被主动回滚,之后用户在连贯C中 的事务T中 发送任何DML语句都会被KunlunBase疏忽,直到在连贯 C 中收到commit或者rollback后,KunlunBase会回滚事务 T 。

set autocommit=off;show autocommit;begin;drop table if exists t1;create table t1(a serial primary key,  b int);insert into t1(b) values(11);insert into t1(c) values(12);insert into t1(b) values(13);insert into t1(b) values(14);commit;

所有数据都进行了回滚,t1表中没有任何数据

DDL事务处理和复制

用户在KunlunBase集群的任何一个计算节点执行的DDL,KunlunBase会主动复制到这个集群的所有其余计算节点执行;并且用户在连贯到多个计算节点的多个客户端连贯中能够同时执行DDL语句,KunlunBase会确保一个集群的所有计算节点依照雷同的程序执行用户发送给所有计算节点的DDL语句。因而一个KunlunBase集群的所有计算节点始终领有雷同的元数据从而正确运行。

当用户执行一个DDL语句时,KunlunBase把这个语句作为一个独立的分布式事务来运行。如果这个事务运行过程中计算节点或者存储节点或者元数据节点产生故障或者断电,那么这个分布式事务会主动被回滚,革除所有中间状态,这样用户就能够再次执行这个DDL语句。

用户不须要晓得DDL事务处理的技术细节和原理,只须要晓得在一个KunlunBase集群的任何一个计算节点都能够执行DDL语句和DML语句,就像应用单机数据库一样。

体验DDL复制性能

这里是一个例子帮忙用户体验KunlunBase集群的确具备DDL复制的能力。例如,在任何一个计算节点执行create table 语句,在集群的其余计算节点都能够应用到那个create table语句创立的表。

咱们首先须要筹备一个两个计算节点集群,便于测试.

psql postgres://abc:[email protected]:47001/postgrespsql postgres://abc:[email protected]:47004/postgres
create

在计算节点47001中创立数据库kunlundb;

在计算节点47004中进行查看

接着在47004的节点中创立t1表

create table t1(a int, b int);insert into t1 values (1,2);

在47001计算节点中存在kunlundb数据库中的t1表

insert

持续在47001中减少表t1的数据

insert into t1 values (3,4);insert into t1 values (5,6);

减少的数据在47004中也胜利复制

update

在计算节点47004中进行批改数据

update t1 set a='2' where a=3;update t1 set a='3' where a=5;

在另一个计算节点47001中,数据也是失去了同步

delete

在计算节点47001中进行删除某行数据

delete from t1 where a=1;

在另一个计算节点47004中,数据也是失去了同步

drop

在计算节点47004中进行删除表t1

同样在另一个计算节点47004中也不在表t1

Fullsync 高可用

Fullsync强同步机制

模仿备机故障体验fullsync
  1. 购买测试集群,例如购买一主两备shard,连贯shard主节点能够失常进行创立库表,插入/删除数据等操作。
  2. 将该shard两个备db的mysqld过程进行了。
  3. 连贯shard主节点进行创立库表,插入/删除数据时,都会报错返回
监控和配置Fullsync
  1. 通过XPanel监控和配置

设置fullsync_consistency_level实例变量,具体操作如下图所示。

http://www.kunlunbase.com:818...

获取fullsync_consistency_level实例变量,具体操作如下图所示。

集群展现页面,点击某个存储节点-》进入按钮,还能够查看以后节点的fullsync值。

  1. 通过SQL语句监控和配置

设置 fullsync 配置 sql:

set global fullsync_consistency_level=2

查看 fullsync 设置:

show global variables like 'fullsync_consistency_level'

主备切换

模仿主节点故障
  1. 设置主节点只读,执行该sql,set global read_only=true;
  2. 将主节点mysqld过程kill掉/调用stop脚本进行
    cluster_mgr检测到主节点异样后,触发计时工作,主节点间断故障工夫超过设置值(默认20s),则触发容灾切换。切换后选出新主,能够失常对外提供服务。
主备切换流程记录

查看主备切换流程记录有两个中央:

  1. 元数据集群的kunlun_metadata_db.rbr_consfailover表中。
  2. xpanel中集群治理->集群列表信息->主备切换中。

验证新的主节点工作失常并且没有失落已提交的事务

cluster_mgr 提供验证测试程序,下载地址如下:
https://github.com/zettadb/cl...

弹性扩缩容

XPanel

1. 弹性扩容。

留神:

  • 一个集群中至多存在两个以上shard能力发动扩容。
  • 扩容分为主动扩容和手动扩容。主动扩容与手动扩容的区别:主动扩容用户不须要人为选shard表,由零碎自动检测所需扩容表进行扩容操作;手动扩容就是用户本人选表搬迁来达到扩容目标。

(1)主动扩容。

进到XPanel中,点集群治理-》集群列表-》集群列表信息,进到集群列表页面,找到曾经建好的多shard集群,发动主动扩容。如下图所示。

如果扩容胜利,如图所示:

如果扩容失败,如图所示:

2. 手动扩容。

具体操作,如下图所示。

如果扩容胜利,如图所示:

如果扩容失败,如图所示:

cluster-mgr API

  1. 应用postman创立rbr集群:

集群创立胜利后,连贯计算节点,如:
psql postgres://abc:[email protected]:59701/postgres

建设student表并写入数据。

  1. 减少shard:

3.发动扩容:

扩容胜利后,连贯新增的shard的主,如:

mysql -uclustmgr -pclustmgr_pwd -h192.168.0.129 -P59413use postgres_$$_publicshow tables;(查看是否有student表)

预期是这样:

故障复原机制及验证办法

1. 弹性扩缩容原理介绍

弹性扩容的指标是将业务表,按需从一个 shard 上不停机的迁徙到另一个 shard 上,从而实现数据的横向扩大,其过程有如下几个阶段:数据全量导出、数据全量导入、增量数据恢复、路由切换。

数据全量导出:
该阶段,会将待迁徙表利用 mydumper 工具进行以逻辑导出,并将数据保留在磁盘上。

数据全量导入:
该阶段,会将上一阶段导出的数据,利用 myloader 工具导入到指标分片上。

增量数据恢复:
该阶段,会利用 MySQL 的主备复制机制,依据 mydumper 导出全量镜像时的一致性位点,创立一个只同步待迁徙表的 MySQL 主备同步 channel ,并命名为 expand_id(ID 为全局的工作 id),其中该 channel 的源节点为表的迁出源 shard ,指标实例为扩容的指标 shard 的主节点。待增量数据重放实现后,在不进行整个数据同步通道的前提下,对源 shard 的待迁徙表进行 rename 操作,从而阻断源 shard 对该表的读写。当在指标 shard 的主机上也发现该表曾经被 rename 后,增量数据恢复阶段完结,并对数据同步通道进行清理,进入路由切换阶段。

路由切换阶段:
路由切换阶段,会先通过写入元数据集群的形式,告诉所有的计算节点,该表的路由曾经发生变化。在写入实现后,对指标 shard 上的曾经在上一阶段被 rename 的表进行复原操作,即再次 rename 为原表名。此时源 shard 上的表还是处在 rename 状态,指标 shard 上的表曾经被复原为业务的原表名。

所有的计算几点,如果曾经更新了路由,则如果此时有业务拜访该迁徙表,则会路由到指标 shardd 上。如果该计算节点,在业务申请到来时,还没有更新到最新的路由变动,依然拜访原 shardd 此时因为源 shard 上的表名依然处于 rename 状态,因而拜访会失败,晓得更新到最新的路由后,拜访能力胜利。

2. 故障场景及复原机制

故障复原机制次要须要解决的问题是:当扩容正在进行的过程中产生故障导致扩容流程中断的状况下,如何清理中间状态的数据或者当服务复原后如何持续未实现的扩容工作的问题。

当工作中断在数据全量导出完结后:
此时工作从新发动即可。

当工作中断在数据全量导入之后:
此时工作从新发动即可

当工作中断在增量数据恢复之后
则工作间接回滚,所有的中间状态的数据被清理掉。

集群数据备份和复原

  1. 创立rbr集群:

此集群作为源集群;
集群创立胜利后,连贯计算节点,如:
psql postgres://abc:[email protected]:59701/postgres

建设student表并写入数据

  1. 发动备份操作:(需保障hdfs server已启动)

    hdfs下记录复原的工夫,如:2022-08-23 13:52
  2. 创立另一个集群:
    规格需与步骤1中的集群统一,参考步骤1,作为指标集群。
  3. 发动复原操作:

复原胜利后,链接步骤3中集群的计算节点,如:
psql postgres://abc:[email protected]:59701/postgres
查看student表是否同步到指标集群中。

XPanel 配置和发动备份

  1. cluster_mgr装置实现后,零碎会主动上报备份存储目录,在XPanel的备份存储指标治理页面中能够查看相干备份存储目录列表。如下图所示。

  1. 建好vito集群。

  1. 登录vito集群中的计算节点47001新建一个表,并退出两条数据。

  1. 对vito集群发动全量备份,操作如下图所示。

  1. 到集群备份列表中,查看备份后果信息,如下图所示。

cluster-mgr API 配置和发动备份

XPanel 启动集群复原

  1. 对已备份的vito集群进行复原操作。创立一个新的集群vito3作为复原备份的集群。如下图所示。

  1. vito3的集群计算节点是47003。登录节点查看没有任何表。

  1. 将vito集群之前的备份在vito3集群中进行复原。

  1. 回档胜利后登录vito3集群的计算节点47003查看验证是否存在t2表,下图所示示意复原集群胜利。

其余集群治理性能API和XPanel操作

减少存储集群

  1. 应用XPanel减少存储集群。具体操作如下图所示。

减少、删除计算节点

  1. 应用XPanel减少计算节点。操作如下图所示。

  1. 应用XPanel删除计算节点。如下图所示。

重做备机节点

  1. 应用XPanel重做备机节点。上面是对test1集群中的shard1存储集群的备机节点57001,进行重做备机,具体操作如下图所示。

其余集群治理性能api:
  1. 应用postman创立rbr集群:

返回内容是这样:

下同,jobid会跟着变动。

  1. 减少comps:
  2. 减少shards:
  3. 减少nodes:
  4. 删除comps:
  5. 删除shards:
  6. 删除nodes:
  7. 重做shard备机

    需保障hdfs server是失常运行的,才能够胜利。
  8. 查看clustmgr 解决api的状态:
  9. 删除集群:

查看集群节点日志

前置条件

  1. 目前kunlun集群反对配置ES来收集各模块输入日志,搭建集群时(boostrap)选配是否装置ES,如果抉择装置后,则主动实现对治理模块,计算节点和存储节点日志收集。

kibana中查看日志:
查看filebeat上报的index

配置kibana index pattern

kibana discover页面操作

目前日志依照机器级别上报,机器上可能有治理模块,计算节点或者存储节点。能够在kibana页面配置过滤规定来查看不同模块日志。

  1. 集群没有配置ES,则须要登录各个模块所在集群查看日志。

查看集群节点状态

应用XPanel 查看状态变动记录

  1. XPanel中查看集群节点的状态,如下图所示。

故障报告

找到core文件

须要用户在本人的linux机器上设置:

  1. ulimit -c unlimited
  2. sysctl -w kernel.core_pattern=core.%e.%p.%t 程序运行目录下生成core文件,这个须要root权限

应用带符号的程序文件打印调用栈

如果呈现core文体,应用gdb打印调用栈
gdb -c core文件 bin文件 回车
进入后,输出bt 回车

点击浏览原文

举荐浏览

KunlunBase架构介绍
KunlunBase技术劣势介绍
KunlunBase技术特点介绍

END

昆仑数据库是一个HTAP NewSQL分布式数据库管理系统,能够满足用户对海量关系数据的存储管理和利用的全方位需要。
利用开发者和DBA的应用昆仑数据库的体验与单机MySQL和单机PostgreSQL简直完全相同,因为首先昆仑数据库反对PostgreSQL和MySQL双协定,反对规范SQL:2011的 DML 语法和性能以及PostgreSQL和MySQL对规范 SQL的扩大。同时,昆仑数据库集群反对程度弹性扩容,数据主动拆分,分布式事务处理和分布式查询处理,强壮的容错容灾能力,欠缺直观的监测剖析告警能力,集群数据备份和复原等 罕用的DBA 数据管理和操作。所有这些性能无需任何利用零碎侧的编码工作,也无需DBA人工染指,不停服不影响业务失常运行。
昆仑数据库具备全面的OLAP 数据分析能力,通过了TPC-H和TPC-DS规范测试集,能够实时剖析最新的业务数据,帮忙用户发掘出数据的价值。昆仑数据库反对私有云和公有云环境的部署,能够与docker,k8s等云基础设施无缝合作,能够轻松搭建云数据库服务。
请拜访 http://www.kunlunbase.com/ 获取更多信息并且下载昆仑数据库软件、文档和材料。
KunlunBase我的项目已开源
【GitHub:】
https://github.com/zettadb
【Gitee:】
https://gitee.com/zettadb