关于时序数据库:时序数据库DolphinDB文本数据加载教程

44次阅读

共计 21268 个字符,预计需要花费 54 分钟才能阅读完成。

DolphinDB 提供以下 4 个函数,将文本数据导入内存或数据库:

loadText: 将文本文件导入为内存表。

ploadText: 将文本文件并行导入为分区内存表。与 loadText 函数相比,速度更快。

loadTextEx: 将文本文件导入数据库中,包含分布式数据库,本地磁盘数据库或内存数据库。

textChunkDS:将文本文件划分为多个小数据源,再通过 mr 函数进行灵便的数据处理。

DolphinDB 的文本数据导入不仅灵便,功能丰富,而且速度十分快。DolphinDB 与 Clickhouse, MemSQL, Druid, Pandas 等业界风行的零碎相比,单线程导入的速度更快,最多可达一个数量级的劣势;多线程并行导入的状况下,速度劣势更加显著。

本教程介绍文本数据导入时的常见问题,相应的解决方案以及注意事项。

  1. 自动识别数据格式

大多数其它零碎中,导入文本数据时,须要由用户指定数据的格局。为了不便用户,DolphinDB 在导入数据时,可能自动识别数据格式。

自动识别数据格式包含两局部:字段名称辨认和数据类型辨认。如果文件的第一行没有任何一列以数字结尾,那么零碎认为第一行是文件头,蕴含了字段名称。DolphinDB 会抽取大量局部数据作为样本,并主动推断各列的数据类型。因为是基于局部数据,某些列的数据类型的辨认可能有误。然而对于大多数文本文件,毋庸手动指定各列的字段名称和数据类型,就能正确地导入到 DolphinDB 中。

请留神:DolphinDB 反对自动识别大部分 DolphinDB 提供的数据类型,然而目前暂不反对辨认 UUID 和 IPADDR 类型,在后续版本中会反对。

loadText 函数用于将数据导入 DolphinDB 内存表。下例调用 loadText 函数导入数据,并查看生成的数据表的构造。例子中波及到的数据文件请参考附录。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath);

查看数据表前 5 行数据:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

调用 schema 函数查看表构造(字段名称、数据类型等信息):

tmpTB.schema().colDefs;

name       typeString typeInt comment
---------- ---------- ------- -------
symbol     SYMBOL     17
exchange   SYMBOL     17
cycle      INT        4
tradingDay DATE       6
date       DATE       6
time       INT        4
open       DOUBLE     16
high       DOUBLE     16
low        DOUBLE     16
close      DOUBLE     16
volume     INT        4
turnover   DOUBLE     16
unixTime   LONG       5
  1. 指定数据导入格局

本教程讲述的 4 个数据加载函数中,均可用 schema 参数指定一个表,内含各字段的名称、类型、格局、须要导入的列等信息。该表可蕴含以下 4 列:

  • name:字符串,示意列名
  • type:字符串,示意每列的数据类型
  • format:字符串,示意日期或工夫列的格局
  • col:整型,示意要加载的列的下标。该列的值必须是升序。

其中,name 和 type 这两列是必须的,而且必须是前两列。format 和 col 这两列是可选的,且没有先后关系的要求。

例如,咱们能够应用上面的数据表作为 schema 参数:

name         type
----------   -------
timestamp    SECOND
ID           INT
qty          INT
price        DOUBLE

2.1 提取文本文件的 schema

extractTextSchema 函数用于获取文本文件的 schema,包含字段名称和数据类型等信息。

例如,应用 extractTextSchema 函数失去本教程中示例文件的表构造:

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name       type
---------- ------
symbol     SYMBOL
exchange   SYMBOL
cycle      INT
tradingDay DATE
date       DATE
time       INT
open       DOUBLE
high       DOUBLE
low        DOUBLE
close      DOUBLE
volume     INT
turnover   DOUBLE
unixTime   LONG

通过 extractTextSchema 函数失去数据文件的表构造 schemaTB 当前,若表中主动解析的数据类型不合乎预期,能够应用 SQL 语句对该表进行批改,从而失去满足要求的表构造。

2.2 指定字段名称和类型

当零碎自动识别的字段名称或者数据类型不合乎预期或需要时,能够通过设置 schema 参数为文本文件中的每列指定字段名称和数据类型。

