关于go:golang连接IoTDB时序库

51次阅读

共计 9880 个字符,预计需要花费 25 分钟才能阅读完成。

先装置好 Golang 环境,IoTDB 时序库

MacBook Linux 树莓派 raspberrypi 装置 Golang 环境

Linux MacBook Docker 装置 IoTDB 及应用

package main

import (
    "flag"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/apache/iotdb-client-go/client"
    "github.com/apache/iotdb-client-go/rpc"
)

var (
    host     string
    port     string
    user     string
    password string
)
var session *client.Session

func main() {flag.StringVar(&host, "host", "127.0.0.1", "--host=192.168.1.100")
    flag.StringVar(&port, "port", "6667", "--port=6667")
    flag.StringVar(&user, "user", "root", "--user=root")
    flag.StringVar(&password, "password", "root", "--password=root")
    flag.Parse()
    config := &client.Config{
        Host:     host,
        Port:     port,
        UserName: user,
        Password: password,
    }
    session = client.NewSession(config)
    if err := session.Open(false, 0); err != nil {log.Fatal(err)
    }
    defer session.Close()

    setStorageGroup("root.ln1")
    deleteStorageGroup("root.ln1")

    setStorageGroup("root.ln1")
    setStorageGroup("root.ln2")
    deleteStorageGroups("root.ln1", "root.ln2")

    createTimeseries("root.sg1.dev1.status")
    deleteTimeseries("root.sg1.dev1.status")

    createMultiTimeseries()
    deleteTimeseries("root.sg1.dev1.temperature")

    insertStringRecord()
    deleteTimeseries("root.ln.wf02.wt02.hardware")

    insertRecord()
    deleteTimeseries("root.sg1.dev1.status")

    insertRecords()
    deleteTimeseries("root.sg1.dev1.status")

    insertTablet()
    var timeout int64 = 1000
    if ds, err := session.ExecuteQueryStatement("select * from root.ln.device1", &timeout); err == nil {printDevice1(ds)
        ds.Close()} else {log.Fatal(err)
    }

    insertTablets()
    deleteTimeseries("root.ln.device1.restart_count", "root.ln.device1.price", "root.ln.device1.tick_count", "root.ln.device1.temperature", "root.ln.device1.description", "root.ln.device1.status")

    insertRecord()
    deleteData()

    setTimeZone()
    if tz, err := getTimeZone(); err != nil {fmt.Printf("TimeZone: %s", tz)
    }

    executeStatement()
    executeQueryStatement("select count(s3) from root.sg1.dev1")
    executeRawDataQuery()
    executeBatchStatement()

    deleteTimeseries("root.sg1.dev1.status")
    deleteTimeseries("root.ln.wf02.wt02.s5")

    //0.12.x and newer
    insertRecordsOfOneDevice()
    deleteTimeseries("root.sg1.dev0.*")
}

func printDevice1(sds *client.SessionDataSet) {showTimestamp := !sds.IsIgnoreTimeStamp()
    if showTimestamp {fmt.Print("Time\t\t\t\t")
    }

    for _, columnName := range sds.GetColumnNames() {fmt.Printf("%s\t", columnName)
    }
    fmt.Println()

    for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
        if showTimestamp {fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
        }

        var restartCount int32
        var price float64
        var tickCount int64
        var temperature float32
        var description string
        var status bool

        // All of iotdb datatypes can be scan into string variables
        // var restartCount string
        // var price string
        // var tickCount string
        // var temperature string
        // var description string
        // var status string

        if err := sds.Scan(&restartCount, &price, &tickCount, &temperature, &description, &status); err != nil {log.Fatal(err)
        }

        whitespace := "\t\t"
        fmt.Printf("%v%s", restartCount, whitespace)
        fmt.Printf("%v%s", price, whitespace)
        fmt.Printf("%v%s", tickCount, whitespace)
        fmt.Printf("%v%s", temperature, whitespace)
        fmt.Printf("%v%s", description, whitespace)
        fmt.Printf("%v%s", status, whitespace)

        fmt.Println()}
}

