DolphinDB反对动静加载内部插件,以扩大零碎性能。插件用C++编写,须要编译成".so"或".dll"共享库文件。本文着重介绍开发插件的办法和注意事项,并具体介绍以下几个具体场景的插件开发流程:

  • 如何开发反对工夫序列数据处理的插件函数
  • 如何开发用于解决分布式SQL的聚合函数
  • 如何开发反对新的分布式算法的插件函数
  • 如何开发反对流数据处理的插件函数
  • 如何开发反对内部数据源的插件函数

1. 如何开发插件

1.1 基本概念

DolphinDB的插件实现了能在脚本中调用的函数。一个插件函数可能是运算符函数(Operator function),也可能是零碎函数(System function),它们的区别在于,前者承受的参数个数小于等于2,而后者的函数能够承受任意个参数,并反对会话的拜访操作。

开发一个运算符函数,须要编写一个原型为ConstantSP (const ConstantSP& a, const ConstantSP& b)的C++函数。当函数参数个数为2时,ab别离为插件函数的第一和第二个参数;当参数个数为1时,b是一个占位符,没有理论用处;当没有参数时,ab均为占位符。

开发一个零碎函数,须要编写一个原型为ConstantSP (Heap* heap, vector<ConstantSP>& args)的C++函数。用户在DolphinDB中调用插件函数时传入的参数,都按程序保留在C++的向量args中。heap参数不须要用户传入。

函数原型中的ConstantSP能够示意绝大多数DolphinDB对象(标量、向量、矩阵、表,等等)。其余罕用的派生自它的变量类型有VectorSP(向量)、TableSP(表)等。

1.2 创立变量

创立标量,能够间接用new语句创立头文件ScalarImp.h中申明的类型对象,并将它赋值给一个ConstantSPConstantSP是一个通过封装的智能指针,会在变量的援用计数为0时主动开释内存,因而,用户不须要手动delete曾经创立的变量:

ConstantSP i = new Int(1);                 // 相当于1iConstantSP d = new Date(2019, 3, 14);      // 相当于2019.03.14ConstantSP s = new String("DolphinDB");    // 相当于"DolphinDB"ConstantSP voidConstant = new Void();      // 创立一个void类型变量,罕用于示意空的函数参数

头文件Util.h申明了一系列函数,用于疾速创立某个类型和格局的变量:

VectorSP v = Util::createVector(DT_INT, 10);     // 创立一个初始长度为10的int类型向量v->setInt(0, 60);                                // 相当于v[0] = 60VectorSP t = Util::createVector(DT_ANY, 0);      // 创立一个初始长度为0的any类型向量(元组)t->append(new Int(3));                           // 相当于t.append!(3)t->get(0)->setInt(4);                            // 相当于t[0] = 4// 这里不能用t->setInt(0, 4),因为t是一个元组,setInt(0, 4)只对int类型的向量无效ConstantSP seq = Util::createIndexVector(5, 10); // 相当于5..14int seq0 = seq->getInt(0);                       // 相当于seq[0]ConstantSP mat = Util::createDoubleMatrix(5, 10);// 创立一个10行5列的double类型矩阵mat->setColumn(3, seq);                          // 相当于mat[3] = seq

1.3 异样解决和参数校验

1.3.1 异样解决

插件开发时的异样抛出和解决,和个别C++开发中一样,都通过throw关键字抛出异样,try语句块解决异样。DolphinDB在头文件Exceptions.h中申明了异样类型。

插件函数若遇到运行时谬误,个别抛出RuntimeException

在插件开发时,通常会校验函数参数,如果参数不符合要求,抛出一个IllegalArgumentException。罕用的参数校验函数有:

  • ConstantSP->getType():返回变量的类型(int, char, date等等),DolphinDB的类型定义在头文件Types.h中。
  • ConstantSP->getCategory():返回变量的类别,罕用的类别有INTEGRAL(整数类型,包含int, char, short, long等)、FLOATING(浮点数类型,包含float, double等)、TEMPORAL(工夫类型,包含time, date, datetime等)、LITERAL(字符串类型,包含string, symbol等),都定义在头文件Types.h中。
  • ConstantSP->getForm():返回变量的格局(标量、向量、表等等),DolphinDB的格局定义在头文件Types.h中。
  • ConstantSP->isVector():判断变量是否为向量。
  • ConstantSP->isScalar():判断变量是否为标量。
  • ConstantSP->isTable():判断变量是否为表。
  • ConstantSP->isNumber():判断变量是否为数字类型。
  • ConstantSP->isNull():判断变量是否为空值。
  • ConstantSP->getInt():取得变量对应的整数值,罕用于判断边界。
  • ConstantSP->getString():取得变量对应的字符串。
  • ConstantSP->size():取得变量的长度。

