关于dolphindb:干货丨大数据分析语言DolphinDB脚本语言概述

32次阅读

共计 10141 个字符,预计需要花费 26 分钟才能阅读完成。

开发大数据利用,不仅须要能撑持海量数据的分布式数据库,能高效利用多核多节点的分布式计算框架,更须要一门能与分布式数据库和分布式计算有机交融、高性能易扩大、表达能力强、满足疾速开发和建模须要的编程语言。DolphinDB 从风行的 Python 和 SQL 语言吸取了灵感,设计了大数据处理脚本语言。

提到数据库语言,咱们很容易想到规范的 SQL 语言。不同于规范的 SQL,DolphinDB 编程语言功能齐全,表达能力十分弱小,完满反对命令式编程、向量化编程、函数话编程、SQL 编程、近程过程调用编程(RPC)和元编程等多种编程范式。DolphinDB 编程语言的语法和表白习惯与 Python 和 SQL 十分类似,只有对 Python 和 SQL 有肯定的理解,就能轻松把握。相对而言,把握内存时序数据库 kdb+ 的 q 语言难度要大得多。

DolphinDB 的编程语言可能满足数据科学家疾速开发和建模的需要。DolphinDB 语言简洁灵便,表达能力强,大大提高了数据科学家的开发效率。DolphinDB 反对向量化计算和分布式计算,具备极快的运行速度。上面将具体介绍 DolphinDB 编程语言的独特之处。

1. 命令式编程

与支流的脚本语言 Python、JS 等,还有强类型语言 C、C++、Java 等一样,DolphinDB 也反对命令式编程。命令式编程是指通过执行一条一条的语句,实现最终目标。DolphinDB 的命令式编程次要是用作下层模块的解决和调度。在大数据分析中,因为须要解决的数据量十分宏大,如果咱们采纳命令式编程逐行解决数据,效率会非常低下,性能也会有所降落。因而,咱们举荐在 DolphinDB 中应用其余编程形式来批量解决数据。

//DolphinDB 反对对单变量和多变量进行赋值
x = 1 2 3
y = 4 5
y += 2
x, y = y, x //swap the value of x and y
x, y =1 2 3, 4 5

// 1 到 100 累加求和
s = 0
for(x in 1:101) s += x
print s

// 数组中的元素求和
s = 0;
for(x in 1 3 5 9 15) s += x
print s

// 打印矩阵每一列的均值
m = matrix(1 2 3, 4 5 6, 7 8 9)
for(c in m) print c.avg()

// 计算 product 表中每一个产品的销售额
t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty)
for(row in t) print row.productId + ":" + row.price * row.qty

2. 向量化编程

跟 matlab、R 等编程语言一样,DolphinDB 也反对向量化编程。后面提到的 kdb+ 数据库的 q 语言也是向量解决语言,它在简单的计算上体现出很好的性能,并且效率很高。DolphinDB 的编程语言对很多算法都进行了优化,比方对工夫序列数据计算滑动窗口指标,大大提高了向量函数的效率。

// 两个长度为 1000 万的向量相加,采纳向量化编程比命令式编程的 for 语句更加简洁,耗耗时更短。n = 10000000
a = rand(1.0, n)
b = rand(1.0, n)

// 采纳 for 语句编程,须要 12 秒
c = array(DOUBLE, n)
for(i in 0 : n)
    c[i] = a[i] + b[i]
Time elapsed: 12341.043 ms

// 采纳向量化编程,仅需 36 毫秒
c = a + b
Time elapsed: 36.901 ms

向量化编程通常是把整个向量加载到间断内存中。有时候因为内存碎片,没有找到间断内存,向量就不可用了。DolphinDB 针对这个问题,特意提供了 big array 数据类型。big array 能够把物理上不间断的内存块组成逻辑上间断的向量,即便是十分大的向量,也能在 DolphinDB 中应用,进步了零碎的可用性。

3. 函数化编程

DolphinDB 反对函数化编程的大部分性能,包含纯函数、自定义函数、λ 函数、高阶函数、局部利用和闭包。DolphinDB 内置了 400 多个函数,涵盖了各种数据类型、数据结构和零碎调用。

DolphinDB 的纯函数个性缩小了函数的副作用。在自定义函数时,DolphinDB 不能应用函数体外定义的变量。纯函数个性能够大幅度提高代码可读性和软件品质。

3.1 自定义函数

// 定义一个函数返回工作日
def getWorkDays(dates){return dates[def(x):weekday(x) between 1:5]
}

getWorkDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01)

[2018.08.01, 2018.10.01] 

下面的例子定义一个函数 getWorkDays,该函数受一组日期,返回并返回在周一和周五之间的日期。函数的实现采纳了向量的过滤性能,也就是承受一个布尔型单目函数用于数据的过滤。