func printDataSet0(sessionDataSet *client.SessionDataSet) {showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
    if showTimestamp {fmt.Print("Time\t\t\t\t")
    }

    for i := 0; i < sessionDataSet.GetColumnCount(); i++ {fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
    }
    fmt.Println()

    for next, err := sessionDataSet.Next(); err == nil && next; next, err = sessionDataSet.Next() {
        if showTimestamp {fmt.Printf("%s\t", sessionDataSet.GetText(client.TimestampColumnName))
        }
        for i := 0; i < sessionDataSet.GetColumnCount(); i++ {columnName := sessionDataSet.GetColumnName(i)
            switch sessionDataSet.GetColumnDataType(i) {
            case client.BOOLEAN:
                fmt.Print(sessionDataSet.GetBool(columnName))
            case client.INT32:
                fmt.Print(sessionDataSet.GetInt32(columnName))
            case client.INT64:
                fmt.Print(sessionDataSet.GetInt64(columnName))
            case client.FLOAT:
                fmt.Print(sessionDataSet.GetFloat(columnName))
            case client.DOUBLE:
                fmt.Print(sessionDataSet.GetDouble(columnName))
            case client.TEXT:
                fmt.Print(sessionDataSet.GetText(columnName))
            default:
            }
            fmt.Print("\t\t")
        }
        fmt.Println()}
}

func printDataSet1(sds *client.SessionDataSet) {showTimestamp := !sds.IsIgnoreTimeStamp()
    if showTimestamp {fmt.Print("Time\t\t\t\t")
    }

    for i := 0; i < sds.GetColumnCount(); i++ {fmt.Printf("%s\t", sds.GetColumnName(i))
    }
    fmt.Println()

    for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
        if showTimestamp {fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
        }
        for i := 0; i < sds.GetColumnCount(); i++ {columnName := sds.GetColumnName(i)
            v := sds.GetValue(columnName)
            if v == nil {v = "null"}
            fmt.Printf("%v\t\t", v)
        }
        fmt.Println()}
}

func printDataSet2(sds *client.SessionDataSet) {showTimestamp := !sds.IsIgnoreTimeStamp()
    if showTimestamp {fmt.Print("Time\t\t\t\t")
    }

    for i := 0; i < sds.GetColumnCount(); i++ {fmt.Printf("%s\t", sds.GetColumnName(i))
    }
    fmt.Println()

    for next, err := sds.Next(); err == nil && next; next, err = sds.Next() {
        if showTimestamp {fmt.Printf("%s\t", sds.GetText(client.TimestampColumnName))
        }

        if record, err := sds.GetRowRecord(); err == nil {for _, field := range record.GetFields() {v := field.GetValue()
                if field.IsNull() {v = "null"}
                fmt.Printf("%v\t\t", v)
            }
            fmt.Println()}
    }
}

func setStorageGroup(sg string) {checkError(session.SetStorageGroup(sg))
}

func deleteStorageGroup(sg string) {checkError(session.DeleteStorageGroup(sg))
}

func deleteStorageGroups(sgs ...string) {checkError(session.DeleteStorageGroups(sgs...))
}

func createTimeseries(path string) {
    var (
        dataType   = client.FLOAT
        encoding   = client.PLAIN
        compressor = client.SNAPPY
    )
    checkError(session.CreateTimeseries(path, dataType, encoding, compressor, nil, nil))
}

func createMultiTimeseries() {
    var (paths       = []string{"root.sg1.dev1.temperature"}
        dataTypes   = []client.TSDataType{client.TEXT}
        encodings   = []client.TSEncoding{client.PLAIN}
        compressors = []client.TSCompressionType{client.SNAPPY}
    )
    checkError(session.CreateMultiTimeseries(paths, dataTypes, encodings, compressors))
}

func deleteTimeseries(paths ...string) {checkError(session.DeleteTimeseries(paths))
}

func insertStringRecord() {
    var (
        deviceId           = "root.ln.wf02.wt02"
        measurements       = []string{"hardware"}
        values             = []string{"123"}
        timestamp    int64 = 12
    )
    checkError(session.InsertStringRecord(deviceId, measurements, values, timestamp))
}

func insertRecord() {
    var (
        deviceId           = "root.sg1.dev1"
        measurements       = []string{"status"}
        values             = []interface{}{"123"}
        dataTypes          = []client.TSDataType{client.TEXT}
        timestamp    int64 = 12
    )
    checkError(session.InsertRecord(deviceId, measurements, dataTypes, values, timestamp))
}

func insertRecords() {
    var (deviceId     = []string{"root.sg1.dev1"}
        measurements = [][]string{{"status"}}
        dataTypes    = [][]client.TSDataType{{client.TEXT}}
        values       = [][]interface{}{{"123"}}
        timestamp    = []int64{12}
    )
    checkError(session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp))
}