例如,若导入数据的 volume 列被自动识别为 INT 类型,而须要的 volume 类型是 LONG 类型,就须要通过 schema 参数指定 volumne 列类型为 LONG。上面的例子中,首先调用 extractTextSchema 函数失去文本文件的表构造,再依据需要批改表中列的数据类型。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="LONG" where name="volume";

应用 loadText 函数导入文本文件,将数据依照 schemaTB 所规定的字段数据类型导入到数据库中。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看表中前五行的数据,volume 列数据以长整型的模式失常显示:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

上例介绍了批改数据类型的状况,若要批改表中的字段名称,也能够通过同样的办法实现。

请留神,若 DolphinDB 对日期和工夫相干数据类型的解析不合乎预期,须要通过本教程第 2.3 大节的形式解决。

2.3 指定日期和工夫类型的格局

对于日期列或工夫列的数据,如果 DolphinDB 辨认的数据类型不合乎预期,不仅须要在 schema 的 type 列指定数据类型,还须要在 format 列中指定格局(用字符串示意),如 ”MM/dd/yyyy”。如何示意日期和工夫格局请参考日期和工夫的调整及格局。

上面联合例子具体阐明对日期和工夫列指定数据类型的办法。

在 DolphinDB 中执行以下脚本,生成本例所需的数据文件。

dataFilePath="/home/data/timeData.csv"
t=table(["20190623 14:54:57","20190623 15:54:23","20190623 16:30:25"] as time,`AAPL`MS`IBM as sym,2200 5400 8670 as qty,54.78 59.64 65.23 as price)
saveText(t,dataFilePath);

加载数据前,应用 extractTextSchema 函数获取该数据文件的 schema:

schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name  type
----- ------
time  SECOND
sym   SYMBOL
qty   INT
price DOUBLE

显然,零碎辨认 time 列的数据类型不合乎预期。如果间接加载该文件,time 列的数据将为空。为了可能正确加载该文件 time 列的数据,须要指定 time 列的数据类型为 DATETIME,并且指定该列的格局为 ”yyyyMMdd HH:mm:ss”。

update schemaTB set type="DATETIME" where name="time"
schemaTB[`format]=["yyyyMMdd HH:mm:ss",,,];

导入数据并查看,数据显示正确:

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

time                sym  qty  price
------------------- ---- ---- -----
2019.06.23T14:54:57 AAPL 2200 54.78
2019.06.23T15:54:23 MS   5400 59.64
2019.06.23T16:30:25 IBM  8670 65.23

2.4 导入指定列

在导入数据时,能够通过 schema 参数指定只导入文本文件中的某几列。

下例中,只需加载文本文件中 symbol, date, open, high, close, volume, turnover 这 7 列。

首先,调用 extractTextSchema 函数失去指标文本文件的表构造。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath);

应用 rowNo 函数为各列生成列号,赋值给 schema 表中的 col 列,而后批改 schema 表,仅保留示意须要导入的字段的行。