3.2 高阶函数

上面的一个例子咱们应用三个高阶函数 pivot、each 和 cross,干净利落的用三行代码,依据股票日内 tick 级别的报价数据,计算出两两之间的相关性。

// 模仿生成 10000000 万个数据点(股票代码,交易工夫和价格)n=10000000
syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n)
time = 09:30:00.000 + rand(21600000, n)
price = 500.0 + rand(500.0, n)

// 利用 pivot 函数生成透视表
priceMatrix = pivot(avg, price, time.minute(), syms)
//each 和 ratios 函数的配合应用,为每个股票(矩阵的列)生成每分钟的回报序列
retMatrix = each(ratios, priceMatrix) - 1
//cross 和 corr 函数的配合应用,计算股票两两之间的相关性
corrMatrix = cross(corr, retMatrix, retMatrix)

     AMZN      FB        GOOG      IBM       MSFT
     --------- --------- --------- --------- ---------
AMZN|1         0.015181  -0.056245 0.005822  0.084104
FB  |0.015181  1         -0.028113 0.034159  -0.117279
GOOG|-0.056245 -0.028113 1         -0.039278 -0.025165
IBM |0.005822  0.034159  -0.039278 1         -0.049922
MSFT|0.084104  -0.117279 -0.025165 -0.049922 1

3.3 局部利用

高阶函数中的函数参数通常对参数有限度,通过局部利用,能够确保参数符合要求。例如,给定一个向量 a = 12 14 18,计算与矩阵中的每一列的相关性。因为要计算矩阵的每一列的相关性,当然能够应用高阶函数 each。然而 corr 函数须要两个参数,而矩阵只提供其中的一个参数,另一个参数必须当时给定,所以局部利用能够解决这个问题。当然咱们也能够用 for 语句来解决这个问题,但代码简短而低效。

a = 12 14 18
m = matrix(5 6 7, 1 3 2, 8 7 11)

// 应用 each 和局部利用计算矩阵中的每一列与给定向量 a 的相关性
each(corr{a}, m)

// 应用 for 语句解决下面的问题
cols = m.columns()
c = array(DOUBLE, cols)
for(i in 0:cols)
    c[i] = corr(a, m[i])

局部利用的另一个作用是使函数放弃状态。例如,在流计算中,用户通常须要给定一个音讯处理函数(message handler),承受一条新的信息,返回一个后果。然而咱们心愿音讯处理函数返回的是迄今为止所有数的平均数。这个问题咱们能够通过局部利用来解决。

def cumavg(mutable stat, newNum){stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1)
    stat[1] += 1
    return stat[0]
}

msgHandler = cumavg{0.0 0.0}
each(msgHandler, 1 2 3 4 5)

[1,1.5,2,2.5,3]

4.SQL 编程

DolphinDB 的编程语言不仅反对规范的 SQL,还针对工夫序列数据扩大了 SQL 的性能,如分组计算(context by)、数据透视(pivot by)、窗口函数、asof 连贯和窗口连贯等,更便于剖析工夫序列数据。单纯的 SQL 引擎表达能力无限,很难满足更加简单的数据分析和算法实现,影响开发效率。在 DolphinDB 中,脚本语言与 SQL 语言是齐全交融在一起的。

4.1 SQL 与编程语言交融

// 生成一个员工工资表
emp_wage = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage)

// 计算给定的一组员工的平均工资。员工列表存储在一个本地变量 empIds 中
empIds = 3 4 6 7 9
select avg(wage) from emp_wage where id in empIds group by id
id avg_wage
-- --------
3  5500
4  6000
6  6000
7  5500
9  5500

// 除计算平均工资外,同时显示员工的姓名。员工姓名应用一个字典 empName 来获取。empName = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones)
select empName[first(id)] as name, avg(wage) from emp_wage where id in empIds group by id
id name    avg_wage
-- ------- --------
3  Jerry   5500
4  Jessica 6000
6  Tim     6000
7  Henry   5500
9  Kevin   5500

下面的例子,SQL 语句的 where 子句和 select 子句别离用到了上下文中定义的数组和字典,使得原本须要通过子查问和多表联结来解决的问题,通过简略的 hash table 解决了。如果 SQL 波及到分布式数据库,这些上下文变量会主动序列化到须要的节点。这不仅让代码看上去更简洁,有更好的可读性,而且晋升了性能。在大数据分析中,很多数据表关联,即便 SQL 优化器做了很多优化,也不免带来性能问题。

4.2 context by——对面板数据的敌对反对

DolphinDB 提供了相似其余数据库系统的 window function——context by。然而与 window function 相比,context by 的语法更简洁,并且没有那么多限度,能够与 select 或 update 一起应用。

// 按股票代码进行分组,计算每个股票每天的回报。假如数据是工夫顺序排列的。update trades set ret = ratios(price) - 1.0 context by sym

