共计 9880 个字符,预计需要花费 25 分钟才能阅读完成。
先安装好 Golang 环境和 IoTDB 时序数据库。
MacBook、Linux、树莓派(Raspberry Pi)安装 Golang 环境
Linux、MacBook、Docker 安装 IoTDB 及使用
package main
import (
"flag"
"fmt"
"log"
"math/rand"
"time"
"github.com/apache/iotdb-client-go/client"
"github.com/apache/iotdb-client-go/rpc"
)
// Connection settings for the IoTDB server. main binds each of them to the
// command-line flag of the same name before they are used.
var (
host string
port string
user string
password string
)
// session is the IoTDB client session shared by every helper in this file;
// main assigns it before calling any of them.
var session *client.Session
// main connects to an IoTDB server and runs every sample operation in this
// file in sequence: storage group and timeseries management, record and
// tablet inserts, queries, deletes and time zone handling.
//
// Connection settings come from the -host, -port, -user and -password flags.
// A failure to open the session or to run the device1 query ends the program
// through log.Fatal (which exits without running the deferred Close).
func main() {
	flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
	flag.StringVar(&port, "port", "6667", "--port=6667")
	flag.StringVar(&user, "user", "root", "--user=root")
	flag.StringVar(&password, "password", "root", "--password=root")
	flag.Parse()

	config := &client.Config{
		Host:     host,
		Port:     port,
		UserName: user,
		Password: password,
	}
	session = client.NewSession(config)
	if err := session.Open(false, 0); err != nil {
		log.Fatal(err)
	}
	defer session.Close()

	// Storage group management.
	setStorageGroup("root.ln1")
	deleteStorageGroup("root.ln1")
	setStorageGroup("root.ln1")
	setStorageGroup("root.ln2")
	deleteStorageGroups("root.ln1", "root.ln2")

	// Timeseries management; each created series is removed again so the
	// following step starts from a clean schema.
	createTimeseries("root.sg1.dev1.status")
	deleteTimeseries("root.sg1.dev1.status")
	createMultiTimeseries()
	deleteTimeseries("root.sg1.dev1.temperature")

	// Record inserts.
	insertStringRecord()
	deleteTimeseries("root.ln.wf02.wt02.hardware")
	insertRecord()
	deleteTimeseries("root.sg1.dev1.status")
	insertRecords()
	deleteTimeseries("root.sg1.dev1.status")

	// Tablet insert followed by a typed read-back of the inserted rows.
	insertTablet()
	var timeout int64 = 1000
	if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {
		printDevice1(ds)
		ds.Close()
	} else {
		log.Fatal(err)
	}
	insertTablets()
	deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")

	insertRecord()
	deleteData()

	setTimeZone()
	// Print the time zone only when the lookup succeeded; the original
	// condition was inverted and printed it only on failure.
	if tz, err := getTimeZone(); err == nil {
		fmt.Printf("TimeZone: %s", tz)
	} else {
		log.Println(err)
	}

	// Statement execution.
	executeStatement()
	executeQueryStatement("select count(s3) from root.sg1.dev1")
	executeRawDataQuery()
	executeBatchStatement()
	deleteTimeseries("root.sg1.dev1.status")
	deleteTimeseries("root.ln.wf02.wt02.s5")

	// 0.12.x and newer
	insertRecordsOfOneDevice()
	deleteTimeseries("root.sg1.dev0.*")
}
// printDevice1 writes the rows of sds to standard output as a tab-separated
// table: a header line with the column names, then one line per row.
//
// Each row is read with sds.Scan into six typed variables (int32, float64,
// int64, float32, string, bool), so the result set is expected to provide
// six columns compatible with those types, in that order. A Scan failure
// ends the program through log.Fatal. An error returned by sds.Next is
// logged and stops the iteration instead of being silently dropped.
func printDevice1(sds *client.SessionDataSet) {
	showTimestamp := !sds.IsIgnoreTimeStamp()
	if showTimestamp {
		fmt.Print("Time\t\t\t\t")
	}
	for _, columnName := range sds.GetColumnNames() {
		fmt.Printf("%s\t", columnName)
	}
	fmt.Println()

	const whitespace = "\t\t"
	for {
		next, err := sds.Next()
		if err != nil {
			log.Println(err)
			return
		}
		if !next {
			return
		}
		if showTimestamp {
			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
		}

		// Note from the original author: every IoTDB data type can also be
		// scanned into a string variable instead of a typed one.
		var (
			restartCount int32
			price        float64
			tickCount    int64
			temperature  float32
			description  string
			status       bool
		)
		if err := sds.Scan(&restartCount, &price, &tickCount, &temperature, &description, &status); err != nil {
			log.Fatal(err)
		}

		fmt.Printf("%v%s", restartCount, whitespace)
		fmt.Printf("%v%s", price, whitespace)
		fmt.Printf("%v%s", tickCount, whitespace)
		fmt.Printf("%v%s", temperature, whitespace)
		fmt.Printf("%v%s", description, whitespace)
		fmt.Printf("%v%s", status, whitespace)
		fmt.Println()
	}
}
// printDataSet0 writes sessionDataSet to standard output as a tab-separated
// table, reading each cell through the typed getter that matches the
// column's data type (GetBool, GetInt32, GetInt64, GetFloat, GetDouble,
// GetText). Columns of any other type print nothing but still emit the
// column separator.
//
// An error returned by sessionDataSet.Next is logged and stops the
// iteration instead of being silently dropped.
func printDataSet0(sessionDataSet *client.SessionDataSet) {
	showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
	if showTimestamp {
		fmt.Print("Time\t\t\t\t")
	}
	for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
		fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
	}
	fmt.Println()

	for {
		next, err := sessionDataSet.Next()
		if err != nil {
			log.Println(err)
			return
		}
		if !next {
			return
		}
		if showTimestamp {
			fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName))
		}
		for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
			columnName := sessionDataSet.GetColumnName(i)
			switch sessionDataSet.GetColumnDataType(i) {
			case client.BOOLEAN:
				fmt.Print(sessionDataSet.GetBool(columnName))
			case client.INT32:
				fmt.Print(sessionDataSet.GetInt32(columnName))
			case client.INT64:
				fmt.Print(sessionDataSet.GetInt64(columnName))
			case client.FLOAT:
				fmt.Print(sessionDataSet.GetFloat(columnName))
			case client.DOUBLE:
				fmt.Print(sessionDataSet.GetDouble(columnName))
			case client.TEXT:
				fmt.Print(sessionDataSet.GetText(columnName))
			}
			fmt.Print("\t\t")
		}
		fmt.Println()
	}
}
// printDataSet1 writes sds to standard output as a tab-separated table,
// reading each cell with the untyped sds.GetValue accessor. A nil value is
// printed as the text "null".
//
// An error returned by sds.Next is logged and stops the iteration instead
// of being silently dropped.
func printDataSet1(sds *client.SessionDataSet) {
	showTimestamp := !sds.IsIgnoreTimeStamp()
	if showTimestamp {
		fmt.Print("Time\t\t\t\t")
	}
	for i := 0; i < sds.GetColumnCount(); i++ {
		fmt.Printf("%s\t", sds.GetColumnName(i))
	}
	fmt.Println()

	for {
		next, err := sds.Next()
		if err != nil {
			log.Println(err)
			return
		}
		if !next {
			return
		}
		if showTimestamp {
			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
		}
		for i := 0; i < sds.GetColumnCount(); i++ {
			columnName := sds.GetColumnName(i)
			v := sds.GetValue(columnName)
			if v == nil {
				v = "null"
			}
			fmt.Printf("%v\t\t", v)
		}
		fmt.Println()
	}
}
// printDataSet2 writes sds to standard output as a tab-separated table,
// reading each row as a record (sds.GetRowRecord) and printing the value of
// every field. Fields reporting IsNull are printed as the text "null".
//
// Errors are no longer dropped silently: a failing sds.Next is logged and
// stops the iteration; a failing GetRowRecord is logged and that row's
// values are skipped, as before.
func printDataSet2(sds *client.SessionDataSet) {
	showTimestamp := !sds.IsIgnoreTimeStamp()
	if showTimestamp {
		fmt.Print("Time\t\t\t\t")
	}
	for i := 0; i < sds.GetColumnCount(); i++ {
		fmt.Printf("%s\t", sds.GetColumnName(i))
	}
	fmt.Println()

	for {
		next, err := sds.Next()
		if err != nil {
			log.Println(err)
			return
		}
		if !next {
			return
		}
		if showTimestamp {
			fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
		}
		record, err := sds.GetRowRecord()
		if err != nil {
			log.Println(err)
			continue
		}
		for _, field := range record.GetFields() {
			v := field.GetValue()
			if field.IsNull() {
				v = "null"
			}
			fmt.Printf("%v\t\t", v)
		}
		fmt.Println()
	}
}
func setStorageGroup(sg string) {checkError(session.SetStorageGroup(sg))
}
func deleteStorageGroup(sg string) {checkError(session.DeleteStorageGroup(sg))
}
func deleteStorageGroups(sgs ...string) {checkError(session.DeleteStorageGroups(sgs...))
}
func createTimeseries(path string) {
var (
dataType = client.FLOAT
encoding = client.PLAIN
compressor = client.SNAPPY
)
checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}
func createMultiTimeseries() {
var (paths = []string{"root.sg1.dev1.temperature"}
dataTypes = []client.TSDataType{client.TEXT}
encodings = []client.TSEncoding{client.PLAIN}
compressors = []client.TSCompressionType{client.SNAPPY}
)
checkError(session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors))
}
func deleteTimeseries(paths ...string) {checkError(session.DeleteTimeseries(paths))
}
func insertStringRecord() {
var (
deviceId = "root.ln.wf02.wt02"
measurements = []string{"hardware"}
values = []string{"123"}
timestamp int64 = 12
)
checkError(session.InsertStringRecord(deviceId, measurements, values, timestamp))
}
// insertRecord inserts one typed record at timestamp 12 into device
// root.sg1.dev1: measurement "status" with the TEXT value "123". The
// outcome is handed to checkError.
func insertRecord() {
	device := "root.sg1.dev1"
	names := []string{"status"}
	types := []client.TSDataType{client.TEXT}
	vals := []interface{}{"123"}
	var ts int64 = 12

	status, err := session.InsertRecord(device, names, types, vals, ts)
	checkError(status, err)
}
// insertRecords exercises the multi-record insert with a single entry:
// device root.sg1.dev1, measurement "status", TEXT value "123", timestamp
// 12. The outcome is handed to checkError.
func insertRecords() {
	devices := []string{"root.sg1.dev1"}
	names := [][]string{{"status"}}
	types := [][]client.TSDataType{{client.TEXT}}
	vals := [][]interface{}{{"123"}}
	stamps := []int64{12}

	status, err := session.InsertRecords(devices, names, types, vals, stamps)
	checkError(status, err)
}
// insertRecordsOfOneDevice inserts two records into device root.sg1.dev0.
// The first carries restart_count, tick_count and price at the current time
// in milliseconds; the second carries temperature, description and status
// one millisecond earlier. The final argument of the insert call is false.
// The outcome is handed to checkError.
func insertRecordsOfOneDevice() {
	nowMillis := time.Now().UTC().UnixNano() / int64(time.Millisecond)

	device := "root.sg1.dev0"
	stamps := []int64{nowMillis, nowMillis - 1}
	names := [][]string{
		{"restart_count", "tick_count", "price"},
		{"temperature", "description", "status"},
	}
	types := [][]client.TSDataType{
		{client.INT32, client.INT64, client.DOUBLE},
		{client.FLOAT, client.TEXT, client.BOOLEAN},
	}
	vals := [][]interface{}{
		{int32(1), int64(2018), float64(1988.1)},
		{float32(12.1), "Test Device 1", false},
	}

	status, err := session.InsertRecordsOfOneDevice(device, stamps, names, types, vals, false)
	checkError(status, err)
}
func deleteData() {
var (paths = []string{"root.sg1.dev1.status"}
startTime int64 = 0
endTime int64 = 12
)
checkError(session.DeleteData(paths, startTime, endTime))
}
// insertTablet builds a 12-row tablet with createTablet and inserts it,
// passing false as the second argument of InsertTablet. A tablet
// construction failure ends the program through log.Fatal; the insert
// outcome is handed to checkError.
func insertTablet() {
	tablet, err := createTablet(12)
	if err != nil {
		log.Fatal(err)
	}
	checkError(session.InsertTablet(tablet, false))
}
// createTablet builds a tablet for device root.ln.device1 holding rowCount
// rows of sample data.
//
// The tablet has six measurements, in this column order: restart_count
// (INT32), price (DOUBLE), tick_count (INT64), temperature (FLOAT),
// description (TEXT) and status (BOOLEAN). Row timestamps start one
// millisecond after the current time and increase by one per row. Numeric
// columns are filled with random values, description with "Test Device N"
// (N is the 1-based row number) and status with whether the row timestamp
// is even.
//
// It returns an error when the tablet cannot be created or when a value
// cannot be stored; errors from SetValueAt were previously ignored.
func createTablet(rowCount int) (*client.Tablet, error) {
	tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
		{
			Measurement: "restart_count",
			DataType:    client.INT32,
			Encoding:    client.RLE,
			Compressor:  client.SNAPPY,
		}, {
			Measurement: "price",
			DataType:    client.DOUBLE,
			Encoding:    client.GORILLA,
			Compressor:  client.SNAPPY,
		}, {
			Measurement: "tick_count",
			DataType:    client.INT64,
			Encoding:    client.RLE,
			Compressor:  client.SNAPPY,
		}, {
			Measurement: "temperature",
			DataType:    client.FLOAT,
			Encoding:    client.GORILLA,
			Compressor:  client.SNAPPY,
		}, {
			Measurement: "description",
			DataType:    client.TEXT,
			Encoding:    client.PLAIN,
			Compressor:  client.SNAPPY,
		},
		{
			Measurement: "status",
			DataType:    client.BOOLEAN,
			Encoding:    client.RLE,
			Compressor:  client.SNAPPY,
		},
	}, rowCount)
	if err != nil {
		return nil, err
	}

	ts := time.Now().UTC().UnixNano() / 1000000
	for row := 0; row < rowCount; row++ {
		ts++
		tablet.SetTimestamp(ts, row)

		// One value per column, listed in schema order so the slice index
		// doubles as the column index.
		values := []interface{}{
			rand.Int31(),
			rand.Float64(),
			rand.Int63(),
			rand.Float32(),
			fmt.Sprintf("Test Device %d", row+1),
			ts%2 == 0,
		}
		for col, v := range values {
			if err := tablet.SetValueAt(v, col, row); err != nil {
				return nil, fmt.Errorf("setting column %d of row %d: %w", col, row, err)
			}
		}
	}
	return tablet, nil
}
func insertTablets() {tablet1, err := createTablet(8)
if err != nil {log.Fatal(err)
}
tablet2, err := createTablet(4)
if err != nil {log.Fatal(err)
}
tablets := []*client.Tablet{tablet1, tablet2}
checkError(session.InsertTablets(tablets, false))
}
func setTimeZone() {
var timeZone = "GMT"
session.SetTimeZone(timeZone)
}
// getTimeZone returns whatever the shared session reports from GetTimeZone,
// passing both the value and the error through unchanged.
func getTimeZone() (string, error) {
	tz, err := session.GetTimeZone()
	return tz, err
}
// executeStatement runs "show storage group" through ExecuteStatement,
// prints the result with printDataSet0 and closes the data set. A failing
// statement is logged and nothing is printed.
func executeStatement() {
	ds, err := session.ExecuteStatement("show storage group")
	if err != nil {
		log.Println(err)
		return
	}
	printDataSet0(ds)
	ds.Close()
}
// executeQueryStatement runs sql through ExecuteQueryStatement with a
// timeout value of 1000, prints the result with printDataSet1 and closes
// the data set. A failing query is logged and nothing is printed.
func executeQueryStatement(sql string) {
	timeout := int64(1000)
	ds, err := session.ExecuteQueryStatement(sql, &timeout)
	if err != nil {
		log.Println(err)
		return
	}
	printDataSet1(ds)
	ds.Close()
}
// executeRawDataQuery first inserts one sample point (time 1, s5 = true)
// into root.ln.wf02.wt02, then runs a raw data query on
// root.ln.wf02.wt02.s5 with start time 1 and end time 200, prints the
// result with printDataSet2 and closes the data set.
//
// A failure of the preparatory insert is now logged instead of being
// ignored; the query still runs afterwards, as it did before. A failing
// query is logged and nothing is printed.
func executeRawDataQuery() {
	if _, err := session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)"); err != nil {
		log.Println(err)
	}

	var (
		paths           = []string{"root.ln.wf02.wt02.s5"}
		startTime int64 = 1
		endTime   int64 = 200
	)
	sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
	if err != nil {
		log.Println(err)
		return
	}
	printDataSet2(sessionDataSet)
	sessionDataSet.Close()
}
func executeBatchStatement() {var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
"insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
checkError(session.ExecuteBatchStatement(sqls))
}
// checkError is the shared result handler for session calls. A non-nil err
// ends the program through log.Fatal. Otherwise a non-nil status is checked
// with client.VerifySuccess, and a verification error is only logged.
func checkError(status *rpc.TSStatus, err error) {
	if err != nil {
		log.Fatal(err)
	}
	if status == nil {
		return
	}
	if verifyErr := client.VerifySuccess(status); verifyErr != nil {
		log.Println(verifyErr)
	}
}
参考链接: https://github.com/apache/iotdb-client-go
正文完