更多参数校验函数个别在头文件CoreConcept.hConstant类办法中。

1.3.2 参数校验的范例

本节将开发一个插件函数用于求非负整数的阶乘,返回一个long类型变量。

DolphinDB中long类型的最大值为2^63 - 1,能示意的阶乘最大为25!,因而只有0~25范畴内的参数是非法的。

#include "CoreConcept.h"#include "Exceptions.h"#include "ScalarImp.h"ConstantSP factorial(const ConstantSP &n, const ConstantSP &placeholder) {    string syntax = "Usage: factorial(n). ";    if (!n->isScalar() || n->getCategory() != INTEGRAL)        throw IllegalArgumentException("factorial", syntax + "n must be an integral scalar.");    int nValue = n->getInt();    if (nValue < 0 || nValue > 25)        throw IllegalArgumentException("factorial", syntax + "n must be a non-negative integer less than 26.");    long long fact = 1;    for (int i = nValue; i > 0; i--)        fact *= i;    return new Long(fact);}

1.4 调用DolphinDB内置函数

有时会须要调用DolphinDB的内置函数对数据进行解决。有些类曾经定义了一些罕用的内置函数作为办法:

VectorSP v = Util::createIndexVector(1, 100);ConstantSP avg = v->avg();     // 相当于avg(v)ConstantSP sum2 = v->sum2();   // 相当于sum2(v)v->sort(false);                // 相当于sort(v, false)

如果须要调用其它内置函数,插件函数的类型必须是零碎函数。通过heap->currentSession()->getFunctionDef函数取得一个内置函数,而后用call办法调用它。如果该内置函数是运算符函数,应调用原型call(Heap, const ConstantSP&, const ConstantSP&);如果是零碎函数,应调用原型call(Heap, vector<ConstantSP>&)。以下是调用内置函数cumsum的一个例子:

ConstantSP v = Util::createIndexVector(1, 100);v->setTemporary(false);                                   // v的值可能在内置函数调用时被批改。如果不心愿它被批改,应先调用setTemporary(false)FunctionDefSP cumsum = heap->currentSession()->getFunctionDef("cumsum");ConstantSP result = cumsum->call(heap, v, new Void());    // 相当于cumsum(v),这里的new Void()是一个占位符,没有理论用处

2. 如何开发反对工夫序列数据处理的插件函数

DolphinDB的特色之一在于它对工夫序列有良好反对。

本章以编写一个msum函数的插件为例,介绍如何开发插件函数反对工夫序列数据处理。

工夫序列处理函数通常承受向量作为参数,并对向量中的每个元素进行计算解决。在本例中,msum函数承受两个参数:一个向量和一个窗口大小。它的原型是:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window);

msum函数的返回值是一个和输出向量同样长度的向量。本例为简便起见,假设返回值是一个double类型的向量。能够通过Util::createVector函数事后为返回值调配空间:

int size = X->size();int windowSize = window->getInt();ConstantSP result = Util::createVector(DT_DOUBLE, size);

在DolphinDB插件编写时解决向量,能够循环应用getDoubleConst,getIntConst等函数,批量取得肯定长度的只读数据,保留在相应类型的缓冲区中,从缓冲区中获得数据进行计算。这样做的效率比循环应用getDouble,getInt等函数要高。本例为简便起见,对立应用getDoubleConst,每次取得长度为Util::BUF_SIZE的数据。这个函数返回一个const double*,指向缓冲区头部:

double buf[Util::BUF_SIZE];INDEX start = 0;while (start < size) {    int len = std::min(Util::BUF_SIZE, size - start);    const double *p = X->getDoubleConst(start, len, buf);    for (int i = 0; i < len; i++) {        double val = p[i];        // ...    }    start += len;}

在本例中,msum将计算X中长度为windowSize的窗口中所有数据的和。能够用一个长期变量tmpSum记录以后窗口的和,每当窗口挪动时,只有给tmpSum减少新窗口尾部的值,减去旧窗口头部的值,就能计算失去以后窗口中数据的和。为了将计算值写入result,能够循环用result->getDoubleBuffer获取一个可读写的缓冲区,写完后应用result->setDouble函数将缓冲区写回数组。setDouble函数会查看给定的缓冲区地址和变量底层贮存的地址是否统一,如果统一就不会产生数据拷贝。在少数状况下,用getDoubleBuffer取得的缓冲区就是变量理论的存储区域,这样能缩小数据拷贝,进步性能。

须要留神的是,DolphinDB用double类型的最小值(曾经定义为宏DBL_NMIN)示意double类型的NULL值,要专门判断。

返回值的前windowSize - 1个元素为NULL。能够对X中的前windowSize个元素和之后的元素用两个循环别离解决,前一个循环只计算累加,后一个循环执行加和减的操作。最终的实现如下:

ConstantSP msum(const ConstantSP &X, const ConstantSP &window) {    INDEX size = X->size();    int windowSize = window->getInt();    ConstantSP result = Util::createVector(DT_DOUBLE, size);    double buf[Util::BUF_SIZE];    double windowHeadBuf[Util::BUF_SIZE];    double resultBuf[Util::BUF_SIZE];    double tmpSum = 0.0;    INDEX start = 0;    while (start < windowSize) {        int len = std::min(Util::BUF_SIZE, windowSize - start);        const double *p = X->getDoubleConst(start, len, buf);        double *r = result->getDoubleBuffer(start, len, resultBuf);        for (int i = 0; i < len; i++) {            if (p[i] != DBL_NMIN)    // p[i] is not NULL                tmpSum += p[i];            r[i] = DBL_NMIN;        }        result->setDouble(start, len, r);        start += len;    }    result->setDouble(windowSize - 1, tmpSum);    // 上一个循环多设置了一个NULL,填充为tmpSum    while (start < size) {        int len = std::min(Util::BUF_SIZE, size - start);        const double *p = X->getDoubleConst(start, len, buf);        const double *q = X->getDoubleConst(start - windowSize, len, windowHeadBuf);        double *r = result->getDoubleBuffer(start, len, resultBuf);        for (int i = 0; i < len; i++) {            if (p[i] != DBL_NMIN)                tmpSum += p[i];            if (q[i] != DBL_NMIN)                tmpSum -= q[i];            r[i] = tmpSum;        }        result->setDouble(start, len, r);        start += len;    }    return result;}

3. 如何开发用于解决分布式SQL的聚合函数

在DolphinDB中,SQL的聚合函数通常承受一个或多个向量作为参数,最终返回一个标量。在开发聚合函数的插件时,须要理解如何拜访向量中的元素。

DolphinDB中的向量有两种存储形式。一种是惯例数组,数据在内存中间断存储;另一种是大数组,其中的数据分块存储。

本章将以编写一个求几何平均数的函数为例,介绍如何开发聚合函数,重点关注数组中元素的拜访。

3.1 聚合函数范例

几何平均数geometricMean函数承受一个向量作为参数。为了避免溢出,个别采纳其对数模式计算,即

geometricMean([x1, x2, ..., xn])    = exp((log(x1) + log(x2) + log(x3) + ... + log(xn))/n)

为了实现这个函数的分布式版本,能够先开发聚合函数插件logSum,用以计算某个分区上的数据的对数和,而后用defg关键字定义一个Reduce函数,用mapr关键字定义一个MapReduce函数。

在DolphinDB插件开发中,对数组的操作通常要思考它是惯例数组还是大数组。能够用isFastMode函数判断:

ConstantSP logSum(const ConstantSP &x, const ConstantSP &placeholder) {    if (((VectorSP) x)->isFastMode()) {        // ...    }    else {        // ...    }}

如果数组是惯例数组,它在内存中间断存储。能够用getDataArray函数取得它数据的指针。假设数据是以double类型存储的:

if (((VectorSP) x)->isFastMode()) {    int size = x->size();    double *data = (double *) x->getDataArray();        double logSum = 0;    for (int i = 0; i < size; i++) {        if (data[i] != DBL_NMIN)    // is not NULL            logSum += std::log(data[i]);    }    return new Double(logSum);}

如果数据是大数组,它在内存中分块存储。能够用getSegmentSize取得每个块的大小,用getDataSegment取得首个块的地址。它返回一个二级指针,指向一个指针数组,这个数组中的每个元素指向每个块的数据数组:

// ...else {    int size = x->size();    int segmentSize = x->getSegmentSize();    double **segments = (double **) x->getDataSegment();    INDEX start = 0;    int segmentId = 0;    double logSum = 0;    while (start < size) {        double *block = segments[segmentId];        int blockSize = std::min(segmentSize, size - start);        for (int i = 0; i < blockSize; i++) {            if (block[i] != DBL_NMIN)    // is not NULL                logSum += std::log(block[i]);        }        start += blockSize;        segmentId++;    }    return new Double(logSum);}

在理论开发中,数组的底层存储不肯定是double类型。用户须要思考具体类型。本例采纳了泛型编程对立解决不同类型,具体代码参见附件。

3.2 在DolphinDB中调用函数

通常须要实现一个聚合函数的非分布式版本和分布式版本,零碎会基于哪个版本更高效来抉择调用这个版本。

在DolphinDB中定义非分布式的geometricMean函数:

def geometricMean(x) {    return exp(logSum::logSum(x)  count(x))}

而后通过定义Map和Reduce函数,最终用mapr定义分布式的版本:

def geometricMeanMap(x) {    return logSum::logSum(x)}defg geometricMeanReduce(myLogSum, myCount) {    return exp(sum(myLogSum)  sum(myCount))}mapr geometricMean(x) { geometricMeanMap(x), count(x) -> geometricMeanReduce }

这样就实现了geometricMean函数。

如果是在单机环境中执行这个函数,只须要在执行的节点上加载插件。如果有数据位于近程节点,须要在每一个近程节点加载插件。能够手动在每个节点执行loadPlugin函数,也能够用以下脚本疾速在每个节点上加载插件:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

通过以下脚本创立一个分区表,验证函数:

db = database("", VALUE, 1 2 3 4)t = table(take(1..4, 100) as id, rand(1.0, 100) as val)t0 = db.createPartitionedTable(t, `tb, `id)t0.append!(t)select geometricMean(val) from t0 group by id

3.3 随机拜访大数组

能够对大数组进行随机拜访,但要通过下标计算。用getSegmentSizeInBit函数取得块大小的二进制位数,通过位运算取得块的偏移量和块内偏移量:

int segmentSizeInBit = x->getSegmentSizeInBit();int segmentMask = (1 << segmentSizeInBit) - 1;double **segments = (double **) x->getDataSegment();int index = 3000000;    // 想要拜访的下标double result = segments[index >> segmentSizeInBit][index & segmentMask];//                       ^ 块的偏移量                ^ 块内偏移量

3.4 应该抉择哪种形式拜访向量

上一章介绍了通过getDoubleConst,getIntConst等一族办法取得只读缓冲区,以及通过getDoubleBuffer,getIntBuffer等一族办法取得可读写缓冲区,这两种拜访向量的办法。本章介绍了通过getDataArraygetDataSegment办法间接拜访向量的底层存储。在理论开发中,前一种办法更通用,个别应该抉择前一种办法。但在某些特别的场合(例如明确晓得数据存储在大数组中,且晓得数据的类型),能够采纳第二种办法。

4. 如何开发反对新的分布式算法的插件函数

在DolphinDB中,Map-Reduce是执行分布式算法的通用计算框架。DolphinDB提供了mr函数和imr函数,使用户能通过脚本实现分布式算法。而在编写分布式算法的插件时,应用的同样是这两个函数。本章次要介绍如何用C++语言编写自定义的map, reduce等函数,并调用mr和imr两个函数,最终实现分布式计算。

4.1 分布式算法范例

本章将以mr为例,实现一个函数,求分布式表中相应列名的所有列平均值,介绍编写DolphinDB 分布式算法插件的整体流程,及须要留神的技术细节。

在插件开发中,用户自定义的map, reduce, final, term函数,能够是运算符函数,也能够是零碎函数。

本例的map函数,对表的一个分区内对应列名的列做计算,返回一个长度为2的元组,别离蕴含数据的和,及数据非空元素的个数。具体实现如下:

ConstantSP columnAvgMap(Heap *heap, vector<ConstantSP> &args) {    TableSP table = args[0];    ConstantSP colNames = args[1];    double sum = 0.0;    int count = 0;        for (int i = 0; i < colNames->size(); i++) {        string colName = colNames->getString(i);        VectorSP col = table->getColumn(colName);        sum += col->sum()->getDouble();        count += col->count();    }    ConstantSP result = Util::createVector(DT_ANY, 2);    result->set(0, new Double(sum));    result->set(1, new Int(count));    return result;}

本例的reduce函数,是对map后果的相加。DolphinDB的内置函数add就提供了这个性能,能够用heap->currentSession()->getFunctionDef("add")取得这个函数:

FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");

本例的final函数,是对reduce后果中的数据总和sum和非空元素个数count做除法,求得所有分区中对应列的平均数。具体实现如下:

ConstantSP columnAvgFinal(const ConstantSP &result, const ConstantSP &placeholder) {    double sum = result->get(0)->getDouble();    int count = result->get(1)->getInt();        return new Double(sum / count);}

定义了map, reduce, final等函数后,将它们导出为插件函数(在头文件的函数申明前加上extern "C",并在加载插件的文本文件中列出这些函数),而后通过heap->currentSession->getFunctionDef获取这些函数,就能以这些函数为参数调用mr函数。如:

FunctionDefSP mapFunc = Heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");

在本例中,map函数承受两个参数tablecolNames,但mr只容许map函数有一个参数,因而须要以局部利用的模式调用map函数,能够用Util::createPartialFunction将它包装为局部利用,实现如下:

vector<ConstantSP> mapWithColNamesArgs {new Void(), colNames};FunctionDefSP mapWithColNames = Util::createPartitalFunction(mapFunc, mapWithColNamesArgs);

heap->currentSession()->getFunctionDef("mr")取得零碎内置函数mr,调用mr->call办法,就相当于在DolphinDB脚本中调用mr函数。最初实现的columnAvg函数定义如下:

ConstantSP columnAvg(Heap *heap, vector<ConstantSP> &args) {    ConstantSP ds = args[0];    ConstantSP colNames = args[1];    FunctionDefSP mapFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgMap");    vector<ConstantSP> mapWithColNamesArgs = {new Void(), colNames};    FunctionDefSP mapWithColNames = Util::createPartialFunction(mapFunc, mapWithColNamesArgs);    // columnAvgMap{, colNames}    FunctionDefSP reduceFunc = heap->currentSession()->getFunctionDef("add");    FunctionDefSP finalFunc = heap->currentSession()->getFunctionDef("columnAvg::columnAvgFinal");    FunctionDefSP mr = heap->currentSession()->getFunctionDef("mr");    // mr(ds, columnAvgMap{, colNames}, add, columnAvgFinal)    vector<ConstantSP> mrArgs = {ds, mapWithColNames, reduceFunc, finalFunc};    return mr->call(heap, mrArgs);}

4.2 在DolphinDB中调用函数

如果是在单机环境中执行这个函数,只须要在执行的节点上加载插件。但如果有数据位于近程节点,须要在每一个近程节点加载插件。能够手动在每个节点执行loadPlugin函数,也能够用以下脚本疾速在每个节点上加载插件:

each(rpc{, loadPlugin, pathToPlugin}, getDataNodes())

加载插件后,用sqlDS函数生成数据源,并调用函数:

n = 100db = database("dfs://testColumnAvg", VALUE, 1..4)t = db.createPartitionedTable(table(10:0, `id`v1`v2, [INT,DOUBLE,DOUBLE]), `t, `id)t.append!(table(take(1..4, n) as id, rand(10.0, n) as v1, rand(100.0, n) as v2))ds = sqlDS(<select * from t>)columnAvg::columnAvg(ds, `v1`v2)

5.如何开发反对流数据处理的插件函数

在DolphinDB中,流数据订阅端能够通过一个handler函数解决收到的数据。订阅数据能够是一个数据表,或一个元组,由subsrciebeTable函数的msgAsTable参数决定。通常能够用handler函数对流数据进行过滤、插入另一张表等操作。

本章将编写一个handler函数。它承受的音讯类型是元组。另外承受两个参数:一个是int类型的标量或向量indices,示意元组中元素的下标,另一个是一个表table。它将元组中对应下标的列插入到表中。

向表中增加数据的接口是bool append(vector<ConstantSP>& values, INDEX& insertedRows, string& errMsg),如果插入胜利,返回true,并向insertedRows中写入插入的行数。否则返回false,并在errMsg中写入出错信息。插件的实现如下:

ConstantSP handler(Heap *heap, vector<ConstantSP> &args) {    ConstantSP indices = args[0];    TableSP table = args[1];    ConstantSP msg = args[2];    vector<ConstantSP> msgToAppend;    for (int i = 0; i < indices->size(); i++) {        int index = indices->get(i);        msgToAppend.push_back(msg->get(index));    }    INDEX insertedRows;    string errMsg;    table->append(msgToAppend, insertedRows, errMsg);    return new Void();}

在理论利用中,可能须要晓得插入出错时的起因。能够引入头文件Logger.h,将出错信息写入日志中。留神须要在编译插件时加上宏定义-DLOGGING_LEVEL_2

// ...bool success = table->append(msgToAppend, insertedRows, errMsg);if (!success)    LOG_ERR("Failed to append to table: ", errMsg);

能够用以下脚本模仿流数据写入,验证handler函数:

loadPlugin("/path/to/PluginHandler.txt")share streamTable(10:0, `id`sym`timestamp, [INT,SYMBOL,TIMESTAMP]) as t0t1 = table(10:0, `sym`timestamp, [SYMBOL,TIMESTAMP])subscribeTable(, `t0, , , handler::handler{[1,2], t1})t0.append!(table(1..100 as id, take(`a`b`c`d, 100) as symbol, now() + 1..100 as timestamp))select * from t1

6.如何开发反对内部数据源的插件函数

在为第三方数据设计可扩大的接口插件时,有几个须要关注的问题:

  1. 数据源(Data source)。数据源是一个非凡的数据对象,蕴含了数据实体的元形容,执行一个数据源能取得数据实体,可能是表、矩阵、向量等等。用户能够提供数据源调用olsEx, randomForestClassifier等分布式计算函数,也能够调用mr, imrComputingModel.h中定义的更底层的计算模型做并行计算。DolphinDB的内置函数sqlDS就通过SQL表达式获取数据源。在设计第三方数据接口时,通常须要实现一个获取数据源的函数,它将大的文件分成若干个局部,每局部都示意数据的一个子集,最初返回一个数据源的元组。数据源个别用一个Code object示意,是一个函数调用,它的参数是元数据,返回一个表。
  2. 构造(Schema)。表的构造形容了表的列数,每一列的列名和数据类型。第三方接口通常须要实现一个函数,疾速取得数据的表构造,以便用户在这个构造的根底上调整列名和列的数据类型。
  3. IO问题。在多核多CPU的环境中,IO可能成为瓶颈。DolphinDB提供了形象的IO接口,DataInputStreamDataOutputStream,这些接口封装了数据压缩,Endianness,IO类型(网络,磁盘,buffer等)等细节,不便开发。此外还特地实现了针对多线程的IO实现,BlockFileInputStreamBlockFileOutputStream。这个实现有两个长处:
  • 实现计算和IO并行。A线程在解决数据的时候,后盾线程在异步帮A线程预读取前面须要的数据。
  • 防止了多线程的磁盘竞争。当线程个数减少的时候,如果并行往同一个磁盘上读写,性能会急剧下降。这个实现,会对同一个磁盘的读写串行化,从而进步吞吐量。

本章将介绍通常须要实现的几个函数,为设计第三方数据接口提供一个简略的范例。

6.1 数据格式形容

假设本例中的数据贮存在立体文件数据库,以二进制格局按行存储,数据从文件头部间接开始存储。每行有四列,别离为id(按有符号64位长整型格局存储,8字节),symbol(按C字符串格局存储,8字节),date(按BCD码格局存储,8字节),value(按IEEE 754规范的双精度浮点数格局存储,8字节),每行共32字节。以下是一行的例子:

这一行的十六进制示意为:

0x 00 00 00 00 00 00 00 050x 49 42 4D 00 00 00 00 000x 02 00 01 09 00 03 01 030x 40 24 33 33 33 33 33 33

6.2 extractMyDataSchema函数

这个函数提取数据文件的表构造。在本例中,表构造是确定的,不须要理论读取文件。该函数提供了一个如何生成表构造的范例。它通过Util::createTable函数创立一张构造表:

ConstantSP extractMyDataSchema(const ConstantSP &placeholderA, const ConstantSP &placeholderB) {    ConstantSP colNames = Util::createVector(DT_STRING, 4);    ConstantSP colTypes = Util::createVector(DT_STRING, 4);    string names[] = {"id", "symbol", "date", "value"};    string types[] = {"LONG", "SYMBOL", "DATE", "DOUBLE"};    colNames->setString(0, 4, names);    colTypes->setString(0, 4, types);    vector<ConstantSP> schema = {colNames, colTypes};    vector<string> header = {"name", "type"};    return Util::createTable(header, schema);}

在理论开发中,可能须要以读取文件头等形式取得表构造。如何读文件将在前面介绍。

6.3 loadMyData函数

loadMyData函数读取文件,并输入一张DolphinDB表。给定一个文件的门路,能够通过Util::createBlockFileInputStream创立一个输出流,尔后,可对这个流调用readBytes函数读取给定长度的字节,readBool读取下一个bool值,readInt读取下一个int值,等等。本例给loadMyData函数设计的语法为:loadMyData(path, [start], [length])。除了承受文件门路path,还承受两个int类型的参数startlength,别离示意开始读取的行数和须要读取的总行数。createBlockFileInputStream函数能够通过参数决定开始读取的字节数和须要读取的总字节数:

ConstantSP loadMyData(Heap *heap, vector<ConstantSP> &args) {    ConstantSP path = args[0];    long long fileLength = Util::getFileLength(path->getString());    size_t bytesPerRow = 32;    int start = args.size() >= 2 ? args[1]->getInt() : 0;    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;    DataInputStreamSP inputStream = Util::createBlockFileInputStream(path->getString(), 0, fileLength, Util::BUF_SIZE, start * bytesPerRow, length * bytesPerRow);    char buf[Util::BUF_SIZE];    size_t actualLength;        while (true) {        inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);        if (actualLength <= 0)            break;        // ...    }}

在读取数据时,通常将数据缓存到数组中,期待缓冲区满后批量插入。例如,假设要读取一个内容全为char类型字节的二进制文件,将它写入一个char类型的DolphinDB向量vec。最初返回只由vec一列组成的表:

char buf[Util::BUF_SIZE];VectorSP vec = Util::createVector(DT_CHAR, 0);size_t actualLength;while (true) {    inputStream->readBytes(buf, Util::BUF_SIZE, actualLength);    if (actualLength <= 0)        break;    vec->appendChar(buf, actualLength);}vector<ConstantSP> cols = {vec};vector<string> colNames = {"col0"};return Util::createTable(colNames, cols);

本节的残缺代码请参考附件中的代码。在理论开发中,加载数据的函数可能还会承受表构造参数schema,按理论须要扭转读取的数据类型。

6.4 loadMyDataEx函数

loadMyData函数总是将数据加载到内存,当数据文件十分宏大时,工作机的内存很容易成为瓶颈。所以设计loadMyDataEx函数解决这个问题。它通过边导入边保留的形式,把动态的二进制文件以较为平缓的数据流的形式保留为DolphinDB的分布式表,而不是采纳全副导入内存再存为分区表的形式,从而升高内存的应用需要。

loadMyDataEx函数的参数能够参考DolphinDB内置函数loadTextEx。它的语法是:loadMyDataEx(dbHandle, tableName, partitionColumns, path, [start], [length])。如果数据库中的表存在,则将导入的数据增加到已有的表result中。如果表不存在,则创立一张表result,而后增加数据。最初返回这张表:

string dbPath = ((SystemHandleSP) db)->getDatabaseDir();vector<ConstantSP> existsTableArgs = {new String(dbPath), tableName};bool existsTable = heap->currentSession()->getFunctionDef("existsTable")->call(heap, existsTableArgs)->getBool();    // 相当于existsTable(dbPath, tableName)ConstantSP result;if (existsTable) {    // 表存在,间接加载表    vector<ConstantSP> loadTableArgs = {db, tableName};    result = heap->currentSession()->getFunctionDef("loadTable")->call(heap, loadTableArgs);    // 相当于loadTable(db, tableName)}else {    // 表不存在,创立表    TableSP schema = extractMyDataSchema(new Void(), new Void());    ConstantSP dummyTable = DBFileIO::createEmptyTableFromSchema(schema);    vector<ConstantSP> createTableArgs = {db, dummyTable, tableName, partitionColumns};    result = heap->currentSession()->getFunctionDef("createPartitionedTable")->call(heap, createTableArgs);    // 相当于createPartitionedTable(db, dummyTable, tableName, partitionColumns)}

读取数据并增加到表中的代码实现采纳了Pipeline框架。它的初始工作是一系列具备不同start参数的loadMyData函数调用,pipeline的follower函数是一个局部利用append!{result},相当于把整个读取数据的工作分成若干份执行,调用loadMyData分块读取后,将相应的数据通过append!插入表中。外围局部的代码如下:

int sizePerPartition = 16 * 1024 * 1024;int partitionNum = fileLength / sizePerPartition;vector<DistributedCallSP> tasks;FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);int partitionStart = start;int partitionLength = length / partitionNum;for (int i = 0; i < partitionNum; i++) {    if (i == partitionNum - 1)        partitionLength = length - partitionLength * i;    vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};    ObjectSP call = Util::createRegularFunctionCall(func, partitionArgs);    // 将会调用loadMyData(path, partitionStart, partitionLength)    tasks.push_back(new DistributedCall(call, true));    partitionStart += partitionLength;}vector<ConstantSP> appendToResultArgs = {result};FunctionDefSP appendToResult = Util::createPartialFunction(heap->currentSession()->getFunctionDef("append!"), appendToResultArgs);    // 相当于append!{result}vector<FunctionDefSP> functors = {appendToResult};PipelineStageExecutor executor(functors, false);executor.execute(heap, tasks);

本节的残缺代码请参考附件中的代码。用Pipeline框架实现数据的分块导入,只是一种思路。在具体开发时,能够采纳ComputingModel.h中申明的StaticStageExecutor,也能够应用Concurrent.h中申明的线程模型Thread。实现办法有很多种,须要依据理论场景抉择。

6.5 myDataDS函数

myDataDS函数返回一个数据源的元组。每个数据源都是一个示意函数调用的Code object,能够通过Util::createRegularFunctionCall生成。执行这个对象能够获得对应的数据。以下是基于loadMyData函数产生数据源的一个例子:

ConstantSP myDataDS(Heap *heap, vector<ConstantSP> &args) {    ConstantSP path = args[0];    long long fileLength = Util::getFileLength(path->getString());    size_t bytesPerRow = 32;    int start = args.size() >= 2 ? args[1]->getInt() : 0;    int length = args.size() >= 3 ? args[2]->getInt() : fileLength / bytesPerRow - start;    int sizePerPartition = 16 * 1024 * 1024;    int partitionNum = fileLength / sizePerPartition;    int partitionStart = start;    int partitionLength = length / partitionNum;    FunctionDefSP func = Util::createSystemFunction("loadMyData", loadMyData, 1, 3, false);    ConstantSP dataSources = Util::createVector(DT_ANY, partitionNum);    for (int i = 0; i < partitionNum; i++) {        if (i == partitionNum - 1)            partitionLength = length - partitionLength * i;        vector<ConstantSP> partitionArgs = {path, new Int(partitionStart), new Int(partitionLength)};        ObjectSP code = Util::createRegularFunctionCall(func, partitionArgs);    // 将会调用loadMyData(path, partitionStart, partitionLength)        dataSources->set(i, new DataSource(code));    }    return dataSources;}

教程中的残缺代码见https://github.com/dolphindb/Tu