// 按日期进行分组,计算每天每个股票的 ret 降序排名。select date, symbol,  ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date

// 抉择每天 ret 排名前 10 的股票
select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10

4.3 asof join 和 window join——对时序数据的敌对反对

t1 =  table(09:30m 09:31m 09:33m 09:34m as minute, 29.2 28.9 29.3 30.1 as price)
t2 =  table(09:30m 09:31m 09:34m 09:36m as minute,  51.2 52.4 51.9 52.8 as price)
select * from aj(t1, t2, `minute)

minute price t2_minute t2_price
------ ----- --------- --------
09:30m 29.2  09:30m    51.2
09:31m 28.9  09:31m    52.4
09:33m 29.3  09:31m    52.4
09:34m 30.1  09:34m    51.9

下面的例子中,t2 中没有与 09:33m、09:34m 对应的记录,asof join(aj)会别离取 t2 中在 09:33m、09:34m 之前最近工夫对应的记录,即取 t2 中 09:31m 的记录。

p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month)
s = table(1 2 1 2 1 2 as id, 2018.04M 2018.04M 2018.05M 2018.05M 2018.06M 2018.06M as month, 4500 5000 6000 5000 6000 4500 as wage)
select * from wj(p, s, -3:-1,<avg(wage)>,`id`month)

id month    avg_wage
-- -------- -----------
1  2018.06M 5250
2  2018.07M 4833.333333
3  2018.07M

下面的例子阐明了 window join(wj)的用法。wj 首先取表 p 第一行记录,即 id=1,month=2018.06M。而后在表 s 中抉择 id= 1 并且 month 在 (2018.06M-3) 到(2018.06M-1),即 2018.03M 到 2018.05M 之间的记录来计算 avg(wage)。因而 avg_wage=(4500+6000)/2=5250。如此类推。

asof join 和 window join 在金融剖析畛域有着宽泛的利用。一个经典的利用是将交易表和报价表进行关联,计算个股交易成本。详情能够参考应用 Window Join 疾速预计个股交易成本。

4.4 SQL 其它扩大

为了满足大数据分析的要求,DolphinDB 对 SQL 还做了很多扩大。比方,用户的自定义函数无需编译、打包或部署,即可在 SQL 中应用。又比方 DolphinDB 反对组合字段(Composite Column),能够将简单剖析函数的多个返回值输入到数据表的一行。

factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6
factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7
t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2)

// 在输入参数的同时,输入 t 统计值。应用自定义函数包装输入后果
def myols(y,x){r=ols(y,x,true,2)
    return r.Coefficient.beta join r.RegressionStat.statistics[0]
}
select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by id

id alpha     beta1     beta2     R2
-- --------- --------- --------- --------
1  1.063991  -0.258685 0.732795  0.946056
2  6.886877  -0.148325 0.303584  0.992413
3  11.833867 0.272352  -0.065526 0.144837

5. 近程过程调用编程

DolphinDB 与其余零碎相比,在近程过程调用(RPC)上的劣势次要体现在两个方面:第一,在 DolphinDB 中,无论是自定义函数还是内置函数,咱们都能够通过近程过程调用发送到其余节点上运行,而其余零碎不能近程调用与自定义函数相干的函数。第二,DolphinDB 的近程过程调用无需编译或者部署。零碎会主动把相干函数定义和所需数据序列化到近程节点。数据科学家或数据分析师在编写与近程过程调用相干的函数时,不须要工程师配合编译和部署,能够间接在线应用,极大地提高了开发和剖析效率。

上面的例子是应用 remoteRun 执行近程函数:

h = xdb("localhost", 8081)
// 在近程节点上执行一段脚本
remoteRun(h, "sum(1 3 5 7)")
16

// 上述近程调用也能够简写成
h("sum(1 3 5 7)")
16

// 在近程节点上执行一个在近程节点注册的函数
h("sum", 1 3 5 7)
16

// 在近程系节点上执行本地的自定义函数
def mysum(x) : reduce(+, x)
h(mysum, 1 3 5 7)
16

// 在近程节点(localhost:8081)上创立一个共享表 sales
h("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales")
// 如果本地的自定义函数有依赖,依赖的自定义函数也会序列化到近程节点
defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=d
h(salesSum, "sales", 2018.07.02)
40

DolphinDB 还提供了与 分布式计算 相干的函数。mr 和 imr 别离用于开发基于 map-reduce 和迭代的 map-reduce 分布式算法。用户只须要指定分布式数据源和定制的外围函数,譬如 map 函数,reduce 函数,final 函数等。上面咱们先创立一个分布式表,增加一些模仿数据,而后演示开发计算中位数和线性回归的例子。

// 模仿生成分布式表 sample,用 id 分区
//y = 0.5 + 3x1 -0.5x2
n=10000000
x1 = pow(rand(1.0,n), 2)
x2 = norm(3.0:1.0, n)
y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0:1.0, n)
t=table(rand(10, n) as id, y, x1, x2)

login(`admin,"123456")
db = database("dfs://testdb", VALUE, 0..9)
db.createPartitionedTable(t, "sample", "id").append!(t)

