Overview

This document guides KunlunBase users through evaluating and verifying KunlunBase's major features. By following it, users can install a KunlunBase cluster and try out its main capabilities; treat it as a quick-start manual for using KunlunBase. For the complete KunlunBase documentation, visit doc.kunlunbase.com
Users can also use this document to verify that each KunlunBase feature works correctly. During development, the KunlunBase team continuously writes test programs for every feature and runs all of them daily in an automated test system, so that problems are found and fixed promptly and every feature keeps working correctly. We also run the MySQL and PostgreSQL test suites against KunlunBase's storage nodes and compute nodes. For the daily results of the automated test system, visit www.kunlunbase.com.
KunlunBase Cluster Architecture

KunlunBase is a distributed relational database management system aimed at TB- and PB-scale data processing. It delivers high throughput and low latency to handle highly concurrent reads and writes over massive data sets. It supports horizontal elastic scale-out, robust ACID transaction guarantees, efficient and easy-to-use distributed query processing, high scalability, high availability, and transparent sharding, with scale-out that is invisible to the application layer and end users. It is a typical NewSQL distributed database system.
These capabilities let users add server nodes at any time, without stopping service, to grow storage and compute capacity as the business grows; and when a server node fails or loses power, no data is lost and read/write service continues, keeping the business running at the highest quality.
Application developers only need to use KunlunBase the same way they would use a single-node relational database to get all of the NewSQL advantages above, with no need to think about any details of data storage and management. Users can connect to a KunlunBase cluster's compute nodes over either the PostgreSQL or the MySQL protocol and execute DDL and DML SQL statements. KunlunBase supports standard SQL, PostgreSQL's DDL syntax, and the vendor-specific DML syntax of both PostgreSQL and MySQL, so applications originally written for PostgreSQL or MySQL can use KunlunBase without modification.
KunlunBase supports all SQL data-analysis features and can execute every OLAP query in TPC-H and TPC-DS. Users can therefore continuously stream data from their business systems into a KunlunBase cluster and run OLAP analysis and data mining over this continuously updated data, quickly spotting changes and trends, seizing fleeting business opportunities, and responding to incidents in time, gaining a significant competitive advantage.
Even users with only a few GB of data can benefit from KunlunBase. KunlunBase's fullsync replication is more reliable than MySQL's built-in semisync (semi-synchronous) replication and group replication: with those two technologies, user data can be lost or become inconsistent if the primary and standby nodes lose power at the same time, while fullsync guarantees no user data is lost in that scenario. In this case, deploying a single storage shard (for example one primary and two standbys) is enough. As data volume and workload grow, more storage shards and compute nodes can be added on demand at any time to expand storage and query capacity.
Users can also scale KunlunBase's processing capacity with read/write separation. By querying standby nodes through read/write separation and dedicating specific compute nodes to OLAP queries, OLAP workloads can be kept from affecting OLTP performance at all.
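The read/write split described above can be sketched as a tiny routing helper. This is an illustration only; the `route` function and the endpoint names are hypothetical, not a KunlunBase API:

```python
# Minimal illustration (not KunlunBase API) of statement routing for
# read/write separation: read-only statements can go to a standby-backed
# compute node, everything else goes to the primary.
READ_PREFIXES = ("select", "show", "with")

def route(statement: str) -> str:
    """Return which endpoint a statement should be sent to."""
    head = statement.lstrip().split(None, 1)[0].lower() if statement.strip() else ""
    return "replica" if head in READ_PREFIXES else "primary"

print(route("SELECT * FROM t1"))            # replica
print(route("insert into t1 values (1)"))   # primary
```

A real deployment would route by connection (OLTP sessions to the primary's compute nodes, OLAP sessions to dedicated compute nodes) rather than per statement, but the idea is the same.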
Using KunlunBase

Initializing Servers for KunlunBase (Bootstrap)

A KunlunBase cluster can only be installed on servers that have been initialized (bootstrapped). Once server initialization is complete, users can install a KunlunBase cluster with the XPanel cluster management tool.
This section describes how to initialize the servers.

Initializing servers with the script

- Download the script
git clone https://gitee.com/zettadb/clo…
cd cloudnative/cluster
- Fill in the configuration file; see http://www.kunlunbase.com:818… for a description of each parameter
- After the configuration file is filled in, start initializing the servers:
python setup_cluster_manager.py --config=cluster_and_node_mgr.json --action=install
bash -e clustermgr/install.sh
Creating a KunlunBase Cluster

All of the following operations must be performed after the servers have been initialized.

XPanel

- Open the XPanel web UI: http://host:17000/KunlunXPanel
a. host is the IP of the server where the Docker image was pulled and run; you can find it with ifconfig
b. 17000 in this example is the port mapped when the Docker image was started
- On first login, both the account and the password are super_dba; for the metadata node, use the primary node's IP and port
- You will then be required to change the password; after changing it, you enter the XPanel main interface
- Add servers to the computer list
a. Check whether the servers to add are already in the list; if not, perform this step, otherwise skip it
b. Currently servers can only be added one at a time, and each server in the list must have node_mgr installed and correctly configured
c. System Management → User Management → Add → fill in the parameters → Confirm
- Once the servers have been added, you can create a cluster
a. Cluster Management → Cluster List → Add
b. Fill in the parameter values as needed
c. After the cluster is installed, you can view each node and the primary/standby relationships on the cluster display page:
Cluster Management → Cluster List → click the cluster
Reading and Writing Data with KunlunBase

Preparing to Use KunlunBase from Each Programming Language

KunlunBase currently supports both the PostgreSQL and the MySQL protocol.
To find a compute node's MySQL protocol port, connect to the compute node's PostgreSQL port and run show mysql_port;
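As a sketch of that lookup, assuming a DB-API cursor such as one obtained from psycopg2 (the `FakeCursor` stub and the port value 5306 are made up so the snippet runs without a live cluster):

```python
# Illustrative sketch: fetch the compute node's MySQL-protocol port over an
# existing PostgreSQL-protocol connection. `cur` can be any DB-API cursor,
# e.g. psycopg2.connect(...).cursor().
def get_mysql_port(cur):
    cur.execute("show mysql_port;")
    row = cur.fetchone()
    return int(row[0])

# Stand-in cursor so the sketch can run without a live cluster;
# a real cluster would return its actual configured port.
class FakeCursor:
    def execute(self, sql):
        assert sql == "show mysql_port;"
    def fetchone(self):
        return ("5306",)

print(get_mysql_port(FakeCursor()))  # 5306
```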
Connecting from the command line

- To connect over the PostgreSQL protocol, use either TCP or a URL:
  - TCP: psql -h host -p port -U username -d databaseName
  - URL: psql postgres://userName:password@host:port/databaseName
- To connect over the MySQL protocol, use either TCP or a socket:
  - TCP: mysql -h host -P port -ppassword -u userName
  - Note: there must be no space between -p and the password
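The connection forms above can also be assembled programmatically; a minimal sketch, where all user/host/port values are placeholders:

```python
# Build the two connection forms shown above. Values passed in the example
# calls below are placeholders, not defaults of psql or mysql.
def pg_url(user, password, host, port, db):
    """URL form accepted by psql: postgres://user:password@host:port/db"""
    return f"postgres://{user}:{password}@{host}:{port}/{db}"

def mysql_cli(user, password, host, port):
    """mysql CLI invocation; note: no space between -p and the password."""
    return f"mysql -h {host} -P {port} -p{password} -u {user}"

print(pg_url("abc", "abc", "127.0.0.1", 5401, "postgres"))
print(mysql_cli("abc", "abc", "127.0.0.1", 5306))
```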
Downloading connectors for each language

- Go and Rust can fetch the required dependencies directly through their built-in package managers; no extra installation is needed
  - Go 1.17.5 or later is recommended
  - Rust 1.59.0 or later is recommended
java

After choosing the jar version in the Maven repository, click "jar" to download the corresponding jar package.
- mysql connector/j
  - https://dev.mysql.com/downloa… MySQL official download
  - https://mvnrepository.com/art… Maven repository
- PostgreSQL JDBC Driver
  - https://mvnrepository.com/art… Maven repository
- mariadb connector/j – 3.x
  - https://mariadb.org/connector… MariaDB official site (only the latest 3.x version is available there)
  - https://mvnrepository.com/art… Maven repository
- mariadb connector/j – 2.x
  - http://zettatech.tpddns.cn:14… ZettaTech official site
  - The official 2.0 connector has some bugs when connecting to compute nodes, so using MariaDB's official 2.x connector directly is not recommended
python

- Psycopg2
  - pip install psycopg2 (install via pip)
  - pip install psycopg2==$version (install a specific version via pip)
  - https://github.com/psycopg/ps… (build from source with setup.py)
- mysql-connector/python
  - https://downloads.mysql.com/a… MySQL official site
  - pip install mysql-connector-python (install the 8.x mysql connector/python via pip)
  - https://github.com/mysql/mysq… (build from source with setup.py)
- pymysql
  - pip install pymysql (install via pip)
  - https://github.com/PyMySQL/Py… (build from source with setup.py)
nodejs

- pg
  - npm install pg (install the pg connector with npm)
- mysql
  - npm install mysql (install the mysql connector with npm)
  - https://downloads.mysql.com/a… MySQL official download
php

- mysql
  - https://downloads.mysql.com/a… MySQL official download
c

- postgresql
  - sudo apt-get install libpq++-dev -y (install the PostgreSQL dependency on Ubuntu)
- mysql
  - sudo apt-get install libmysql++-dev -y (install the MySQL dependency on Ubuntu)
c#

- postgresql
  - dotnet add package Npgsql
- mysql
  - dotnet add package MySql.Data
c++

- postgresql
sudo apt install libpq-dev
git clone https://github.com/jtv/libpqxx.git
cd libpqxx
./configure
make
sudo make install
- mysql
  - sudo apt-get install libmysqlclient-dev (install the dependency for connecting to MySQL from C++)
  - https://downloads.mysql.com/a… MySQL official connector download
Executing Insert/Delete/Update/Select DML Statements against KunlunBase from Each Language

All of the examples below can be downloaded from https://gitee.com/zettadb/clo…
- PostgreSQL examples are in the cloudnative/smoke directory
- MySQL examples are in the cloudnative/smoke/somkeTest-mysql directory
java
- Postgresql-JDBC-Driver
package kunlun.test;
/*
* Copyright (c) 2019 ZettaDB inc. All rights reserved.
* This source code is licensed under Apache 2.0 License,
* combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
*/
import java.io.BufferedReader;
import java.io.FileReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
public class SmokeTest {
static {
try {
    Class.forName("org.postgresql.Driver");
    //Class.forName("com.mysql.cj.jdbc.Driver");
} catch (Exception ex) {}
}
public static Connection getConnection(String user,
String password,
String host,
int port,
String dbname) {
String proto = "postgresql";
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);
String url = "jdbc:" + proto+"://" + host + ":" + port + "/" + dbname;
try {
    return DriverManager.getConnection(url, props);
} catch (Exception ex) {
    ex.printStackTrace();
    return null;
}
}
public static void smokeTest(Connection conn) throws Exception {
boolean autocommit = conn.getAutoCommit();
System.out.println("default autocommit:" + autocommit);
conn.setAutoCommit(true);
Statement st =conn.createStatement();
st.execute("SET client_min_messages TO'warning';");
st.execute("drop table if exists t1;");
st.execute("RESET client_min_messages;");
String createSql = "create table t1(id integer primary key," +
"info text, wt integer);";
st.execute(createSql);
st.execute("insert into t1(id,info,wt) values(1,'record1', 1);");
st.execute("insert into t1(id,info,wt) values(2,'record2', 2);");
st.execute("update t1 set wt = 12 where id = 1;");
ResultSet res1 = st.executeQuery("select * from t1;");
System.out.printf("res1:%s%n", showResults(res1).toString());
res1.close();
st.close();
String pstr = "select * from t1 where id=?";
PreparedStatement ps = conn.prepareStatement(pstr);
ps.setInt(1, 1);
ResultSet pres = ps.executeQuery();
System.out.printf("pres1:%s%n", showResults(pres).toString());
ps.setInt(1, 2);
pres = ps.executeQuery();
System.out.printf("pres2:%s%n", showResults(pres).toString());
ps.close();
pstr = "update t1 set info=? , wt=? where id=?";
ps = conn.prepareStatement(pstr);
ps.setString(1, "Rec1");
ps.setInt(2, 2);
ps.setInt(3, 1);
ps.execute();
ps.setString(1, "Rec2");
ps.setInt(2, 3);
ps.setInt(3, 2);
ps.execute();
ps.close();
st =conn.createStatement();
ResultSet res2 = st.executeQuery("select * from t1;");
System.out.printf("res2:%s%n", showResults(res2).toString());
res2.close();
st.execute("delete from t1 where id = 1;");
ResultSet res3 = st.executeQuery("select * from t1;");
System.out.printf("res3:%s%n", showResults(res3).toString());
res3.close();
st.execute("drop table t1;");
st.close();
conn.setAutoCommit(autocommit);
}
/*
* We do the following actions:
 * 1 Create the table
* 2 Insert two records
* 3 Update the first record.
* 4 Query the records(res1).
* 5 Delete the second record.
* 6 Query the records again(res2).
* 7 Drop the table.
*/
public static void smokeTestFile(Connection conn, String cmdfile) throws Exception {
boolean autocommit = conn.getAutoCommit();
System.out.println("default autocommit:" + autocommit);
conn.setAutoCommit(true);
Statement st =conn.createStatement();
BufferedReader br = new BufferedReader(new FileReader(cmdfile));
String cmd = null;
do {
    cmd = br.readLine();
    if (cmd == null) { break; }
    if (cmd.toUpperCase().startsWith("SELECT")) {
        ResultSet res = st.executeQuery(cmd);
        System.out.printf("sql:%s, res:%s%n", cmd, showResults(res).toString());
        res.close();
    } else {
        st.execute(cmd);
    }
} while (cmd != null);
br.close();
st.close();
conn.setAutoCommit(autocommit);
}
private static List<List<String>> showResults(ResultSet res) throws Exception {
    LinkedList<List<String>> results = new LinkedList<>();
    int cols = res.getMetaData().getColumnCount();
    while (res.next()) {
        List<String> row = new ArrayList<>(cols);
        for (int i = 0; i < cols; i++) {
            row.add(res.getString(i + 1));
        }
        results.addLast(row);
    }
    return results;
}
public static void test1(String[] args) throws Exception {
    String host = args[0];
    int port = Integer.valueOf(args[1]);
    String user = "abc";
    String password = "abc";
    String database = "postgres";
    Connection conn = getConnection(user, password, host, port, database);
    smokeTest(conn);
    conn.close();
}
public static void main(String[] args) throws Exception {
    test1(args);
}
}
- mysql connector/j
  - Compile: javac mysql.java
  - Run: java -cp .:./mysql-connector-java-$version.jar mysql
import java.sql.DriverManager;
import java.util.ArrayList;
import java.sql.*;
import java.util.*;
public class mysql {
// JDBC driver for MySQL versions below 8.0
static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
// JDBC driver for MySQL 8.0 and above
//static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
// Database user name and password; change these to match your own setup
static final String USER = "user";
static final String PASS = "pwd";
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;
// Build the connection URL from the command-line arguments
// (expects a single host:port argument)
String host = Arrays.toString(args);
String urls1 = "jdbc:mysql://" + host + "/postgres";
String urls2 = urls1.replace("[","");
String urls = urls2.replace("]","");
try{
// Register the JDBC driver
Class.forName(JDBC_DRIVER);
// Open the connection
System.out.println("Connecting to database " + host + "...");
conn = DriverManager.getConnection(urls,USER,PASS);
// Execute the statements
System.out.println("Creating Statement object...");
stmt = conn.createStatement();
ArrayList sqls = new ArrayList();
sqls.add("drop table if exists myjdbc_sm;");
sqls.add("create table myjdbc_sm(a int primary key, b text);");
sqls.add("insert into myjdbc_sm values(1,'abc'),(2,'bcd'),(3,'cde');");
sqls.add("select * from myjdbc_sm;");
sqls.add("update myjdbc_sm set b ='def'where a = 1;");
sqls.add("select * from myjdbc_sm;");
sqls.add("delete from myjdbc_sm where a = 3;");
sqls.add("select * from myjdbc_sm;");
for (int i = 0; i <= 7; ++i) {
    String sql = (String) sqls.get(i);
    System.out.println(sql);
    // compare strings with equals(), not ==
    if (sql.equals("select * from myjdbc_sm;")) {
        ResultSet rs = stmt.executeQuery(sql);
        while (rs.next()) {
            int a = rs.getInt("a");
            String b = rs.getString("b");
            System.out.print("a:" + a + " b:" + b + "\n");
        }
        rs.close();
    } else {
        stmt.execute(sql);
    }
}
stmt.close();
conn.close();
} catch (SQLException se) {
    // Handle JDBC errors
    se.printStackTrace();
} catch (Exception e) {
    // Handle Class.forName errors
    e.printStackTrace();
} finally {
    // Release resources
    try { if (stmt != null) stmt.close(); } catch (SQLException se2) { } // do nothing
    try { if (conn != null) conn.close(); } catch (SQLException se) { se.printStackTrace(); }
}
System.out.println("Goodbye!");
}
}
python
- Psycopg2
import psycopg2
import sys
def test(hoststr, portstr):
    intport = int(portstr)
    conn = psycopg2.connect(host=hoststr, port=intport, user='abc', password='abc', database='postgres')
    conn.autocommit = True
    cur = conn.cursor()
    sqls = ["SET client_min_messages TO 'warning';",
            "drop table if exists t1111",
            "RESET client_min_messages;",
            "create table t1111(id int primary key, info text, wt int)",
            "insert into t1111(id,info,wt) values(1,'record1', 1)",
            "insert into t1111(id,info,wt) values(2,'record2', 2)",
            "update t1111 set wt = 12 where id = 1", "select * from t1111",
            "delete from t1111 where id = 1", "select * from t1111",
            "prepare q1(int) as select * from t1111 where id=$1",
            "begin",
            "execute q1(1)",
            "execute q1(2)",
            "prepare q2(text,int, int) as update t1111 set info=$1 , wt=$2 where id=$3",
            "execute q2('Rec1',2,1)",
            "commit",
            "execute q2('Rec2',3,2)",
            "drop table t1111"]
    for sql in sqls:
        res = cur.execute(sql + ";")
        print("command:%s, res:%s" % (sql, str(res)))

if __name__ == '__main__':
    test(sys.argv[1], sys.argv[2])
- mysql connector/python
import argparse
from mysql import connector

def test(sql, host, port, user, pwd, db):
    conn = connector.connect(buffered=True, host=host, port=port, user=user, passwd=pwd, database=db, ssl_disabled=True)
    cur = conn.cursor()
    print(sql)
    if sql == 'select * from mysql_connector_python;':
        cur.execute(sql)
        rs = cur.fetchall()
        srs = str(rs)
        srs = srs.replace('[(', '')
        srs = srs.replace(')]', '')
        srs = srs.replace('), (', '\n------\n')
        srs = srs.replace(',', '|')
        srs = srs.replace('\'', '')
        print('--------\na | b\n------\n' + srs + '\n--------')
    else:
        cur.execute(sql)
        conn.commit()
    cur.close()
    conn.close()

def execSql(host, port, user, pwd, db):
    sqls = ['drop table if exists mysql_connector_python;',
            'create table mysql_connector_python(a int primary key, b text);',
            "insert into mysql_connector_python values(1,'a'),(2,'b'),(3,'c');",
            'select * from mysql_connector_python;',
            "update mysql_connector_python set b ='abc' where a = 3;",
            'select * from mysql_connector_python;',
            'delete from mysql_connector_python where a = 3;',
            'select * from mysql_connector_python;']
    for sql in sqls:
        test(sql, host, port, user, pwd, db)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='smoke test over the MySQL protocol')
    parser.add_argument('--host', help='host')
    parser.add_argument('--port', default=3306, help='port')
    parser.add_argument('--db', default='mysql', help='database name')
    parser.add_argument('--pwd', default='root', help='password')
    parser.add_argument('--user', default='root', help='user name')
    args = parser.parse_args()
    print(args.host, str(args.port))
    execSql(args.host, args.port, args.user, args.pwd, args.db)
nodejs
- postgresql
const {findSourceMap} = require('module');
const {CLIENT_RENEG_LIMIT} = require('tls');
const pg=require('./node_modules/pg');
var arguments = process.argv.splice(2);
console.log('host:', arguments[0], 'port:', arguments[1]);
var conString = ('postgres://abc:abc@'+arguments[0]+':'+arguments[1]+'/postgres');
var client = new pg.Client(conString);
client.connect(function(err){
    if (err) { return console.error('database connection error', err); }
    console.log("")
console.log("=========== JS Driver ==============");
client.query('drop table if exists smoketesttable_js;',function(err,data){if(err){return console.error('step 1 : droped table failed!',err);
}else{console.log('step 1 : drop table success!')
}
})
client.query('drop table if exists smoketesttable_js1;'); // run the drop a second time: if the first call fails, only a "failed!" message is printed with no details, so this run surfaces the actual error message
client.query('create table smoketesttable_js(id int primary key,name text,gender text);',function(err,data){if(err){return console.error('step 2 : create failed!',err);
}else{console.log('step 2 : create table success!')
}
})
client.query('create table smoketesttable_js1(id int primary key,name text,gender text);')
client.query("insert into smoketesttable_js values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');",function(err,data){if(err){return console.error('step 3 : insert failed!',err);
}else{console.log('step 3 : insert data success!')
}
})
client.query("insert into smoketesttable_js1 values(1,'name1','male'),(2,'name2','female'),(3,'name3','male');")
client.query("delete from smoketesttable_js where id = 1;",function(err){if(err){return console.error('step 4 : delete failed!')
}else{console.log("step 4 : delete data success!")
}
})
client.query("delete from smoketesttable_js1 where id = 1;")
client.query("update smoketesttable_js set gender ='male'where id = 2;",function(err){if(err){return console.error("step 5 : update failed!")
}else{console.log('step 5 : update gender success!')
}
})
client.query("update smoketesttable_js1 set gender ='male'where id = 2;")
client.query("select * from smoketesttable_js;",function(err){if(err){return console.error("select failed!")
client.query("step 6 : select * from smoktesttable_js;")
}else{console.log('step 6 : select table success!')
}
})
client.query("select * from smoketesttable_js1;")
client.query("commit",function(err){if(err){return console.error("select failed!")
}else{console.log('step 6 : commit success!')
}
client.end();
console.log("====================================");
console.log("")
})
})
- mysql
// Use process to read the command-line arguments
var arguments = process.argv.splice(2);
var hosts = arguments[0];
var ports = arguments[1];
var mysql = require('mysql');
var connection = mysql.createConnection({
host : hosts,
user : 'abc',
password : 'abc',
port: ports,
database: 'postgres'
});
//console.log(connection)
connection.connect();
var sql1 = 'drop table if exists myjs_sm';
var sql2 = 'create table if not exists myjs_sm(a int primary key,b text);'
var sql3 = 'insert into myjs_sm values(1,"abc")';
var sqls = 'select * from myjs_sm';
var sql4 = 'update myjs_sm set b ="asd"where a = 1';
var sql5 = 'delete from myjs_sm where a = 1';
var execsql = function(arg1){
    // run the statement and print the result
    connection.query(arg1, function (err, result) {
        if (err) {
            console.log(err.message);
            return;
        }
        console.log('-----------------------["' + arg1 + '"]');
        console.log(result);
        console.log('------------------------------------------------------------\n\n');
    });
};
execsql(sql1);
execsql(sql2);
execsql(sql3);
execsql(sqls);
execsql(sql4);
execsql(sqls);
execsql(sql5);
connection.end();
php

- postgresql
  - Run: php smoketest-pg.php host port
<?php
/*
* Copyright (c) 2019 ZettaDB inc. All rights reserved.
* This source code is licensed under Apache 2.0 License,
* combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
*/
function showresults($ret) {
echo "=== select results ===\n";
while ($row = pg_fetch_row($ret)) {
$str="";
foreach ($row as $item){$str .= $item." ";}
echo "row: $str\n";
}
}
function checkret($db, $cmd, $ret) {
    if (!$ret) {
        echo $cmd.pg_last_error($db)."\n";
        exit(1);
    } else {
        echo $cmd.":success\n";
    }
}
$host = "host=$argv[1]";
$port = "port=$argv[2]";
$dbname = "dbname=postgres";
$credentials = "user=abc password=abc";
$connstr="$host $port $dbname $credentials";
echo "conn string: $connstr\n";
$db = pg_connect($connstr);
if(!$db){
    echo "Error : Unable to open database\n";
    exit(1);
} else {
    echo "Opened database successfully\n";
}
$sql = "SET client_min_messages TO'warning';";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "drop table if exists t1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "RESET client_min_messages;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "create table t1(id integer primary key, info text, wt integer);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "insert into t1(id,info,wt) values(1,'record1', 1);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "insert into t1(id,info,wt) values(2,'record2', 2);";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "update t1 set wt = 12 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "select info from t1;";
$ret = pg_query($db, $sql);
if ($ret) {showresults($ret);
}
$sql = 'select * from t1 where id=$1';
$ret = pg_prepare($db, "prep", $sql);
checkret($db, "prep:".$sql, $ret);
$ret = pg_execute($db, "prep", array(1));
showresults($ret);
$ret = pg_execute($db, "prep", array(2));
showresults($ret);
$sql = 'update t1 set info=$1 , wt=$2 where id=$3';
$ret = pg_prepare($db, "prep2", $sql);
checkret($db, "prep:".$sql, $ret);
$ret = pg_execute($db, "prep2", array('Rec1', 2, 1));
checkret($db, "prep2:{Rec1,2,1}", $ret);
$ret = pg_execute($db, "prep2", array('Rec2', 3, 2));
checkret($db, "prep2:{Rec2,3,2}", $ret);
$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {showresults($ret);
}
$sql = "delete from t1 where id = 1;";
$ret = pg_query($db, $sql);
checkret($db, $sql, $ret);
$sql = "select * from t1;";
$ret = pg_query($db, $sql);
if ($ret) {showresults($ret);
}
pg_close($db)
?>
- mysql
  - Run: php smoketest-my.php host port
<?php
$host = "$argv[1]";
$port = "$argv[2]";
$dbname = "postgres";
$user = "abc";
$pwd = "abc";
$conn = mysqli_connect($host, $user, $pwd, $dbname, $port) or die("Database connection error!");
$sql1 = "drop table if exists myphp_sm \n";
$sql2 = "create table if not exists myphp_sm(a int primary key, b text)\n";
$sql3 = "insert into myphp_sm values (1,'abc')\n";
$sql4 = "select * from myphp_sm\n";
$sql5 = "update myphp_sm set b ='asd'where a = 1\n";
$sql6 = "select * from myphp_sm\n";
$sql7 = "delete from myphp_sm where a = 1\n";
$rs = mysqli_query($conn, $sql1);
echo "$sql1 success\n";
$rs = mysqli_query($conn, $sql2);
echo "$sql2 success\n";
$rs = mysqli_query($conn, $sql3);
echo "$sql3 success\n";
$rs = mysqli_query($conn, $sql4);
echo "$sql4 success\n";
$row = mysqli_fetch_array($rs);
var_dump($row);
$rs = mysqli_query($conn, $sql5);
echo "$sql5 success\n";
$rs = mysqli_query($conn, $sql6);
echo "$sql6 success\n";
$row = mysqli_fetch_array($rs);
var_dump($row);
$rs = mysqli_query($conn, $sql7);
echo "$sql7 success\n";
?>
go

- mysql
  - Environment setup and build:
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.cn,direct
go mod init smoketest_my # initialize the module
go mod tidy # the Go package manager resolves the dependencies automatically
go build # build
  - Run: ./smoketest_my -h host -p port
package main
import (
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"flag"
)
func checkError(err error) {
if err != nil {panic(err)
}
}
func main() {
var User string
var Pwd string
var Host string
var Port int
var Dbname string
flag.StringVar(&Host,"h","","defaults to empty")
flag.IntVar(&Port,"p",5001,"defaults to 5001")
flag.StringVar(&Pwd,"pwd","abc","defaults to abc")
flag.StringVar(&Dbname,"d","postgres","defaults to postgres")
flag.StringVar(&User,"u","abc","defaults to abc")
flag.Parse()
fmt.Println("============= Golang-mysql ============")
// Initialize connection string.
var connectionString string = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8", User, Pwd, Host, Port, Dbname)
// Initialize connection object.
db, err := sqlx.Open("mysql", connectionString)
checkError(err)
err = db.Ping()
checkError(err)
fmt.Println("Successfully connection to database!!!")
// Drop table
_, err = db.Exec("drop table if exists mygo_sm;")
checkError(err)
fmt.Println("Successfully drop table")
// Create table.
_, err = db.Exec("create table mygo_sm(id int primary key,name text,gender text);")
checkError(err)
fmt.Println("Successfully create table")
// Insert
// sql_statement := "insert into mygo_sm values ($1, $2, $3);"
_, err = db.Exec("insert into mygo_sm values( 1,'banana','male')")
checkError(err)
_, err = db.Exec("insert into mygo_sm values(2,'orange','female')")
checkError(err)
_, err = db.Exec("insert into mygo_sm values(3,'apple','male')")
checkError(err)
fmt.Println("Successfully insert table")
_, err = db.Exec("delete from mygo_sm where id = 2")
checkError(err)
fmt.Println("Successfully delete table")
_, err = db.Exec("update mygo_sm set name ='update'where id = 3")
checkError(err)
fmt.Println("Successfully update table")
_, err = db.Exec("select * from mygo_sm")
checkError(err)
fmt.Println("Successfully select table")
fmt.Println("=================================")
}
- postgresql
  - Environment setup and build: same as the previous section
  - Run: ./smoketest -h host -p port
package main
import (
"database/sql"
"fmt"
"flag"
_ "github.com/lib/pq"
)
const (
// Initialize connection constants.
//HOST = "mydemoserver.postgres.database.azure.com"
//DATABASE = "postgres"
//USER = "abc"
//PASSWORD = "abc"
)
func checkError(err error) {
if err != nil {panic(err)
}
}
func main() {
var User string
var Pwd string
var Host string
var Port int
var Db string
flag.StringVar(&Host,"h","","defaults to empty")
flag.IntVar(&Port,"p",5001,"defaults to 5001")
flag.StringVar(&Pwd,"pwd","abc","defaults to abc")
flag.StringVar(&Db,"d","postgres","defaults to postgres")
flag.StringVar(&User,"u","abc","defaults to abc")
flag.Parse()
fmt.Println("============= Golang ============")
// Initialize connection string.
var connectionString string = fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", Host, Port,User,Pwd,Db)
// Initialize connection object.
db, err := sql.Open("postgres", connectionString)
checkError(err)
err = db.Ping()
checkError(err)
fmt.Println("Successfully connection to database!!!")
// Drop previous table of same name if one exists.
_, err = db.Exec("drop table if exists SmokeTestTable_go;")
checkError(err)
fmt.Println("Successfully drop table")
// Create table.
_, err = db.Exec("create table SmokeTestTable_go(id int primary key,name text,gender text);")
checkError(err)
fmt.Println("Successfully create table")
// Insert some data into table.
sql_statement := "insert into SmokeTestTable_go values ($1, $2, $3);"
_, err = db.Exec(sql_statement, 1, "banana", "male")
checkError(err)
_, err = db.Exec(sql_statement, 2, "orange", "female")
checkError(err)
_, err = db.Exec(sql_statement, 3, "apple", "male")
checkError(err)
fmt.Println("Successfully insert table")
_, err = db.Exec("delete from Smoketesttable_go where id = 2")
checkError(err)
fmt.Println("Successfully delete table")
_, err = db.Exec("update SmokeTestTable_go set name ='update'where id = 3")
checkError(err)
fmt.Println("Successfully update table")
_, err = db.Exec("select * from SmokeTestTable_go")
checkError(err)
fmt.Println("Successfully select table")
fmt.Println("=================================")
}
c

- postgresql
  - Compile: gcc -o smokeTest smokeTest.c -I/path/postgresql-11.5-rel/include -L/path/postgresql-11.5-rel/lib -lpq
  - Run: ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=user password=pwd"
/*
* Copyright (c) 2019 ZettaDB inc. All rights reserved.
* This source code is licensed under Apache 2.0 License,
* combined with Common Clause Condition 1.0, as detailed in the NOTICE file.
*
* gcc -o smokeTest smokeTest.c -I/path/postgresql-11.5-rel/include -L/path/postgresql-11.5-rel/lib -lpq
* ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
*
* Test the C version of libpq, the PostgreSQL frontend library.
*/
#include <stdio.h>
#include <stdlib.h>
#include "libpq-fe.h"
static void
exit_nicely(PGconn *conn)
{PQfinish(conn);
exit(1);
}
int
main(int argc, char **argv)
{
const char *conninfo;
PGconn *conn;
PGresult *res;
int nFields;
int i,
j;
/*
* If the user supplies a parameter on the command line, use it as the
* conninfo string; otherwise default to setting dbname=postgres and using
* environment variables or defaults for all other connection parameters.
*/
if (argc > 1)
conninfo = argv[1];
else
conninfo = "dbname = postgres host=192.168.0.104 port=6401 user=abc password=abc";
/* Make a connection to the database */
conn = PQconnectdb(conninfo);
/* Check to see that the backend connection was successfully made */
if (PQstatus(conn) != CONNECTION_OK)
{
fprintf(stderr, "Connection to database failed: %s",
PQerrorMessage(conn));
exit_nicely(conn);
}
/* Suppress notices while dropping the table if it exists. */
res = PQexec(conn, "SET client_min_messages TO'warning';");
PQclear(res);
res = PQexec(conn, "drop table if exists t1");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "drop table ok\n");
}
PQclear(res);
res = PQexec(conn, "RESET client_min_messages;");
PQclear(res);
/* Create the test table. */
res = PQexec(conn, "create table t1(id integer primary key,info text, wt integer)");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "create table command failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "create table ok\n");
}
PQclear(res);
/* Insert two rows into t1. */
res = PQexec(conn, "insert into t1(id,info,wt) values(1,'record1', 1)");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "insert record 1 failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "insert record 1 ok\n");
}
PQclear(res);
res = PQexec(conn, "insert into t1(id,info,wt) values(2,'record2', 2)");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "insert record 2 failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "insert record 2 ok\n");
}
PQclear(res);
res = PQexec(conn, "update t1 set wt = 12 where id = 1");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "update record failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "update record ok\n");
}
PQclear(res);
res = PQexec(conn, "select * from t1");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{fprintf(stderr, "select failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {int nFields = PQnfields(res);
for(i=0;i<nFields;i++)
{fprintf(stderr, "%s\t\t",PQfname(res,i));
}
fprintf(stderr, "\n");
for(i=0;i<PQntuples(res);i++)
{for(j=0;j<nFields;j++)
{fprintf(stderr, "%s\t\t",PQgetvalue(res,i,j));
}
fprintf(stderr, "\n");
}
}
PQclear(res);
res = PQexec(conn, "delete from t1 where id = 1");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "delete record failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "delete record ok\n");
}
PQclear(res);
res = PQexec(conn, "drop table t1");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{fprintf(stderr, "drop table failed: %s", PQerrorMessage(conn));
PQclear(res);
exit_nicely(conn);
} else {fprintf(stderr, "drop table ok\n");
}
PQclear(res);
/* close the connection to the database and cleanup */
PQfinish(conn);
return 0;
}
- mysql
  - Compile: gcc -I/usr/include/mysql -L/usr/lib/mysql mysql.c -lmysqlclient -o mysql
  - Run: ./mysql host port
#include <stdio.h>
#include <stdlib.h>
#include <mysql.h>

/* Print the whole myc_sm table. */
static void select_all(MYSQL *my)
{
    printf("\n\nselect * from myc_sm;\n");
    mysql_query(my, "select * from myc_sm;");
    MYSQL_RES *a = mysql_store_result(my);
    int rows = mysql_num_rows(a);
    int cols = mysql_num_fields(a);
    printf("rows: %d, cols: %d\n", rows, cols);
    MYSQL_FIELD *field = mysql_fetch_fields(a);
    for (int i = 0; i < cols; i++)
        printf("%-10s\t", field[i].name);
    puts("");
    MYSQL_ROW line;
    for (int i = 0; i < rows; i++) {
        line = mysql_fetch_row(a);
        for (int j = 0; j < cols; j++)
            printf("%-10s\t", line[j]);
        puts("");
    }
    mysql_free_result(a);
}

int main(int argc, char **argv)
{
    printf("connect to %s:%s\n", argv[1], argv[2]);
    printf("version: %s\n", mysql_get_client_info());
    MYSQL *my = mysql_init(NULL);
    int port = atoi(argv[2]);
    if (!mysql_real_connect(my, argv[1], "abc", "abc", "postgres", port, NULL, 0)) {
        printf("connect error!\n");
        mysql_close(my);
        return 1;
    }
    printf("drop table if exists myc_sm;\n");
    mysql_query(my, "drop table if exists myc_sm;");
    printf("create table myc_sm;\n");
    mysql_query(my, "create table myc_sm(a int primary key, b text);");
    printf("insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')\n");
    mysql_query(my, "insert into myc_sm values(1,'abc'),(2,'bcd'),(3,'cde')");
    select_all(my);
    printf("update myc_sm set b ='def' where a = 1;\n");
    mysql_query(my, "update myc_sm set b ='def' where a = 1;");
    select_all(my);
    printf("delete from myc_sm where a = 3;\n");
    mysql_query(my, "delete from myc_sm where a = 3;");
    select_all(my);
    mysql_close(my);
    return 0;
}
c#
- postgresql
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Npgsql;
namespace SmokeTest
{
class Program
{static void Main(string[] args)
{Console.WriteLine("\n========== C# Driver ============");
string connString = "Host=localhost;Port=5401;Username=abc;Password=abc;Database=postgres";
var conn = new NpgsqlConnection(connString);
NpgsqlDataAdapter DA = new NpgsqlDataAdapter();
//NpgsqlCommand cmd_select = new NpgsqlCommand("select * from SmokeTestTable_csharp");
//DA.SelectCommand = cmd_select;
string drop1 = "drop table if exists SmokeTestTable_csharp;";
string create = "create table SmokeTestTable_csharp(id int primary key,name text,gender text);";
string insert = "insert into SmokeTestTable_csharp values(1,'li','male'),(2,'delete_me','male'),(3,'update_me','female');";
string create2 = "create table testfordelete(id int primary key);";
string droptab = "drop table testfordelete;";
string delete = "delete from Smoketesttable_csharp where id = 2";
string update = "update SmokeTestTable_csharp set name ='update'where id = 3";
string select = "select * from SmokeTestTable_csharp";
string drodb = "drop database if exists smoketestdb;";
string commit = "commit;";
string credb = "create database smoketestdb;";
string swdb = "use smoketestdb";
string dropdb = "drop database smoketestdb;";
conn.Open();
using (var com1 = new NpgsqlCommand(drop1, conn))
using (var com2 = new NpgsqlCommand(create, conn))
using (var com3 = new NpgsqlCommand(insert, conn))
using (var com4 = new NpgsqlCommand(create2, conn))
using (var com5 = new NpgsqlCommand(droptab, conn))
using (var com6 = new NpgsqlCommand(delete, conn))
using (var com7 = new NpgsqlCommand(update, conn))
using (var com8 = new NpgsqlCommand(select, conn))
using (var drobd1 = new NpgsqlCommand(drodb,conn))
using (var credb1 = new NpgsqlCommand(credb, conn))
using (var swdb1 = new NpgsqlCommand(swdb, conn))
using (var dropdb1 = new NpgsqlCommand(dropdb, conn))
using (var comm = new NpgsqlCommand(commit,conn))
{//drobd1.ExecuteNonQuery();
//Console.WriteLine("drop table success!");
//credb1.ExecuteNonQuery();
//Console.WriteLine("create database success!");
//comm.ExecuteNonQuery();
//Console.WriteLine("commit success!");
//swdb1.ExecuteNonQuery();
//Console.WriteLine("switch database success!");
com1.ExecuteNonQuery();
Console.WriteLine("drop table success!");
com2.ExecuteNonQuery();
Console.WriteLine("create table success!");
com3.ExecuteNonQuery();
Console.WriteLine("insert table success!");
com4.ExecuteNonQuery();
Console.WriteLine("create table success!");
com5.ExecuteNonQuery();
Console.WriteLine("drop table success!");
com6.ExecuteNonQuery();
Console.WriteLine("delete table success!");
com7.ExecuteNonQuery();
Console.WriteLine("update table success!");
com8.ExecuteNonQuery();
Console.WriteLine("select table success!");
comm.ExecuteNonQuery();
Console.WriteLine("commit table success!");
//dropdb1.ExecuteNonQuery();
//Console.WriteLine("drop database success!");
}
conn.Close();
Console.WriteLine("=================================\n");
}
}
}
- mysql
using System;
using System.Collections.Generic;
using System.Data;
using MySql.Data.MySqlClient;
using System.Text;
namespace mysql
{
class Program
{static void Main(string[] args)
{
//server=127.0.0.1;port=3306;user=root;password=root; database=minecraftdb;
string cs = "server=" + args[0] + ";user=abc;password=abc;port=" + args[1] + ";database=postgres";
Console.WriteLine("testing mysql:" + cs);
MySqlConnection conn = null;
conn = new MySqlConnection(cs);
//conn.Open();
//Console.WriteLine("drop table if exists mycs_sm;");
//MySqlCommand cmd = new MySqlCommand("drop table if exists mycs_sm;", conn);
//int n = cmd.ExecuteNonQuery();
List<string> sqlList = new List<string>();
sqlList.Add("drop table if exists mycs_sm;");
sqlList.Add("create table mycs_sm(a int primary key, b text);");
sqlList.Add("insert into mycs_sm values(1,'abc'),(2,'bcd'),(3,'cde');");
sqlList.Add("select * from mycs_sm;");
sqlList.Add("update mycs_sm set b ='def'where a = 1;");
sqlList.Add("select * from mycs_sm;");
sqlList.Add("delete from mycs_sm where a = 3;");
sqlList.Add("select * from mycs_sm;");
foreach (string i in sqlList){Console.WriteLine(i);
List<string> list = new List<string>();
if (i == "select * from mycs_sm;"){conn.Open();
MySqlCommand cmd = new MySqlCommand(i, conn);
MySqlDataReader reader = cmd.ExecuteReader();
while (reader.Read()){string id = reader.GetString("a");
string name = reader.GetString("b");
Console.WriteLine(id + ":" + name);
}
conn.Close();}
else {conn.Open();
MySqlCommand cmd = new MySqlCommand(i, conn);
cmd.ExecuteNonQuery();
conn.Close();}
}
}
}
}
c++
-
postgresql
-
Compile
- g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
-
/*
* Copyright (c) 2019 ZettaDB inc. All rights reserved.
* This source code is licensed under Apache 2.0 License,
*
* sudo apt install libpq-dev
* git clone https://github.com/jtv/libpqxx.git
* cd libpqxx
* ./configure
* make
* sudo make install
*
* g++ -o smokeTest smokeTest.cpp -lpqxx -lpq -std=c++17
* ./smokeTest "dbname = postgres host=127.0.0.1 port=5401 user=abc password=abc"
*
* Test the C++ version of libpqxx, the PostgreSQL frontend library.
*/
#include <iostream>
#include <pqxx/pqxx>
using namespace std;
using namespace pqxx;
int
main(int argc, char **argv)
{
const char *conninfo;
if (argc > 1)
conninfo = argv[1];
else
conninfo = "dbname = postgres user=abc password=abc hostaddr=127.0.0.1 port=5401";
try{pqxx::connection db(conninfo);
if (db.is_open()) {cout << "Opened database successfully:" << db.dbname() << endl;
} else {
cout << "Can't open database" << endl;
return 1;
}
pqxx::nontransaction txn1{db};
txn1.exec("drop table if exists t1");
txn1.exec("create table t1(id integer primary key, info text, wt integer)");
txn1.commit();
pqxx::work txn2{db};
txn2.exec("insert into t1(id,info,wt) values(1,'record1', 1)");
txn2.exec("insert into t1(id,info,wt) values(2,'record2', 2)");
txn2.exec("insert into t1(id,info,wt) values(3,'record3', 3)");
txn2.exec("update t1 set wt = 12 where id = 1");
txn2.exec("delete from t1 where id = 2");
pqxx::result r2{txn2.exec("select * from t1")};
for (auto row: r2)
std::cout << row[0] << " " << row[1] << " " << row[2] << std::endl;
txn2.commit();
pqxx::nontransaction txn3{db};
txn3.exec("drop table t1");
txn3.commit();
db.close();}catch (const std::exception &e){cerr << e.what() << std::endl;
return 1;
}
return 0;
}
-
mysql
-
Compile
- g++ mysql.cpp -lmysqlcppconn -o mysql
-
// g++ mysql.cpp -lmysqlcppconn -o mysqltest
// ./mysqltest "tcp://192.168.0.113:5661"
#include "mysql_connection.h"
#include <stdlib.h>
#include <iostream>
#include <cppconn/driver.h>
#include <cppconn/exception.h>
#include <cppconn/resultset.h>
#include <cppconn/statement.h>
using namespace std;
int main(int argc,char* argv[]){
sql::Driver *driver;
sql::Connection *con;
sql::Statement *stmt;
sql::ResultSet *res;
/* Create a connection */
driver = get_driver_instance();
//string infos = sprintf("\"tcp://", argv[1] ,"\"");
//string in1 = "\"tcp://";
//string in2 = "\"";
//string infos = in1 + argv[1] + in2;
string infos = argv[1];
con = driver->connect(infos, "abc", "abc");
con->setSchema("postgres");
stmt = con->createStatement();
stmt->execute("drop table if exists mycpp_sm;");
cout<<"drop table if exists mycpp_sm;"<<endl;
stmt->execute("create table mycpp_sm(a int primary key, b text)");
cout<<"create table mycpp_sm(a int primary key, b text)"<<endl;
stmt->execute("insert into mycpp_sm values(1,'abc'),(2,'bcd'),(3,'cde')");
cout<<"insert into mycpp_sm(1,'abc'),(2,'bcd'),(3,'cde')"<<endl;
res = stmt->executeQuery("select * from mycpp_sm");
cout<<"select * from mycpp_sm"<<endl;
delete res;
stmt->execute("update mycpp_sm set b ='qwer' where a = 2");
cout<<"update mycpp_sm set b ='qwer' where a = 2"<<endl;
res = stmt->executeQuery("select * from mycpp_sm");
cout<<"select * from mycpp_sm"<<endl;
delete res;
stmt->execute("delete from mycpp_sm where a = 3");
cout<<"delete from mycpp_sm where a = 3"<<endl;
res = stmt->executeQuery("select * from mycpp_sm");
cout<<"select * from mycpp_sm"<<endl;
delete res;
delete stmt;
delete con;
}
rust
-
postgresql
-
Compile
- cargo build (this step downloads and installs all dependencies)
-
use clap::{App, Arg};
use postgres::{Client, NoTls};
const DEFAULT_HOST: &str = "localhost";
const DEFAULT_PORT: &str = "7999";
const DEFAULT_USER: &str = "abc";
const DEFAULT_PASS: &str = "abc";
const DEFAULT_DB: &str = "postgres";
struct ConnectionArgs {
pub host: String,
pub port: String,
pub user: String,
pub pass: String,
pub db: String,
}
fn parse_args() -> ConnectionArgs {let matches = App::new("Execute SQL in Postgres")
.arg(Arg::with_name("host")
.short("h")
.long("host")
.value_name("HOSTNAME")
.default_value(DEFAULT_HOST)
.help("Sets the host name of PostgreSQL service")
.takes_value(true),
)
.arg(Arg::with_name("port")
.short("p")
.long("port")
.value_name("PORT")
.default_value(DEFAULT_PORT)
.help("Sets the port of PostgreSQL service")
.takes_value(true),
)
.arg(Arg::with_name("user")
.short("u")
.long("user")
.value_name("USERNAME")
.default_value(DEFAULT_USER)
.help("Sets the user to log into PostgreSQL")
.takes_value(true),
)
.arg(Arg::with_name("pass")
.long("pass")
.value_name("PASSWORD")
.default_value(DEFAULT_PASS)
.help("Sets the user's password")
.takes_value(true),
)
.arg(Arg::with_name("db")
.short("d")
.long("database")
.value_name("DATABASE")
.default_value(DEFAULT_DB)
.help("Sets the database to use")
.takes_value(true),
)
.get_matches();
ConnectionArgs {host: matches.value_of("host").unwrap().to_string(),
port: matches.value_of("port").unwrap().to_string(),
user: matches.value_of("user").unwrap().to_string(),
pass: matches.value_of("pass").unwrap().to_string(),
db: matches.value_of("db").unwrap().to_string(),
}
}
fn main() {let args = parse_args();
let sqls = [
"drop table if exists t1;",
"create table t1(id integer primary key, info text, wt integer);",
"insert into t1(id,info,wt) values(1,'record1', 1);",
"insert into t1(id,info,wt) values(2,'record2', 2);",
"update t1 set wt = 12 where id = 1;",
"select * from t1;",
"delete from t1 where id = 1;",
"select * from t1;",
];
let mut client = Client::connect(
&format!("postgres://{}:{}@{}:{}/{}",
args.user, args.pass, args.host, args.port, args.db
),
NoTls,
)
.unwrap();
for sql in sqls {// or use: let result = client.query(sql, &[]);
// query returns the result rows, while execute only returns the number of affected rows
let result = client.execute(sql, &[]);
println!("command: {}, res: {:?}", sql, result);
}
}
-
mysql
-
Compile
- cargo build (this step downloads and installs all dependencies)
-
use mysql::*;
use mysql::prelude::*;
//use std::env;
use clap::{App, Arg};
fn main() {let matches = App::new("Execute SQL in Postgres")
.arg(Arg::with_name("host")
.short("h")
.long("host")
.value_name("HOSTNAME")
.help("Sets the host name of PostgreSQL service")
.takes_value(true),
)
.arg(Arg::with_name("port")
.short("p")
.long("port")
.value_name("PORT")
.help("Sets the port of PostgreSQL service")
.takes_value(true),
).get_matches();
let host = matches.value_of("host").unwrap_or("192.168.0.113").to_string();
let port = matches.value_of("port").unwrap_or("5662").to_string();
// connect as user abc / password abc to the default "postgres" database
let url = format!("mysql://abc:abc@{}:{}/postgres", host, port);
println!("{}", url);
let pool = Pool::new(url.as_str()).unwrap(); // get a connection pool
let mut conn = pool.get_conn().unwrap(); // get a connection
let sql = [
"drop table if exists myrs_sm;",
"create table myrs_sm(a int primary key, b text);",
"insert into myrs_sm values(1,'abc'),(2,'bcd'),(3,'cde');",
"select * from myrs_sm;",
"update myrs_sm set b ='def'where a = 1",
"select * from myrs_sm;",
"delete from myrs_sm where a = 3",
"select * from myrs_sm;"
];
let selects = "select * from myrs_sm;";
for sqls in sql {
    println!();
    println!("{:?}", sqls);
    if sqls == selects {
        // print each row returned by the select
        conn.query_iter(sqls).unwrap().for_each(|row| {
            let r: (i32, String) = from_row(row.unwrap());
            println!("result: a={}, b='{}'", r.0, r.1);
        });
    } else {
        conn.query_iter(sqls).unwrap();
    }
}
}
Basic DDL
Database
create database
CREATE DATABASE name
[[ WITH] [OWNER [=] user_name ]
[TEMPLATE [=] template ]
[ENCODING [=] encoding ]
[LOCALE [=] locale ]
[LC_COLLATE [=] lc_collate ]
[LC_CTYPE [=] lc_ctype ]
[TABLESPACE [=] tablespace_name ]
[ALLOW_CONNECTIONS [=] allowconn ]
[CONNECTION LIMIT [=] connlimit ]
[IS_TEMPLATE [=] istemplate ] ]
Create a new database:
create database kunlun;
Create a new database kunlun owned by user abc in the default tablespace pg_default:
CREATE DATABASE kunlun OWNER abc TABLESPACE pg_default;
More references:
create database
alter database
ALTER DATABASE name [[ WITH] option [...] ]
where option can be: ALLOW_CONNECTIONS allowconn
CONNECTION LIMIT connlimit
IS_TEMPLATE istemplate
ALTER DATABASE name OWNER TO {new_owner | CURRENT_USER | SESSION_USER}
ALTER DATABASE name SET TABLESPACE new_tablespace
ALTER DATABASE name SET configuration_parameter {TO | =} {value | DEFAULT}
ALTER DATABASE name SET configuration_parameter FROM CURRENT
ALTER DATABASE name RESET configuration_parameter
ALTER DATABASE name RESET ALL
Change the owner of the database:
ALTER DATABASE kunlun OWNER TO abc;
Disable index scans in database kunlun:
ALTER DATABASE kunlun SET enable_indexscan TO off;
Change the database's maximum connection limit:
alter database kunlun connection limit 10;
More references:
alter database
drop database
DROP DATABASE [IF EXISTS] name
More references:
drop database
Schema
create schema
CREATE SCHEMA schema_name [AUTHORIZATION role_specification] [schema_element [ ...] ]
CREATE SCHEMA AUTHORIZATION role_specification [schema_element [ ...] ]
CREATE SCHEMA IF NOT EXISTS schema_name [AUTHORIZATION role_specification]
CREATE SCHEMA IF NOT EXISTS AUTHORIZATION role_specification
where role_specification can be: user_name
| CURRENT_USER
| SESSION_USER
Create a new schema:
create schema myschema;
Create a schema for user kunlun; the schema is also named kunlun:
CREATE SCHEMA AUTHORIZATION kunlun;
More references:
create schema
alter schema
ALTER SCHEMA name OWNER TO {new_owner | CURRENT_USER | SESSION_USER}
Change the owner of the schema:
alter schema kunlun owner to vito;
More references:
alter schema
drop schema
DROP SCHEMA [IF EXISTS] name [, ...] [CASCADE | RESTRICT]
Drop the schema named kunlun:
drop schema kunlun;
More references:
drop schema
Table
create table
CREATE [[ GLOBAL | LOCAL] {TEMPORARY | TEMP} | UNLOGGED ] TABLE [IF NOT EXISTS] table_name ( [{ column_name data_type [ COLLATE collation] [column_constraint [ ...] ]
| table_constraint
| LIKE source_table [like_option ...] }
[, ...]
] )
[INHERITS ( parent_table [, ...] ) ]
[PARTITION BY { RANGE | LIST | HASH} ({ column_name | ( expression) } [COLLATE collation] [opclass] [, ...] ) ]
[USING method]
[WITH ( storage_parameter [= value] [, ...] ) | WITHOUT OIDS ]
[ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP} ]
[TABLESPACE tablespace_name]
CREATE [[ GLOBAL | LOCAL] {TEMPORARY | TEMP} | UNLOGGED ] TABLE [IF NOT EXISTS] table_name
OF type_name [ ({ column_name [ WITH OPTIONS] [column_constraint [ ...] ]
| table_constraint }
[, ...]
) ]
[PARTITION BY { RANGE | LIST | HASH} ({ column_name | ( expression) } [COLLATE collation] [opclass] [, ...] ) ]
[USING method]
[WITH ( storage_parameter [= value] [, ...] ) | WITHOUT OIDS ]
[ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP} ]
[TABLESPACE tablespace_name]
CREATE [[ GLOBAL | LOCAL] {TEMPORARY | TEMP} | UNLOGGED ] TABLE [IF NOT EXISTS] table_name
PARTITION OF parent_table [ ({ column_name [ WITH OPTIONS] [column_constraint [ ...] ]
| table_constraint }
[, ...]
) ] {FOR VALUES partition_bound_spec | DEFAULT}
[PARTITION BY { RANGE | LIST | HASH} ({ column_name | ( expression) } [COLLATE collation] [opclass] [, ...] ) ]
[USING method]
[WITH ( storage_parameter [= value] [, ...] ) | WITHOUT OIDS ]
[ON COMMIT { PRESERVE ROWS | DELETE ROWS | DROP} ]
[TABLESPACE tablespace_name]
where column_constraint is: [CONSTRAINT constraint_name]
{ NOT NULL |
NULL |
DEFAULT default_expr |
GENERATED ALWAYS AS (generation_expr) STORED |
GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY [( sequence_options) ] |
UNIQUE index_parameters |
PRIMARY KEY index_parameters |
REFERENCES reftable [( refcolumn) ] [MATCH FULL | MATCH PARTIAL | MATCH SIMPLE]
[ON DELETE referential_action] [ON UPDATE referential_action] }
[DEFERRABLE | NOT DEFERRABLE] [INITIALLY DEFERRED | INITIALLY IMMEDIATE]
table_constraint is: [CONSTRAINT constraint_name]
{UNIQUE ( column_name [, ...] ) index_parameters |
PRIMARY KEY (column_name [, ...] ) index_parameters |
like_option is: {INCLUDING | EXCLUDING} {COMMENTS | CONSTRAINTS | DEFAULTS | GENERATED | IDENTITY | INDEXES | STATISTICS | STORAGE | ALL}
partition_bound_spec is:
IN (partition_bound_expr [, ...] ) |
FROM ({ partition_bound_expr | MINVALUE | MAXVALUE} [, ...] )
TO ({ partition_bound_expr | MINVALUE | MAXVALUE} [, ...] ) |
WITH (MODULUS numeric_literal, REMAINDER numeric_literal)
index_parameters in UNIQUE, PRIMARY KEY, and EXCLUDE constraints is: [INCLUDE ( column_name [, ...] ) ]
[WITH ( storage_parameter [= value] [, ...] ) ]
[USING INDEX TABLESPACE tablespace_name]
exclude_element in an EXCLUDE constraint is: {column_name | ( expression) } [opclass] [ASC | DESC] [NULLS { FIRST | LAST} ]
Create table t1:
create table t1(a int, b int);
Create temporary table v1:
create temp table v1(a int, b text);
Create a hash-partitioned table:
CREATE TABLE t2 (
order_id bigint not null,
cust_id bigint not null,
status text
) PARTITION BY HASH (order_id);
Create a range-partitioned table:
CREATE TABLE t3 (
logdate date not null,
peaktemp int,
unitsales int
) PARTITION BY RANGE (logdate);
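The two CREATE TABLE statements above only create the partitioned parents; rows cannot be stored until partitions are attached. A minimal sketch following the partition_bound_spec syntax above (partition names and bounds are illustrative):

```sql
-- two hash partitions for t2; rows are routed by hashing order_id
CREATE TABLE t2_p0 PARTITION OF t2 FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE TABLE t2_p1 PARTITION OF t2 FOR VALUES WITH (MODULUS 2, REMAINDER 1);
-- one range partition for t3 covering the first quarter of 2023
CREATE TABLE t3_2023q1 PARTITION OF t3
    FOR VALUES FROM ('2023-01-01') TO ('2023-04-01');
```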
More references:
create table
alter table
ALTER TABLE [IF EXISTS] [ONLY] name [*]
action [, ...]
ALTER TABLE [IF EXISTS] [ONLY] name [*]
RENAME [COLUMN] column_name TO new_column_name
ALTER TABLE [IF EXISTS] [ONLY] name [*]
RENAME CONSTRAINT constraint_name TO new_constraint_name
ALTER TABLE [IF EXISTS] name
RENAME TO new_name
ALTER TABLE [IF EXISTS] name
SET SCHEMA new_schema
ALTER TABLE ALL IN TABLESPACE name [OWNED BY role_name [, ...] ]
SET TABLESPACE new_tablespace [NOWAIT]
ALTER TABLE [IF EXISTS] name
ATTACH PARTITION partition_name {FOR VALUES partition_bound_spec | DEFAULT}
ALTER TABLE [IF EXISTS] name
DETACH PARTITION partition_name
where action is one of: ADD [COLUMN] [IF NOT EXISTS] column_name data_type [COLLATE collation] [column_constraint [ ...] ]
DROP [COLUMN] [IF EXISTS] column_name [RESTRICT | CASCADE]
ALTER [COLUMN] column_name [SET DATA] TYPE data_type [COLLATE collation] [USING expression]
ALTER [COLUMN] column_name SET DEFAULT expression
ALTER [COLUMN] column_name DROP DEFAULT
ALTER [COLUMN] column_name {SET | DROP} NOT NULL
ALTER [COLUMN] column_name DROP EXPRESSION [IF EXISTS]
ALTER [COLUMN] column_name ADD GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY [( sequence_options) ]
ALTER [COLUMN] column_name {SET GENERATED { ALWAYS | BY DEFAULT} | SET sequence_option | RESTART [[ WITH] restart ] } [...]
ALTER [COLUMN] column_name DROP IDENTITY [IF EXISTS]
ALTER [COLUMN] column_name SET STATISTICS integer
ALTER [COLUMN] column_name SET (attribute_option = value [, ...] )
ALTER [COLUMN] column_name RESET (attribute_option [, ...] )
ALTER [COLUMN] column_name SET STORAGE {PLAIN | EXTERNAL | EXTENDED | MAIN}
ADD table_constraint [NOT VALID]
ADD table_constraint_using_index
ALTER CONSTRAINT constraint_name [DEFERRABLE | NOT DEFERRABLE] [INITIALLY DEFERRED | INITIALLY IMMEDIATE]
VALIDATE CONSTRAINT constraint_name
DROP CONSTRAINT [IF EXISTS] constraint_name [RESTRICT | CASCADE]
ENABLE REPLICA TRIGGER trigger_name
ENABLE ALWAYS TRIGGER trigger_name
DISABLE RULE rewrite_rule_name
ENABLE RULE rewrite_rule_name
ENABLE REPLICA RULE rewrite_rule_name
ENABLE ALWAYS RULE rewrite_rule_name
DISABLE ROW LEVEL SECURITY
ENABLE ROW LEVEL SECURITY
FORCE ROW LEVEL SECURITY
NO FORCE ROW LEVEL SECURITY
SET TABLESPACE new_tablespace
SET (storage_parameter [= value] [, ...] )
RESET (storage_parameter [, ...] )
INHERIT parent_table
NO INHERIT parent_table
OF type_name
NOT OF
OWNER TO {new_owner | CURRENT_USER | SESSION_USER}
REPLICA IDENTITY {DEFAULT | USING INDEX index_name | FULL | NOTHING}
and partition_bound_spec is:
IN (partition_bound_expr [, ...] ) |
FROM ({ partition_bound_expr | MINVALUE | MAXVALUE} [, ...] )
TO ({ partition_bound_expr | MINVALUE | MAXVALUE} [, ...] ) |
WITH (MODULUS numeric_literal, REMAINDER numeric_literal)
and column_constraint is:
[CONSTRAINT constraint_name]
{ NOT NULL |
NULL |
DEFAULT default_expr |
GENERATED ALWAYS AS (generation_expr) STORED |
GENERATED {ALWAYS | BY DEFAULT} AS IDENTITY [( sequence_options) ] |
UNIQUE index_parameters |
PRIMARY KEY index_parameters |
REFERENCES reftable [( refcolumn) ] [MATCH FULL | MATCH PARTIAL | MATCH SIMPLE]
[ON DELETE referential_action] [ON UPDATE referential_action] }
[DEFERRABLE | NOT DEFERRABLE] [INITIALLY DEFERRED | INITIALLY IMMEDIATE]
and table_constraint is: [CONSTRAINT constraint_name]
{UNIQUE ( column_name [, ...] ) index_parameters |
PRIMARY KEY (column_name [, ...] ) index_parameters |
EXCLUDE [USING index_method] (exclude_element WITH operator [, ...] ) index_parameters [WHERE ( predicate) ] |
[MATCH FULL | MATCH PARTIAL | MATCH SIMPLE] [ON DELETE action] [ON UPDATE action] }
[DEFERRABLE | NOT DEFERRABLE] [INITIALLY DEFERRED | INITIALLY IMMEDIATE]
and table_constraint_using_index is: [CONSTRAINT constraint_name]
{UNIQUE | PRIMARY KEY} USING INDEX index_name
[DEFERRABLE | NOT DEFERRABLE] [INITIALLY DEFERRED | INITIALLY IMMEDIATE]
index_parameters in UNIQUE, PRIMARY KEY, and EXCLUDE constraints is: [INCLUDE ( column_name [, ...] ) ]
[WITH ( storage_parameter [= value] [, ...] ) ]
[USING INDEX TABLESPACE tablespace_name]
exclude_element in an EXCLUDE constraint is:
{column_name | ( expression) } [opclass] [ASC | DESC] [NULLS { FIRST | LAST} ]
Rename table kunlun:
create table kunlun(a int not null);
alter table kunlun rename to t1;
Add a column to table t1:
alter table t1 add column bb text;
Rename column bb to b:
alter table t1 rename column bb to b;
Change the data type of column b:
ALTER TABLE t1 ALTER COLUMN b type varchar(10);
Add a unique constraint on column b:
alter table t1 add constraint unique_t1_b unique (b);
Add a NOT NULL constraint on column b:
ALTER TABLE t1 ALTER COLUMN b SET NOT NULL;
Remove the NOT NULL constraint from column b:
ALTER TABLE t1 ALTER COLUMN b drop NOT NULL;
Move table t1 to another schema:
create schema kunlun;
ALTER TABLE public.t1 SET SCHEMA kunlun;
More references:
alter table
drop table
DROP TABLE [IF EXISTS] name [, ...] [CASCADE | RESTRICT]
Drop table t1:
drop table t1;
More references:
drop table
Index
create index
CREATE [UNIQUE] INDEX [CONCURRENTLY] [[ IF NOT EXISTS] name ] ON [ONLY] table_name [USING method]
({ column_name | ( expression) } [COLLATE collation] [opclass] [ASC | DESC] [NULLS { FIRST | LAST} ] [, ...] )
[INCLUDE ( column_name [, ...] ) ]
[WITH ( storage_parameter = value [, ...] ) ]
[TABLESPACE tablespace_name]
[WHERE predicate]
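For example, applying the syntax above (table and index names are illustrative):

```sql
create table t1(a int, b int);
-- a plain index on column a
create index idx_t1_a on t1 (a);
-- a unique index on column b, sorted descending
create unique index idx_t1_b on t1 (b desc);
```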
More references:
create index
alter index
ALTER INDEX [IF EXISTS] name RENAME TO new_name
ALTER INDEX [IF EXISTS] name SET TABLESPACE tablespace_name
ALTER INDEX name ATTACH PARTITION index_name
ALTER INDEX name DEPENDS ON EXTENSION extension_name
ALTER INDEX [IF EXISTS] name SET (storage_parameter = value [, ...] )
ALTER INDEX [IF EXISTS] name RESET (storage_parameter [, ...] )
ALTER INDEX [IF EXISTS] name ALTER [COLUMN] column_number
SET STATISTICS integer
ALTER INDEX ALL IN TABLESPACE name [OWNED BY role_name [, ...] ]
SET TABLESPACE new_tablespace [NOWAIT]
Rename an existing index:
ALTER INDEX distributors RENAME TO suppliers;
More references:
alter index
drop index
DROP INDEX [CONCURRENTLY] [IF EXISTS] name [, ...] [CASCADE | RESTRICT]
Drop an existing index:
drop index suppliers;
More references:
drop index
Sequence
create sequence
CREATE [TEMPORARY | TEMP] SEQUENCE [IF NOT EXISTS] name [INCREMENT [ BY] increment ]
[MINVALUE minvalue | NO MINVALUE] [MAXVALUE maxvalue | NO MAXVALUE]
[START [ WITH] start ] [CACHE cache] [[ NO] CYCLE ]
[OWNED BY { table_name.column_name | NONE} ]
Create a sequence named kunlun starting at 100:
CREATE SEQUENCE kunlun START 100;
SELECT nextval('kunlun');
nextval
---------
100
SELECT nextval('kunlun');
nextval
---------
101
More references:
create sequence
alter sequence
ALTER SEQUENCE [IF EXISTS] name
[AS data_type]
[INCREMENT [ BY] increment ]
[MINVALUE minvalue | NO MINVALUE] [MAXVALUE maxvalue | NO MAXVALUE]
[START [ WITH] start ]
[RESTART [ [ WITH] restart ] ]
[CACHE cache] [[ NO] CYCLE ]
[OWNED BY { table_name.column_name | NONE} ]
ALTER SEQUENCE [IF EXISTS] name OWNER TO {new_owner | CURRENT_USER | SESSION_USER}
ALTER SEQUENCE [IF EXISTS] name RENAME TO new_name
ALTER SEQUENCE [IF EXISTS] name SET SCHEMA new_schema
Restart the kunlun sequence at 200:
ALTER SEQUENCE kunlun RESTART WITH 200;
SELECT nextval('kunlun');
nextval
---------
200
SELECT nextval('kunlun');
nextval
---------
201
More references:
alter sequence
drop sequence
DROP SEQUENCE [IF EXISTS] name [, ...] [CASCADE | RESTRICT]
Drop the kunlun sequence:
DROP SEQUENCE kunlun;
More references:
drop sequence
View
create view
CREATE [OR REPLACE] [TEMP | TEMPORARY] [RECURSIVE] VIEW name [( column_name [, ...] ) ]
[WITH ( view_option_name [= view_option_value] [, ...] ) ]
AS query
[WITH [ CASCADED | LOCAL] CHECK OPTION ]
Create a view named v1:
create table t1(id int, a int);
insert into t1(id,a) values (1,2),(2,4),(3,8),(6,0);
CREATE VIEW v1 AS
SELECT *
FROM t1
WHERE id<5;
More references:
create view
alter view
ALTER VIEW [IF EXISTS] name ALTER [COLUMN] column_name SET DEFAULT expression
ALTER VIEW [IF EXISTS] name ALTER [COLUMN] column_name DROP DEFAULT
ALTER VIEW [IF EXISTS] name OWNER TO {new_owner | CURRENT_USER | SESSION_USER}
ALTER VIEW [IF EXISTS] name RENAME TO new_name
ALTER VIEW [IF EXISTS] name SET SCHEMA new_schema
ALTER VIEW [IF EXISTS] name SET (view_option_name [= view_option_value] [, ...] )
ALTER VIEW [IF EXISTS] name RESET (view_option_name [, ...] )
Rename view v1:
alter view v1 rename to vv1;
More references:
alter view
drop view
DROP VIEW [IF EXISTS] name [, ...] [CASCADE | RESTRICT]
Drop view vv1:
drop view vv1;
More references:
drop view
Stored Procedures
http://192.168.0.104/pgdocs/h…
Users and Privilege Management
create role
CREATE ROLE name [[ WITH] option [...] ]
where option can be: SUPERUSER | NOSUPERUSER
| CREATEDB | NOCREATEDB
| CREATEROLE | NOCREATEROLE
| INHERIT | NOINHERIT
| LOGIN | NOLOGIN
| REPLICATION | NOREPLICATION
| BYPASSRLS | NOBYPASSRLS
| CONNECTION LIMIT connlimit
| [ENCRYPTED] PASSWORD 'password' | PASSWORD NULL
| VALID UNTIL 'timestamp'
| IN ROLE role_name [, ...]
| IN GROUP role_name [, ...]
| ROLE role_name [, ...]
| ADMIN role_name [, ...]
| USER role_name [, ...]
| SYSID uid
Create a role kunlun that can log in, with no password:
create role kunlun LOGIN;
Create a user kunlun1 that can log in and has a password (CREATE USER is identical to CREATE ROLE except that it implies LOGIN):
CREATE USER kunlun1 WITH PASSWORD '12345678';
Create user kunlun2 with a password valid until January 1, 2024:
CREATE ROLE kunlun2 WITH LOGIN PASSWORD '12345678' VALID UNTIL '2024-01-01';
Create a role kunlun3 that can create databases and manage roles:
CREATE ROLE kunlun3 WITH CREATEDB CREATEROLE;
More references:
create role
alter role
ALTER ROLE role_specification [WITH] option [...]
where option can be: SUPERUSER | NOSUPERUSER
| CREATEDB | NOCREATEDB
| CREATEROLE | NOCREATEROLE
| INHERIT | NOINHERIT
| LOGIN | NOLOGIN
| REPLICATION | NOREPLICATION
| BYPASSRLS | NOBYPASSRLS
| CONNECTION LIMIT connlimit
| [ENCRYPTED] PASSWORD 'password' | PASSWORD NULL
| VALID UNTIL 'timestamp'
ALTER ROLE name RENAME TO new_name
ALTER ROLE {role_specification | ALL} [IN DATABASE database_name] SET configuration_parameter {TO | =} {value | DEFAULT}
ALTER ROLE {role_specification | ALL} [IN DATABASE database_name] SET configuration_parameter FROM CURRENT
ALTER ROLE {role_specification | ALL} [IN DATABASE database_name] RESET configuration_parameter
ALTER ROLE {role_specification | ALL} [IN DATABASE database_name] RESET ALL
where role_specification can be: role_name
| CURRENT_USER
| SESSION_USER
Change kunlun1's password:
ALTER ROLE kunlun1 WITH PASSWORD '87654321';
Remove kunlun2's password:
ALTER ROLE kunlun2 WITH PASSWORD NULL;
More references:
alter role
drop role
DROP ROLE [IF EXISTS] name [, ...]
Drop the role kunlun:
drop role kunlun;
More references:
drop role
grant
GRANT {{ SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER}
[, ...] | ALL [PRIVILEGES] }
ON {[ TABLE] table_name [, ...]
| ALL TABLES IN SCHEMA schema_name [, ...] }
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {{ SELECT | INSERT | UPDATE | REFERENCES} (column_name [, ...] )
[, ...] | ALL [PRIVILEGES] (column_name [, ...] ) }
ON [TABLE] table_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {{ USAGE | SELECT | UPDATE}
[, ...] | ALL [PRIVILEGES] }
ON {SEQUENCE sequence_name [, ...]
| ALL SEQUENCES IN SCHEMA schema_name [, ...] }
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {{ CREATE | CONNECT | TEMPORARY | TEMP} [, ...] | ALL [PRIVILEGES] }
ON DATABASE database_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {USAGE | ALL [ PRIVILEGES] }
ON DOMAIN domain_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {USAGE | ALL [ PRIVILEGES] }
ON FOREIGN DATA WRAPPER fdw_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {USAGE | ALL [ PRIVILEGES] }
ON FOREIGN SERVER server_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {EXECUTE | ALL [ PRIVILEGES] }
ON {{ FUNCTION | PROCEDURE | ROUTINE} routine_name [( [ [ argmode] [arg_name] arg_type [, ...] ] ) ] [, ...]
| ALL {FUNCTIONS | PROCEDURES | ROUTINES} IN SCHEMA schema_name [, ...] }
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {USAGE | ALL [ PRIVILEGES] }
ON LANGUAGE lang_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {{ SELECT | UPDATE} [, ...] | ALL [PRIVILEGES] }
ON LARGE OBJECT loid [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {{ CREATE | USAGE} [, ...] | ALL [PRIVILEGES] }
ON SCHEMA schema_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {CREATE | ALL [ PRIVILEGES] }
ON TABLESPACE tablespace_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
GRANT {USAGE | ALL [ PRIVILEGES] }
ON TYPE type_name [, ...]
TO role_specification [, ...] [WITH GRANT OPTION]
where role_specification can be: [GROUP] role_name
| PUBLIC
| CURRENT_USER
| SESSION_USER
GRANT role_name [, ...] TO role_name [, ...] [WITH ADMIN OPTION]
Grant the INSERT privilege on table t1 to all users:
GRANT INSERT ON t1 TO PUBLIC;
More references:
grant
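The GRANT forms above also allow several privileges at once, or privileges on every table in a schema; for example (role and schema names are illustrative):

```sql
-- give role kunlun read and write access to t1
GRANT SELECT, UPDATE ON t1 TO kunlun;
-- give role kunlun read access to every table in schema public
GRANT SELECT ON ALL TABLES IN SCHEMA public TO kunlun;
```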
revoke
REVOKE [GRANT OPTION FOR]
{{ SELECT | INSERT | UPDATE | DELETE | TRUNCATE | REFERENCES | TRIGGER}
[, ...] | ALL [PRIVILEGES] }
ON {[ TABLE] table_name [, ...]
| ALL TABLES IN SCHEMA schema_name [, ...] }
FROM {[ GROUP] role_name | PUBLIC } [, ...]
[CASCADE | RESTRICT]
REVOKE [ GRANT OPTION FOR ]
    { { SELECT | INSERT | UPDATE | REFERENCES } ( column_name [, ...] )
    [, ...] | ALL [ PRIVILEGES ] ( column_name [, ...] ) }
    ON [ TABLE ] table_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { USAGE | SELECT | UPDATE }
    [, ...] | ALL [ PRIVILEGES ] }
    ON { SEQUENCE sequence_name [, ...]
         | ALL SEQUENCES IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | CONNECT | TEMPORARY | TEMP } [, ...] | ALL [ PRIVILEGES ] }
    ON DATABASE database_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON DOMAIN domain_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN DATA WRAPPER fdw_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON FOREIGN SERVER server_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { EXECUTE | ALL [ PRIVILEGES ] }
    ON { { FUNCTION | PROCEDURE | ROUTINE } function_name [ ( [ [ argmode ] [ arg_name ] arg_type [, ...] ] ) ] [, ...]
         | ALL { FUNCTIONS | PROCEDURES | ROUTINES } IN SCHEMA schema_name [, ...] }
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON LANGUAGE lang_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { SELECT | UPDATE } [, ...] | ALL [ PRIVILEGES ] }
    ON LARGE OBJECT loid [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { { CREATE | USAGE } [, ...] | ALL [ PRIVILEGES ] }
    ON SCHEMA schema_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { CREATE | ALL [ PRIVILEGES ] }
    ON TABLESPACE tablespace_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ GRANT OPTION FOR ]
    { USAGE | ALL [ PRIVILEGES ] }
    ON TYPE type_name [, ...]
    FROM { [ GROUP ] role_name | PUBLIC } [, ...]
    [ CASCADE | RESTRICT ]

REVOKE [ ADMIN OPTION FOR ]
    role_name [, ...] FROM role_name [, ...]
    [ CASCADE | RESTRICT ]
Revoke the INSERT privilege on table t1 from PUBLIC:
REVOKE INSERT ON t1 FROM PUBLIC;
More reference links
revoke
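To tie the synopsis above to the column-privilege and grant-option forms, here is a minimal hedged sketch; the role name reader and the reuse of table t1(id, a) are assumptions for illustration:

```sql
-- Assumed setup: table t1(id int, a int) and role "reader" already exist.
GRANT SELECT (a) ON t1 TO reader WITH GRANT OPTION;
-- Revoke only the right to re-grant, keeping the privilege itself;
-- CASCADE also revokes any grants that reader passed on:
REVOKE GRANT OPTION FOR SELECT (a) ON t1 FROM reader CASCADE;
-- Revoke the column privilege entirely:
REVOKE SELECT (a) ON t1 FROM reader;
```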
Other DDL
truncate
TRUNCATE [ TABLE ] [ ONLY ] name [ * ] [, ...]
    [ RESTART IDENTITY | CONTINUE IDENTITY ] [ CASCADE | RESTRICT ]
More reference links
truncate
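A short sketch of the synopsis above; the table name t_log is hypothetical:

```sql
create table t_log(id serial primary key, msg text);
insert into t_log(msg) values ('a'), ('b');
-- Empty the table and reset the sequence behind the serial column:
TRUNCATE TABLE t_log RESTART IDENTITY;
```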
Basic DML
insert
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER } VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

where conflict_target can be one of:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]
Insert data into table t1:
create table t1(id int, a int);
insert into t1(id,a) values (1,2);
insert into t1(id,a) values (2,4),(3,8),(6,0);
More reference links
insert
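The ON CONFLICT clause in the synopsis above supports upserts. A hedged sketch reusing t1(id, a) from the example, assuming a unique index on id and that the cluster accepts this PostgreSQL clause:

```sql
create unique index t1_id_key on t1(id);
-- id 1 already exists, so overwrite a instead of failing:
insert into t1(id, a) values (1, 9)
    on conflict (id) do update set a = excluded.a;
-- or silently skip the duplicate:
insert into t1(id, a) values (1, 7) on conflict do nothing;
```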
update
[ WITH [ RECURSIVE ] with_query [, ...] ]
UPDATE [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    SET { column_name = { expression | DEFAULT } |
          ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
          ( column_name [, ...] ) = ( sub-SELECT )
        } [, ...]
    [ FROM from_list ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
Change the 4 in column a of table t1 to 3 (the row with id = 2):
UPDATE t1 SET a='3' WHERE id=2;
More reference links
update
delete
[ WITH [ RECURSIVE ] with_query [, ...] ]
DELETE FROM [ ONLY ] table_name [ * ] [ [ AS ] alias ]
    [ USING from_item [, ...] ]
    [ WHERE condition | WHERE CURRENT OF cursor_name ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
Delete the row of t1 whose id equals 6:
DELETE FROM t1 WHERE id=6;
More reference links
delete
select
[ WITH [ RECURSIVE ] with_query [, ...] ]
SELECT [ ALL | DISTINCT [ ON ( expression [, ...] ) ] ]
    [ * | expression [ [ AS ] output_name ] [, ...] ]
    [ FROM from_item [, ...] ]
    [ WHERE condition ]
    [ GROUP BY grouping_element [, ...] ]
    [ HAVING condition ]
    [ WINDOW window_name AS ( window_definition ) [, ...] ]
    [ { UNION | INTERSECT | EXCEPT } [ ALL | DISTINCT ] select ]
    [ ORDER BY expression [ ASC | DESC | USING operator ] [ NULLS { FIRST | LAST } ] [, ...] ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start [ ROW | ROWS ] ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } { ONLY | WITH TIES } ]
    [ FOR { UPDATE | NO KEY UPDATE | SHARE | KEY SHARE } [ OF table_name [, ...] ] [ NOWAIT | SKIP LOCKED ] [...] ]

where from_item can be one of:

    [ ONLY ] table_name [ * ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
        [ TABLESAMPLE sampling_method ( argument [, ...] ) [ REPEATABLE ( seed ) ] ]
    [ LATERAL ] ( select ) [ AS ] alias [ ( column_alias [, ...] ) ]
    with_query_name [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] )
        [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    [ LATERAL ] function_name ( [ argument [, ...] ] ) [ AS ] alias ( column_definition [, ...] )
    [ LATERAL ] function_name ( [ argument [, ...] ] ) AS ( column_definition [, ...] )
    [ LATERAL ] ROWS FROM( function_name ( [ argument [, ...] ] ) [ AS ( column_definition [, ...] ) ] [, ...] )
        [ WITH ORDINALITY ] [ [ AS ] alias [ ( column_alias [, ...] ) ] ]
    from_item [ NATURAL ] join_type from_item [ ON join_condition | USING ( join_column [, ...] ) ]

and grouping_element can be one of:

    ( )
    expression
    ( expression [, ...] )
    ROLLUP ( { expression | ( expression [, ...] ) } [, ...] )
    CUBE ( { expression | ( expression [, ...] ) } [, ...] )
    GROUPING SETS ( grouping_element [, ...] )

and with_query is:

    with_query_name [ ( column_name [, ...] ) ] AS [ [ NOT ] MATERIALIZED ] ( select | values | insert | update | delete )

TABLE [ ONLY ] table_name [ * ]
To view table t1:
select * from t1;
which is equivalent to
table t1;
More reference links
select
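To illustrate the grouping_element forms listed above, here is a hedged sketch on a hypothetical sales table:

```sql
create table sales(region text, product text, amount int);
insert into sales values ('east','a',10), ('east','b',20), ('west','a',5);
-- Per-region subtotals plus a grand total in one query:
select region, sum(amount) from sales group by rollup (region);
-- Marginals for region and product separately, plus the grand total:
select region, product, sum(amount) from sales
    group by grouping sets ((region), (product), ());
```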
MySQL-specific DML
insert ignore
If no conflicting row exists in the database, the new row is inserted; if one already exists, the row is skipped:
drop table if exists t1;
create table t1 (a int PRIMARY KEY , b int not null,CONSTRAINT t1_b_key UNIQUE (b));
insert ignore into t1(a,b) values (4,4);
# the repeated insert is skipped because the row already exists
insert ignore into t1(a,b) values (4,4);
replace into
Check what currently exists in t1:
insert into t1(a,b) values (2,3);
table t1;
a | b
---+---
2 | 3
4 | 4
(2 rows)
If there is a conflict, the conflicting tuples are deleted first and then the new row is inserted:
replace into t1 values(1,1),(1,2);
table t1;
a | b
---+---
1 | 2
2 | 3
4 | 4
(3 rows)
Prepared Statement
PostgreSQL syntax and example
(invoked through a client API)
PREPARE name [( data_type [, ...] ) ] AS statement
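A minimal example of this form, issued here as plain SQL; the statement name get_by_id and the reuse of t1 are illustrative:

```sql
PREPARE get_by_id (int) AS SELECT * FROM t1 WHERE id = $1;
EXECUTE get_by_id(2);
DEALLOCATE get_by_id;
```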
MySQL syntax and example
(invoked through a client API)
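For the MySQL protocol, the server-side SQL form of a prepared statement looks like this (a sketch of what a client API issues under the hood; the statement name stmt1 is illustrative):

```sql
PREPARE stmt1 FROM 'SELECT * FROM t1 WHERE id = ?';
SET @id = 2;
EXECUTE stmt1 USING @id;
DEALLOCATE PREPARE stmt1;
```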
Transaction processing
KunlunBase supports the same transaction-processing functionality as MySQL, independent of whether the client uses the MySQL or the PostgreSQL connection protocol; the behavior is identical over both kinds of connection.
autocommit transactions
Each client connection has an autocommit variable whose default value is true, and the user can change its value dynamically within the connection. When autocommit is true, every DML statement the user sends executes as an independent transaction, and KunlunBase commits that transaction automatically as soon as the statement finishes.
A DDL statement always executes as an independent transaction, whether autocommit is true or false. If a transaction is already running in the connection when a DDL statement is executed, KunlunBase first commits that transaction and then executes the DDL.
Explicit transactions
We call a transaction that is not an autocommit transaction an explicit transaction. An explicit transaction can be started and committed either explicitly or implicitly.
Explicitly starting and committing / rolling back a transaction
- the user executes begin or start transaction to explicitly open a transaction
- executes some number of DML statements
- executes commit to explicitly commit the transaction, or rollback to explicitly roll it back
Implicitly starting and implicitly committing a transaction
If the user sets autocommit to false, then when the first DML statement subsequently arrives on this client connection C, KunlunBase automatically (implicitly) starts a transaction T and executes that statement inside T. As further DML statements arrive on connection C, KunlunBase keeps executing them inside transaction T.
An explicit transaction can also be committed implicitly: when KunlunBase receives set autocommit=true or any DDL statement inside an explicit transaction T, it (implicitly) commits T.
If a DML statement inside an explicit transaction fails, transaction T is automatically rolled back inside KunlunBase; from then on, any DML statement the user sends in transaction T on connection C is ignored by KunlunBase until commit or rollback arrives on connection C, at which point KunlunBase rolls back transaction T.
Transaction-processing examples
autocommit transaction example
With autocommit enabled, every DML statement the user sends executes as an independent transaction that KunlunBase commits automatically when the statement finishes.
set autocommit=true;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key, b int);
insert into t1(b) values(11);
insert into t1(b) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
drop table if exists t2;
With autocommit disabled, you must commit or roll back manually
set autocommit=off;
show autocommit;
drop table if exists t1;
create table t1(a serial primary key, b int);
insert into t1(b) values(11);
Open another session and inspect table t1
The insert becomes visible only after commit is issued
Insert one more row in the transaction, then roll it back manually
A DDL statement always executes as an independent transaction
The DDL statement has already been committed, so commit reports that no transaction is in progress
set autocommit=true;
show autocommit;
begin;
create table t1(a serial primary key, b int);
commit;
Setting autocommit to false behaves the same way
set autocommit=false;
show autocommit;
begin;
drop table t1;
commit;
Explicit transaction examples
First disable autocommit, then run the statements inside an explicit transaction
show autocommit;
set autocommit=off;
begin;
create table t1(a serial primary key, b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);
Open another window and inspect t1: since commit has not been issued yet, t1 contains no data
We then issue commit;
The commit succeeds and the rows are inserted into t1
Whether autocommit is true or false, if a transaction is already running in the connection when a DDL statement is executed, KunlunBase first commits that transaction and then executes the DDL
show autocommit;
set autocommit=off;
begin;
drop table if exists t1;
create table t1(a serial primary key, b int);
insert into t1(b) values(21);
insert into t1(b) values(22);
insert into t1(b) values(23);
insert into t1(b) values(24);
t1 currently contains no data
Executing a DDL statement commits the preceding transaction
The transaction has already been committed, so entering commit; again reports that there is no transaction to commit
commit;
If a DML statement inside an explicit transaction fails, transaction T is automatically rolled back inside KunlunBase; any DML statement subsequently sent in transaction T on connection C is ignored by KunlunBase until commit or rollback arrives on connection C, at which point KunlunBase rolls back transaction T. (The insert into column c below fails because t1 has no such column.)
set autocommit=off;
show autocommit;
begin;
drop table if exists t1;
create table t1(a serial primary key, b int);
insert into t1(b) values(11);
insert into t1(c) values(12);
insert into t1(b) values(13);
insert into t1(b) values(14);
commit;
All the data was rolled back; t1 contains no rows
DDL transaction processing and replication
A DDL statement executed on any compute node of a KunlunBase cluster is automatically replicated to and executed on all the other compute nodes of that cluster. Users connected to multiple compute nodes may execute DDL statements concurrently; KunlunBase guarantees that all compute nodes of a cluster execute the DDL statements sent to them in the same order. Consequently, all compute nodes of a KunlunBase cluster always hold identical metadata and therefore operate correctly.
When a user executes a DDL statement, KunlunBase runs it as an independent distributed transaction. If a compute node, storage node, or metadata node fails or loses power while this transaction is running, the distributed transaction is automatically rolled back and all intermediate state is cleaned up, so the user can simply execute the DDL statement again.
Users do not need to know the technical details or internals of DDL transaction processing; they only need to know that DDL and DML statements can be executed on any compute node of a KunlunBase cluster, just as with a single-node database.
Trying out DDL replication
Here is an example that lets users confirm that a KunlunBase cluster really replicates DDL. For instance, after a create table statement runs on any one compute node, the table it creates can be used on every other compute node of the cluster.
We first prepare a cluster with two compute nodes for testing.
psql postgres://abc:[email protected]:47001/postgres
psql postgres://abc:[email protected]:47004/postgres
create
Create database kunlundb on compute node 47001;
Check it on compute node 47004
Then create table t1 on node 47004
create table t1(a int, b int);
insert into t1 values (1,2);
Table t1 in database kunlundb is present on compute node 47001
insert
Continue adding rows to table t1 on node 47001
insert into t1 values (3,4);
insert into t1 values (5,6);
The added rows are replicated to node 47004 as well
update
Modify the data on compute node 47004
update t1 set a='2' where a=3;
update t1 set a='3' where a=5;
On the other compute node, 47001, the data is synchronized as well
delete
Delete a row on compute node 47001
delete from t1 where a=1;
On the other compute node, 47004, the data is synchronized as well
drop
Drop table t1 on compute node 47004
Table t1 is gone on the other compute node, 47001, as well
Fullsync high availability
The fullsync strong-synchronization mechanism
Experiencing fullsync by simulating a replica failure
- Provision a test cluster, e.g. a shard with one primary and two replicas; connecting to the shard primary, you can create databases and tables and insert/delete data normally.
- Stop the mysqld processes of the shard's two replica DBs.
- Connecting to the shard primary now, creating databases or tables and inserting/deleting data all return errors, because the primary can no longer obtain the replica acknowledgements that fullsync requires.
Monitoring and configuring Fullsync
- Via XPanel
Set the fullsync_consistency_level instance variable as shown in the figure below.
http://www.kunlunbase.com:818…
Read the fullsync_consistency_level instance variable as shown in the figure below.
On the cluster display page, click a storage node -> Enter to view that node's current fullsync value.
- Via SQL statements
SQL to configure fullsync:
set global fullsync_consistency_level=2
To check the fullsync setting:
show global variables like 'fullsync_consistency_level'
Primary-replica failover
Simulating a primary-node failure
- Make the primary read-only by executing the SQL set global read_only=true;
- Kill the primary's mysqld process, or stop it with the stop script
After cluster_mgr detects that the primary is abnormal, it starts a timed task; if the primary stays continuously failed beyond the configured threshold (20 s by default), failover is triggered. After the switch a new primary is elected and service continues normally.
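The timer-based decision described above can be sketched as follows. This is a hypothetical illustration of the logic, not cluster_mgr's actual code; only the 20 s default comes from the text:

```python
import time

FAILOVER_THRESHOLD_SECS = 20.0  # default continuous-failure window from the text

class FailoverTimer:
    """Trigger failover only after the primary has been continuously down
    for longer than the threshold; any successful probe resets the timer."""

    def __init__(self, threshold=FAILOVER_THRESHOLD_SECS):
        self.threshold = threshold
        self.down_since = None  # None means the primary currently looks healthy

    def report_probe(self, primary_ok, now=None):
        now = time.monotonic() if now is None else now
        if primary_ok:
            self.down_since = None   # a healthy probe resets the timer
            return False
        if self.down_since is None:
            self.down_since = now    # first failed probe starts the timer
        return (now - self.down_since) > self.threshold  # True => trigger failover
```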
Failover process records
Failover process records can be found in two places:
- the kunlun_metadata_db.rbr_consfailover table of the metadata cluster;
- XPanel: Cluster Management -> Cluster List -> Failover.
Verifying that the new primary works correctly and no committed transactions were lost
cluster_mgr provides a verification test program; the download address is:
https://github.com/zettadb/cl…
Elastic scale-out and scale-in
XPanel
1. Elastic scale-out.
Note:
- A cluster must contain at least two shards before scale-out can be initiated.
- Scale-out can be automatic or manual. The difference: with automatic scale-out the user does not pick the shard tables; the system detects which tables need to be moved and migrates them. With manual scale-out the user selects the tables to migrate.
(1) Automatic scale-out.
In XPanel, go to Cluster Management -> Cluster List -> cluster list information, find an existing multi-shard cluster, and initiate automatic scale-out, as shown below.
On success, it looks like this:
On failure, it looks like this:
2. Manual scale-out.
The specific steps are shown in the figures below.
On success, it looks like this:
On failure, it looks like this:
cluster-mgr API
- Create an rbr cluster with postman:
After the cluster is created, connect to a compute node, e.g.:
psql postgres://abc:[email protected]:59701/postgres
Create the student table and write some data.
- Add a shard:
3. Initiate the scale-out:
After scale-out succeeds, connect to the primary of the newly added shard, e.g.:
mysql -uclustmgr -pclustmgr_pwd -h192.168.0.129 -P59413
use postgres_$$_public
show tables; (check whether the student table is present)
The expected result:
Fault-recovery mechanism and how to verify it
1. How elastic scaling works
The goal of elastic scale-out is to migrate business tables on demand, and without downtime, from one shard to another, thereby scaling data horizontally. The process has the following stages: full data export, full data import, incremental data catch-up, and route switching.
Full data export:
In this stage, the tables to be migrated are exported logically with the mydumper tool and the data is saved to disk.
Full data import:
In this stage, the data exported in the previous stage is imported into the target shard with the myloader tool.
Incremental data catch-up:
This stage uses MySQL's primary-replica replication mechanism. Based on the consistent position at which mydumper took the full snapshot, a MySQL replication channel that synchronizes only the tables being migrated is created and named expand_id (where id is the global task id). The channel's source is the shard the tables are migrating out of, and its target instance is the primary node of the scale-out's target shard. After the incremental data has been replayed, and without stopping the whole replication channel, the tables being migrated are renamed on the source shard, which blocks reads and writes to them there. Once the primary of the target shard also observes that the table has been renamed, the incremental catch-up stage ends, the replication channel is cleaned up, and the route-switching stage begins.
Route switching:
In the route-switching stage, all compute nodes are first notified, by a write to the metadata cluster, that the table's routing has changed. After the write completes, the table on the target shard that was renamed in the previous stage is restored, i.e. renamed back to its original name. At this point the table on the source shard is still in its renamed state, while the table on the target shard has been restored to its original business name.
If a compute node has already refreshed its routing, any business access to the migrated table is routed to the target shard. If a compute node has not yet picked up the latest routing change when a business request arrives, it still accesses the source shard; because the table there is still renamed, the access fails, and it succeeds only once the node has refreshed to the latest routing.
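The stale-route behavior described above can be sketched as a toy model. This is a hypothetical illustration of the sequencing, not KunlunBase code; all names are invented:

```python
class ComputeNode:
    """Toy model of the route-switch stage: the source copy of a migrated
    table has been renamed away (unreachable), while the target copy has
    already been renamed back to its business name."""

    def __init__(self, routes, shards):
        self.routes = dict(routes)  # table -> shard this node believes owns it
        self.shards = shards        # shard -> set of currently visible table names

    def access(self, table):
        shard = self.routes[table]
        if table not in self.shards[shard]:
            # Stale route: the source-side table is renamed, so access fails.
            raise LookupError(f"table {table} not found on {shard} (stale route)")
        return shard

    def refresh_routes(self, latest):
        # Pick up the routing change written to the metadata cluster.
        self.routes = dict(latest)

# After the route switch: t1 renamed away on shard1, restored on shard2.
shards = {"shard1": set(), "shard2": {"t1"}}
stale = ComputeNode({"t1": "shard1"}, shards)
```

Accessing t1 through the stale node fails until refresh_routes is called, after which the access lands on shard2, mirroring the paragraph above.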
2. Failure scenarios and the recovery mechanism
The main problems the fault-recovery mechanism must solve are: when a failure interrupts an in-progress scale-out, how to clean up the intermediate data, and how to resume the unfinished scale-out task once service recovers.
If the task is interrupted after the full data export has finished:
simply relaunch the task.
If the task is interrupted after the full data import:
simply relaunch the task.
If the task is interrupted after the incremental catch-up:
the task is rolled back directly and all intermediate data is cleaned up.
Cluster data backup and restore
- Create an rbr cluster:
this cluster serves as the source cluster;
after the cluster is created, connect to a compute node, e.g.:
psql postgres://abc:[email protected]:59701/postgres
Create the student table and write some data
- Initiate the backup (the hdfs server must be running)
Record the restore time under hdfs, e.g. 2022-08-23 13:52
- Create another cluster:
its specification must match the cluster in step 1 (see step 1); it serves as the target cluster. - Initiate the restore:
After the restore succeeds, connect to a compute node of the cluster from step 3, e.g.:
psql postgres://abc:[email protected]:59701/postgres
Check whether the student table has been synchronized to the target cluster.
Configuring and launching backups from XPanel
- After cluster_mgr is installed, the system automatically registers the backup storage directory; the list of backup storage directories can be viewed on XPanel's backup-storage-target management page, as shown below.
- Create the vito cluster.
- Log in to compute node 47001 of the vito cluster, create a table, and add two rows of data.
- Initiate a full backup of the vito cluster, as shown below.
- In the cluster backup list, check the backup result information, as shown below.
Configuring and launching backups via the cluster-mgr API
Launching a cluster restore from XPanel
- Restore the previously backed-up vito cluster. Create a new cluster, vito3, as the cluster to restore the backup into, as shown below.
- The compute node of the vito3 cluster is 47003. Log in to the node and confirm there are no tables.
- Restore vito's earlier backup into the vito3 cluster.
- After the rollback succeeds, log in to compute node 47003 of the vito3 cluster and verify that table t2 exists; the figure below indicates the cluster restore succeeded.
Other cluster-management features via API and XPanel
Adding a storage cluster
- Add a storage cluster with XPanel; the specific steps are shown below.
Adding and removing compute nodes
- Add a compute node with XPanel, as shown below.
- Remove a compute node with XPanel, as shown below.
Rebuilding a replica node
- Rebuild a replica node with XPanel. Below, replica node 57001 of the shard1 storage cluster in the test1 cluster is rebuilt; the specific steps are shown in the figures.
Other cluster-management APIs:
- Create an rbr cluster with postman:
The response looks like this:
(likewise below; the jobid changes accordingly.)
- Add comps:
- Add shards:
- Add nodes:
- Delete comps:
- Delete shards:
- Delete nodes:
- Rebuild a shard replica
The hdfs server must be running normally for this to succeed.
- Check the status of a clustmgr API request:
- Delete a cluster:
Viewing cluster node logs
Prerequisites
- The kunlun cluster currently supports configuring ES to collect the logs emitted by each module. When the cluster is set up (bootstrap), installing ES is optional; if it is installed, log collection for the management modules, compute nodes, and storage nodes is configured automatically.
Viewing logs in kibana:
Check the indexes reported by filebeat
Configure a kibana index pattern
Use the kibana discover page
Logs are currently reported per machine, and a machine may host management modules, compute nodes, or storage nodes. Filter rules can be configured on the kibana page to view the logs of the different modules.
- If the cluster has no ES configured, log in to the cluster where each module runs to view its logs.
Viewing cluster node status
Viewing status-change records with XPanel
- View the status of cluster nodes in XPanel, as shown below.
Fault reports
Locating the core file
The user needs to configure the following on their own Linux machine:
- ulimit -c unlimited
- sysctl -w kernel.core_pattern=core.%e.%p.%t (the core file is then generated in the program's working directory; this requires root privileges)
Printing the call stack with a binary that contains symbols
If a core file appears, use gdb to print the call stack:
gdb -c <core file> <binary>, then press Enter
at the gdb prompt, type bt and press Enter
Recommended reading
An introduction to the KunlunBase architecture
An introduction to KunlunBase's technical advantages
An introduction to KunlunBase's technical features
END
KunlunBase is an HTAP NewSQL distributed database management system that meets users' full range of needs for storing, managing, and using massive relational data.
Application developers and DBAs use KunlunBase almost exactly as they would standalone MySQL or PostgreSQL, because KunlunBase supports both the PostgreSQL and MySQL protocols and supports the DML syntax and features of standard SQL:2011 as well as the PostgreSQL and MySQL extensions to standard SQL. In addition, a KunlunBase cluster supports horizontal elastic scaling, automatic data sharding, distributed transaction processing and distributed query processing, robust fault and disaster tolerance, comprehensive and intuitive monitoring, analysis, and alerting, and cluster data backup and restore, covering the common DBA data-management and operations tasks. None of this requires any coding on the application side or manual DBA intervention, and it runs without downtime or disruption to normal business operation.
KunlunBase has comprehensive OLAP data-analysis capabilities, has passed the TPC-H and TPC-DS standard test suites, and can analyze the latest business data in real time to help users unlock the value of their data. KunlunBase supports deployment in public-cloud and private-cloud environments, integrates seamlessly with cloud infrastructure such as docker and k8s, and makes it easy to build a cloud database service.
Visit http://www.kunlunbase.com/ for more information and to download the KunlunBase software, documentation, and materials.
The KunlunBase project is open source
【GitHub:】
https://github.com/zettadb
【Gitee:】
https://gitee.com/zettadb