update schemaTB set col = rowNo(name)
schemaTB=select * from schemaTB where name in `symbol`date`open`high`close`volume`turnover;

请留神:
1. 列号从 0 开始。上例中第一列 symbol 列对应的列号是 0。
2. 导入数据时不能扭转各列的先后顺序。如果须要调整列的程序,能够将数据文件加载后,再应用 reorderColumns! 函数。

最初,应用 loadText 函数,并配置 schema 参数,导入文本文件中指定的列。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看表中前 5 行,只导入了所需的列:

select top 5 * from tmpTB

symbol date       open   high  close volume turnover
------ ---------- ------ ----- ----- ------ ----------
000001 2018.01.02 9.31E7 13.35 13.35 13     2.003635E6
000001 2018.01.02 9.32E7 13.37 13.33 13     867181
000001 2018.01.02 9.33E7 13.32 13.32 13     903894
000001 2018.01.02 9.34E7 13.35 13.35 13     1.012E6
000001 2018.01.02 9.35E7 13.35 13.35 13     1.601939E6

2.5 跳过文本数据的前若干行

在数据导入时,若需跳过文件前 n 行(可能为文件阐明),可指定 skipRows 参数为 n。因为形容文件的阐明通常不会十分简短,因而这个参数的取值最大为 1024。本教程讲述的 4 个数据加载函数均反对 skipRows 参数。

下例中,通过 loadText 函数导入数据文件,并且查看该文件导入当前表的总行数,以及前 5 行的内容。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath)
select count(*) from tmpTB;

count
-----
5040

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

指定 skipRows 参数取值为 1000,跳过文本文件的前 1000 行导入文件:

tmpTB=loadText(filename=dataFilePath,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

col0   col1 col2 col3       col4       col5      col6  col7  col8  col9  col10  col11      col12
------ ---- ---- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE 1    2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE 1    2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE 1    2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE 1    2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE 1    2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000

请留神:如上例所示,在跳过前 n 行进行导入时,若数据文件的第一行是列名,改行会作为第一行被略过。

在下面的例子中,文本文件指定 skipRows 参数导入当前,因为示意列名的第一行被跳过,列名变成了默认列名:col1,col2 等等。若须要保留列名而又指定跳过前 n 行,可先通过 extractTextSchema 函数失去文本文件的 schema,在导入时指定 schema 参数:

schema=extractTextSchema(dataFilePath)
tmpTB=loadText(filename=dataFilePath,schema=schema,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time      open  high  low   close volume turnover   unixTime
------ -------- ----- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE     1     2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE     1     2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE     1     2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE     1     2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE     1     2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
  1. 并行导入数据

3.1 单个文件多线程载入内存

ploadText 函数可将一个文本文件以多线程的形式载入内存。该函数与 loadText 函数的语法是统一的,区别在于,ploadText函数能够疾速载入大型文件,并且生成内存分区表。它充分利用了多核 CPU 来并行载入文件,并行水平取决于服务器自身 CPU 核数量和节点的 localExecutors 配置。

上面比拟 loadText 函数与 ploadText 函数导入同一个文件的性能。

首先通过脚本生成一个 4GB 左右的文本文件:

filePath="/home/data/testFile.csv"
appendRows=100000000
t=table(rand(100,appendRows) as int,take(string('A'..'Z'),appendRows) as symbol,take(2010.01.01..2018.12.30,appendRows) as date,rand(float(100),appendRows) as float,00:00:00.000 + rand(86400000,appendRows) as time)
t.saveText(filePath);

别离通过 loadTextploadText来载入文件。本例所用节点是 6 核 12 超线程的 CPU。

timer loadText(filePath);
Time elapsed: 12629.492 ms

timer ploadText(filePath);
Time elapsed: 2669.702 ms

结果显示在此配置下,ploadText的性能是 loadText 的 4.5 倍左右。

3.2 多文件并行导入

在大数据应用领域,数据导入往往不只是一个或两个文件的导入,而是数十个甚至数百个大型文件的批量导入。为了达到更好的导入性能,倡议尽量以并行形式导入批量的数据文件。

loadTextEx 函数可将文本文件导入指定的数据库中,包含分布式数据库,本地磁盘数据库或内存数据库。因为 DolphinDB 的分区表反对并发读写,因而能够反对多线程导入数据。应用 loadTextEx 将文本数据导入到分布式数据库,具体实现为将数据先导入到内存,再由内存写入到数据库,这两个步骤由同一个函数实现,以保障高效率。

下例展现如何将磁盘上的多个文件批量写入到 DolphinDB 分区表中。首先,在 DolphinDB 中执行以下脚本,生成 100 个文件,共约 778MB,包含 1 千万条记录。

n=100000
dataFilePath="/home/data/multi/multiImport_"+string(1..100)+".csv"
for (i in 0..99){trades=table(sort(take(100*i+1..100,n)) as id,rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5)
    trades.saveText(dataFilePath[i])
};

创立数据库和表:

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,1..10000)
tb=db.createPartitionedTable(trades,`tb,`id);

DolphinDB 的 cut 函数可将一个向量中的元素分组。上面调用 cut 函数将待导入的文件门路进行分组,再调用 submitJob 函数,为每个线程调配写入工作,批量导入数据。

def writeData(db,file){loop(loadTextEx{db,`tb,`id,},file)
}
parallelLevel=10
for(x in dataFilePath.cut(100/parallelLevel)){submitJob("loadData"+parallelLevel,"loadData",writeData{db,x})
};

请留神:DolphinDB 的分区表不容许多个线程同时向一个分区写数据。上例中,每个文件中的分区列(id 列)取值不同,因而不会造成多个线程写入同一个分区的状况。在设计分区表的并发读写时,请确保不会有多个线程同时写入同一分区。

通过 getRecentJobs 函数能够获得以后本地节点上最近 n 个批处理作业的状态。应用 select 语句计算并行导入批量文件所需工夫,失去在 6 核 12 超线程的 CPU 上耗时约 1.59 秒。

select max(endTime) - min(startTime) from getRecentJobs() where jobId like "loadData"+string(parallelLevel)+"%";

max_endTime_sub
---------------
1590

执行以下脚本,将 100 个文件单线程程序导入数据库,记录所需工夫,耗时约 8.65 秒。

timer writeData(db, dataFilePath);
Time elapsed: 8647.645 ms

结果显示在此配置下,并行开启 10 个线程导入速度是单线程导入的 5.5 倍左右。

查看数据表中的记录条数:

select count(*) from loadTable("dfs://DolphinDBdatabase", `tb);