利用自定义的 map 函数 myOLSMap,内置的 reudce 函数加函数(+),自定义的 final 函数 myOLSFinal,以及内置的 map-reduce 框架函数 mr,疾速构建了一个在分布式数据源上运行线性回归的函数 myOLSEx。

def myOLSMap(table, yColName, xColNames){x = matrix(take(1.0, table.rows()), table[xColNames])
    xt = x.transpose();
    return xt.dot(x), xt.dot(table[yColName])
}

def myOLSFinal(result){xtx = result[0]
    xty = result[1]
    return xtx.inv().dot(xty)[0]
}

def myOLSEx(ds, yColName, xColNames){return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal)
}

// 应用本人开发的分布式算法和分布式数据源计算线性回归系数
sample = loadTable("dfs://testdb", "sample")
myOLSEx(sqlDS(<select * from sample>), `y, `x1`x2)
[0.4991, 3.0001, -0.4996]

// 应用内置的函数 ols 和未分的数据计算线性回归的系数,失去雷同的后果
ols(y, [x1,x2],true)
[0.4991, 3.0001, -0.4996]

上面这个例子,咱们结构一个算法,在分布式数据源上计算一组数据的近似中位数。算法的基本原理是利用 bucketCount 函数,在每一个节点上别离计算一组 bucket 内的数据个数,而后把各个节点上的数据累加。这样咱们能够找到中位数应该落在哪个区间内。如果这个区间不够小,进一步细分这个区间,直到小于给定的精度要求。中位数的算法须要屡次迭代,咱们因而应用了迭代计算框架 imr。

def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true)

def medFinal(range, result){x= result.cumsum()
    index = x.asof(x[1025]/2.0)
    ranges = range[1] - range[0]
    if(index == -1)
        return (range[0] - ranges*32):range[1]
    else if(index == 1024)
        return range[0]:(range[1] + ranges*32)
    else{
        interval = ranges / 1024.0
        startValue = range[0] + (index - 1) * interval
        return startValue : (startValue + interval)
    }
}

def medEx(ds, colName, range, precision){termFunc = def(prev, cur): cur[1] - cur[0] <= precision
    return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()}

// 应用本人开发的近似中位数算法,计算分布式数据的中位数。sample = loadTable("dfs://testdb", "sample")
medEx(sqlDS(<select y from sample>), `y, 0.0 : 1.0, 0.001)
-0.052973

// 应用内置的 med 函数计算未分区的数据的中位数。med(y)
-0.052947

6. 元编程

DolphinDB 反对应用元编程来动态创建表达式,如函数调用的表达式和 SQL 查问表达式。元编程的一个典型利用是定制报表。用户只须要输出数据表、字段名称和字段格局就能生成报表。具体实现如下:

// 依据输出的数据表,字段名称和格局,以及过滤条件,动静生成 SQL 表达式并执行
def generateReport(tbl, colNames, colFormat, filter){colCount = colNames.size()
    colDefs = array(ANY, colCount)
    for(i in 0:colCount){if(colFormat[i] == "") 
            colDefs[i] = sqlCol(colNames[i])
        else
            colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]})
    }
    return sql(colDefs, tbl, filter).eval()}

// 模仿生成一个 100 行的数据表
t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty)

// 输出过滤条件,字段和格局,定制报表。过滤条件应用了元编程。generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >)

id  date       price qty
--- ---------- ----- -----
001 01/02/2018 50.27 2,886
002 01/03/2018 30.85 1,331
003 01/04/2018 17.89 18
004 01/05/2018 51.00 6,439
096 04/07/2018 57.73 8,339
097 04/08/2018 47.16 2,425
098 04/09/2018 27.90 4,621
099 04/10/2018 31.55 7,644
100 04/11/2018 46.63 8,383

DolphinDB 编程语言为数据分析而生,天生具备解决海量数据的能力,功能强大,简略易用。如果想要理解更多对于 DolphinDB 脚本,能够参考 DolphinDB 脚本语言的混合范式编程。

此外,还提供了多种编程 API,如 R、Python、Java、C# 等,可能不便地与已有的利用集成。

Java API:dolphindb/api-java

Python 3 API:dolphindb/api-python3

Python 2.7 API:dolphindb/api-python

C# API:dolphindb/api-csharp

R: dolphindb/api-r

欢送拜访官网下载 DolphinDB database 试用版

正文完
 0