func insertRecordsOfOneDevice() {ts := time.Now().UTC().UnixNano() / 1000000
    var (
        deviceId          = "root.sg1.dev0"
        measurementsSlice = [][]string{{"restart_count", "tick_count", "price"},
            {"temperature", "description", "status"},
        }
        dataTypes = [][]client.TSDataType{{client.INT32, client.INT64, client.DOUBLE},
            {client.FLOAT, client.TEXT, client.BOOLEAN},
        }
        values = [][]interface{}{{int32(1), int64(2018), float64(1988.1)},
            {float32(12.1), "Test Device 1", false},
        }
        timestamps = []int64{ts, ts - 1}
    )
    checkError(session.InsertRecordsOfOneDevice(deviceId, timestamps, measurementsSlice, dataTypes, values, false))
}

func deleteData() {
    var (paths           = []string{"root.sg1.dev1.status"}
        startTime int64 = 0
        endTime   int64 = 12
    )
    checkError(session.DeleteData(paths, startTime, endTime))
}

func insertTablet() {if tablet, err := createTablet(12); err == nil {status, err := session.InsertTablet(tablet, false)
        checkError(status, err)
    } else {log.Fatal(err)
    }
}

func createTablet(rowCount int) (*client.Tablet, error) {tablet, err := client.NewTablet("root.ln.device1", []*client.MeasurementSchema{
        {
            Measurement: "restart_count",
            DataType:    client.INT32,
            Encoding:    client.RLE,
            Compressor:  client.SNAPPY,
        }, {
            Measurement: "price",
            DataType:    client.DOUBLE,
            Encoding:    client.GORILLA,
            Compressor:  client.SNAPPY,
        }, {
            Measurement: "tick_count",
            DataType:    client.INT64,
            Encoding:    client.RLE,
            Compressor:  client.SNAPPY,
        }, {
            Measurement: "temperature",
            DataType:    client.FLOAT,
            Encoding:    client.GORILLA,
            Compressor:  client.SNAPPY,
        }, {
            Measurement: "description",
            DataType:    client.TEXT,
            Encoding:    client.PLAIN,
            Compressor:  client.SNAPPY,
        },
        {
            Measurement: "status",
            DataType:    client.BOOLEAN,
            Encoding:    client.RLE,
            Compressor:  client.SNAPPY,
        },
    }, rowCount)

    if err != nil {return nil, err}
    ts := time.Now().UTC().UnixNano() / 1000000
    for row := 0; row < int(rowCount); row++ {
        ts++
        tablet.SetTimestamp(ts, row)
        tablet.SetValueAt(rand.Int31(), 0, row)
        tablet.SetValueAt(rand.Float64(), 1, row)
        tablet.SetValueAt(rand.Int63(), 2, row)
        tablet.SetValueAt(rand.Float32(), 3, row)
        tablet.SetValueAt(fmt.Sprintf("Test Device %d", row+1), 4, row)
        tablet.SetValueAt(bool(ts%2 == 0), 5, row)
    }
    return tablet, nil
}

func insertTablets() {tablet1, err := createTablet(8)
    if err != nil {log.Fatal(err)
    }
    tablet2, err := createTablet(4)
    if err != nil {log.Fatal(err)
    }

    tablets := []*client.Tablet{tablet1, tablet2}
    checkError(session.InsertTablets(tablets, false))
}

func setTimeZone() {
    var timeZone = "GMT"
    session.SetTimeZone(timeZone)
}

func getTimeZone() (string, error) {return session.GetTimeZone()
}

func executeStatement() {
    var sql = "show storage group"
    sessionDataSet, err := session.ExecuteStatement(sql)
    if err == nil {printDataSet0(sessionDataSet)
        sessionDataSet.Close()} else {log.Println(err)
    }
}

func executeQueryStatement(sql string) {
    var timeout int64 = 1000
    sessionDataSet, err := session.ExecuteQueryStatement(sql, &timeout)
    if err == nil {printDataSet1(sessionDataSet)
        sessionDataSet.Close()} else {log.Println(err)
    }
}

func executeRawDataQuery() {session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5) values(1,true)")
    var (paths     []string = []string{"root.ln.wf02.wt02.s5"}
        startTime int64    = 1
        endTime   int64    = 200
    )
    sessionDataSet, err := session.ExecuteRawDataQuery(paths, startTime, endTime)
    if err == nil {printDataSet2(sessionDataSet)
        sessionDataSet.Close()} else {log.Println(err)
    }
}

func executeBatchStatement() {var sqls = []string{"insert into root.ln.wf02.wt02(time,s5) values(1,true)",
        "insert into root.ln.wf02.wt02(time,s5) values(2,true)"}
    checkError(session.ExecuteBatchStatement(sqls))
}

func checkError(status *rpc.TSStatus, err error) {
    if err != nil {log.Fatal(err)
    }

    if status != nil {if err = client.VerifySuccess(status); err != nil {log.Println(err)
        }
    }
}

参考链接: https://github.com/apache/iot…

正文完
 0