count
------
10000000
  1. 导入数据库前的预处理

在将数据导入数据库之前,若须要对数据进行简单的解决,例如日期和工夫数据类型的强制转换,填充空值等,能够在调用 loadTextEx 函数时指定 transform 参数。tansform 参数承受一个函数作为参数,并且要求该函数只能承受一个参数。函数的输出是一个未分区的内存表,输入也是一个未分区的内存表。须要留神的是,只有 loadTextEx 函数提供 transform 参数。

4.1 指定日期和工夫数据的数据类型

4.1.1 将数值类型示意的日期和工夫转化为指定类型

数据文件中示意工夫的数据可能是整型或者长整型,而在进行数据分析时,往往又须要将这类数据强制转化为工夫类型的格局导入并存储到数据库中。针对这种场景,可通过 loadTextEx 函数的 transform 参数为文本文件中的日期和工夫列指定相应的数据类型。

首先,创立分布式数据库和表。

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

自定义函数foo,用于对数据进行预处理,并返回解决过后的数据表。

def foo(mutable t){return t.replaceColumn!(`time,time(t.time/10))
}

请留神:在自定义函数体内对数据进行解决时,请尽量应用本地的批改(带有!的函数)来晋升性能。

调用 loadTextEx 函数,并且指定 transform 参数,零碎会对文本文件中的数据执行 transform 参数指定的函数,即 foo 函数,再将失去的后果保留到数据库中。

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=foo);

查看表内前 5 行数据。可见 time 列是以 TIME 类型存储,而不是文本文件中的 INT 类型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- ------------------ ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 02:35:10.000000000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:20.000000000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:30.000000000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:40.000000000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:50.000000000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.1.2 为文本文件中的日期和工夫相干列指定数据类型

另一种与日期和工夫列相干的解决是,文本文件中日期以 DATE 类型存储,在导入数据库时心愿以 MONTH 的模式存储。这种状况也可通过 loadTextEx 函数的 transform 参数转换该日期列的数据类型,步骤与上述过程统一。

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="MONTH" where name="tradingDay"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date)
def fee(mutable t){return t.replaceColumn!(`tradingDay,month(t.tradingDay))
}
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=fee);

查看表内前 5 行数据。可见 tradingDay 列是以 MONTH 类型存储,而不是文本文件中的 DATE 类型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01M   2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01M   2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01M   2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01M   2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01M   2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.2 对表内数据填充空值

transform 参数反对调用 DolphinDB 的内置函数,当内置函数要求多个参数时,咱们能够应用局部利用将多参数函数转换为一个参数的函数。例如,调用 nullFill! 函数对文本文件中的空值进行填充。

db=database(dbPath,VALUE,2018.01.02..2018.01.30)
tb=db.createPartitionedTable(tb,`tb1,`date)
tmpTB=loadTextEx(dbHandle=db,tableName=`pt,partitionColumns=`date,filename=dataFilePath,transform=nullFill!{,0});
  1. 应用 Map-Reduce 自定义数据导入

DolphinDB 反对应用 Map-Reduce 自定义数据导入,将数据按行进行划分,并将划分后的数据通过 Map-Reduce 导入到 DolphinDB。

可应用 textChunkDS 函数将文件划分为多个小文件数据源,再通过 mr 函数写入到数据库中。在调用 mr 将数据存入数据库前,用户还可进行灵便的数据处理,从而实现更简单的导入需要。

5.1 将文件中的股票和期货数据存储到两个不同的数据表

在 DolphinDB 中执行以下脚本,生成一个大小约为 1GB 的数据文件,其中包含股票数据和期货数据。

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`stock`futures,n) as type, rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4,rand(10000,n) as qty5,rand(10000,n) as qty6)
trades.saveText(dataFilePath);

别离创立用于寄存股票数据和期货数据的分布式数据库和表:

login(`admin,`123456)
dbPath1="dfs://DolphinDBTickDatabase"
dbPath2="dfs://DolphinDBFuturesDatabase"
db1=database(dbPath1,VALUE,`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S)
db2=database(dbPath2,VALUE,2000.01.01..2000.06.30)
tb1=db1.createPartitionedTable(trades,`stock,`sym)
tb2=db2.createPartitionedTable(trades,`futures,`date);

定义函数,用于划分数据,并将数据写入到不同的数据库。

def divideImport(tb, mutable stockTB, mutable futuresTB)
{
    tdata1=select * from tb where type="stock"
    tdata2=select * from tb where type="futures"
    append!(stockTB, tdata1)
    append!(futuresTB, tdata2)
}

再通过 textChunkDS 函数划分文本文件,以 300MB 为单位进行划分,文件被划分成了 4 局部。

ds=textChunkDS(dataFilePath,300)
ds;

(DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment)

调用 mr 函数,指定数据源将文件导入到数据库中。因为 map 函数(由 mapFunc 参数指定)只承受一个表作为参数,这里咱们应用局部利用将多参数函数转换为一个参数的函数。

mr(ds=ds, mapFunc=divideImport{,tb1,tb2}, parallel=false);

请留神,这里每个小文件数据源可能蕴含雷同分区的数据。DolphinDB 不容许多个线程同时对雷同分区进行写入,因而要将 mr 函数的 parallel 参数设置为 false,否则会抛出异样。

查看 2 个数据库中表的前 5 行,股票数据库中均为股票数据,期货数据库中均为期货数据。

stock 表:

select top 5 * from loadTable("dfs://DolphinDBTickDatabase", `stock);

type  sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
----- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
stock AMZN 2000.02.14 11.224234 112.26763  1160.926836 11661.418403 11902.403305 11636.093467 4    53   450  2072 9116 12
stock AMZN 2000.03.29 10.119057 111.132165 1031.171855 10655.048121 12682.656303 11182.317321 6    21   651  2078 7971 6207
stock AMZN 2000.06.16 11.61637  101.943971 1019.122963 10768.996906 11091.395164 11239.242307 0    91   857  3129 3829 811
stock AMZN 2000.02.20 11.69517  114.607763 1005.724332 10548.273754 12548.185724 12750.524002 1    39   270  4216 8607 6578
stock AMZN 2000.02.23 11.534805 106.040664 1085.913295 11461.783565 12496.932604 12995.461331 4    35   488  4042 6500 4826

futures 表:

select top 5 * from loadTable("dfs://DolphinDBFuturesDatabase", `futures);

type    sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 ...
------- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ---
futures MSFT 2000.01.01 11.894442 106.494131 1000.600933 10927.639217 10648.298313 11680.875797 9    10   241  524  8325 ...
futures S    2000.01.01 10.13728  115.907379 1140.10161  11222.057315 10909.352983 13535.931446 3    69   461  4560 2583 ...
futures GM   2000.01.01 10.339581 112.602729 1097.198543 10938.208083 10761.688725 11121.888288 1    1    714  6701 9203 ...
futures IBM  2000.01.01 10.45422  112.229537 1087.366764 10356.28124  11829.206165 11724.680443 0    47   741  7794 5529 ...
futures TSLA 2000.01.01 11.901426 106.127109 1144.022732 10465.529256 12831.721586 10621.111858 4    43   136  9858 8487 ...
n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

5.2 疾速加载大文件首尾局部数据

可应用 textChunkDS 将大文件划分成多个小的数据源(chunk),而后加载首尾两个数据源。在 DolphinDB 中执行以下脚本生成数据文件:

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

再通过 textChunkDS 函数划分文本文件,以 10MB 为单位进行划分。

ds=textChunkDS(dataFilePath, 10);

调用 mr 函数,加载首尾两个 chunk 的数据。因为这两个 chunk 的数据十分小,加载速度十分快。

head_tail_tb = mr(ds=[ds.head(), ds.tail()], mapFunc=x->x, finalFunc=unionAll{,false});

查看 head_tail_tb 表中的记录数以及前 5 条记录。因为数据是随机生成,记录数可能每次会略有不同,前 5 行的数据也会跟上面显示的不同。

select count(*) from head_tail_tb;

count
------
192262

查看表的前 5 行数据:

select top 5 * from head_tail_tb;

sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
IBM  2000.01.01 10.978551 114.535418 1163.425635 11827.976468 11028.01038  10810.987825 2    51   396  6636 9403 937
MSFT 2000.01.01 11.776656 106.472172 1138.718459 10720.778545 10164.638399 11348.744314 9    79   691  533  5669 72
FB   2000.01.01 11.515097 118.674854 1153.305462 10478.6335   12160.662041 13874.09572  3    29   592  2097 4103 113
MSFT 2000.01.01 11.72034  105.760547 1139.238066 10669.293733 11314.226676 12560.093619 1    99   166  2282 9167 483
TSLA 2000.01.01 10.272615 114.748639 1043.019437 11508.695323 11825.865846 10495.364306 6    43   95   9433 6641 490
  1. 其它注意事项

6.1 不同编码的数据的解决

因为 DolphinDB 的字符串采纳 UTF- 8 编码,加载的文件必须是 UTF- 8 编码。若为其它模式的编码,能够在导入当前进行转化。DolphinDB 提供了 convertEncode、fromUTF8 和 toUTF8 函数,用于导入数据后对字符串编码进行转换。

例如,应用 convertEncode 函数转换表 tmpTB 中的 exchange 列的编码:

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath, skipRows=0)
tmpTB.replaceColumn!(`exchange, convertEncode(tmpTB.exchange,"gbk","utf-8"));

6.2 数值类型的解析

本教程第 1 节介绍了 DolphinDB 在导入数据时的数据类型主动解析机制,本节解说数值类型数据的解析。在数据导入时,若指定数据类型为数值类型(包含 CHAR,SHORT,INT,LONG,FLOAT 和 DOUBLE),则零碎可能辨认以下几种模式的数据:

  • 数字示意的数值,例如:123
  • 以逗号分隔的数字示意的数值,例如:100,000
  • 带有小数点的数字示意的数值,即浮点数,例如:1.231
  • 迷信计数法示意的数值,例如:1.23E5

DolphinDB 在导入时会会主动疏忽数字前后的字母及其他符号,如果没有呈现任何数字,则解析为 NULL 值。上面联合例子具体阐明。

首先,执行以下脚本,创立一个文本文件。

dataFilePath="/home/data/testSym.csv"
prices1=["2131","$2,131", "N/A"]
prices2=["213.1","$213.1", "N/A"]
totals=["2.658E7","-2.658e7","2.658e-7"]
tt=table(1..3 as id, prices1 as price1, prices2 as price2, totals as total)
saveText(tt,dataFilePath);

创立的文本文件中,price1 和 price2 列中既有数字,又有字符。若不指定 schema 参数导入数据,DolphinDB 会将 price1 和 price2 列均辨认为 SYMBOL 类型:

tmpTB=loadText(dataFilePath)
tmpTB;

id price1 price2 total
-- ------ ------ --------
1  2131   213.1  2.658E7
2  $2,131 $213.1 -2.658E7
3  N/A    N/A    2.658E-7

tmpTB.schema().colDefs;

name   typeString typeInt comment
------ ---------- ------- -------
id     INT        4
price1 SYMBOL     17
price2 SYMBOL     17
total  DOUBLE     16

若别离指定 price1 和 price2 列为 INT 和 FLOAT 类型,DolphinDB 在导入时会会主动疏忽数字前后的字母及其他符号。如果没有呈现任何数字,则解析为 NULL 值。

schemaTB=table(`id`price1`price2`total as name, `INT`INT`FLOAT`DOUBLE as type)
tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id price1 price2     total
-- ------ ---------- --------
1  2131   213.100006 2.658E7
2  2131   213.100006 -2.658E7
3                    2.658E-7

6.3 主动脱去文本外的双引号

在 CSV 文件中,有时候会用双引号来解决文本和数值中含有的特殊字符(譬如分隔符)的字段。DolphinDB 解决这样的数据时,会主动脱去文本外的双引号。上面联合例子具体阐明。

首先生成示例数据。生成的文件中,num 列数据为应用三位分节法示意的数值。

dataFilePath="/home/data/testSym.csv"
tt=table(1..3 as id,  [""500"",""3,500"",""9,000,000""] as num)
saveText(tt,dataFilePath);

导入数据并查看表内数据,DolphinDB 主动脱去了文本外的双引号。

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id num
-- -------
1  500
2  3500
3  9000000

附录

本教程的例子中应用的数据文件:candle_201801.csv。

正文完
 0