共计 8346 个字符,预计需要花费 21 分钟才能阅读完成。
企业在应用大数据分析平台时,首先须要把海量数据从多个数据源迁徙到大数据平台中。
在导入数据前,咱们须要了解 DolphinDB database 的基本概念和特点。
DolphinDB 数据表按存储介质分为 3 种类型:
- 内存表:数据只保留在本节点内存,存取速度最快,然而节点敞开后,数据将会失落。
- 本地磁盘表:数据保留在本地磁盘上,即便节点重启,也能够不便地通过脚本把数据加载到内存中。
- 分布式表:数据在物理上散布在不同的节点,通过 DolphinDB 的分布式计算引擎,逻辑上依然能够像本地表一样做对立查问。
DolphinDB 数据表按是否分区分为 2 种类型:
- 一般表
- 分区表
在传统的数据库中,分区是针对数据表的,即同一个数据库中的每个数据表能够有不同的分区计划;而 DolphinDB 的分区是针对数据库的,即一个数据库只能应用一种分区计划。如果两个表的分区计划不同,它们不能放在同一个数据库中。
DolphinDB 提供了 3 种灵便的数据导入办法:
- 通过 CSV 文本文件导入
- 通过 HDF5 文件导入
- 通过 ODBC 导入
1. 通过 CSV 文本文件导入
通过 CSV 文件进行数据直达是比拟通用的数据迁徙形式。DolphinDB 提供了 loadText、ploadText 和loadTextEx三个函数来导入 CSV 文件。上面咱们通过一个示例 CSV 文件 candle_201801.csv 来阐明这 3 个函数的用法。
1.1 loadText
语法:loadText(filename, [delimiter=’,’], [schema])
参数:
_filename_是文件名。
_delimiter_和_schema_都是可选参数。
_delimiter_用于指定不同字段的分隔符,默认是“,”。
_schema_用于数据导入后每个字段的数据类型,它是一个 table 类型。DolphinDB 提供了字段类型自动识别性能,然而某些状况下零碎自动识别的数据类型不合乎需要,比方咱们在导入示例 CSVcandle_201801.csv 时,volume 字段会被辨认成 INT 类型,实际上咱们须要 LONG 类型,这时就须要应用 schema 参数。
创立 schema table 的脚本:
nameCol = `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
typeCol = [SYMBOL,SYMBOL,INT,DATE,DATE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,LONG]
schemaTb = table(nameCol as name,typeCol as type)
当表的字段十分多时,创立 schema table 的脚本会非常简短。为了防止这个问题,DolphinDB 提供了 extractTextSchema 函数,它能够从文本文件中提取表的构造,咱们只需批改须要指定的字段类型即可。
dataFilePath = "/home/data/candle_201801.csv"
schemaTb=extractTextSchema(dataFilePath)
update schemaTb set type=`LONG where name=`volume
tt=loadText(dataFilePath,,schemaTb)
1.2 ploadText
ploadText 把数据文件作为分区表并行加载到内存中,语法和 loadText 完全相同,然而 ploadText 的速度更快。ploadText 次要用于疾速载入大文件,它在设计上充分利用了多个 core 来并行载入文件,并行水平取决于服务器自身 core 数量和节点的 localExecutors 配置。
上面咱们比照 loadText 和 ploadText 的性能。
首先,通过脚本生成一个 4G 左右的 CSV 文件:
filePath = "/home/data/testFile.csv"
appendRows = 100000000
dateRange = 2010.01.01..2018.12.30
ints = rand(100, appendRows)
symbols = take(string('A'..'Z'), appendRows)
dates = take(dateRange, appendRows)
floats = rand(float(100), appendRows)
times = 00:00:00.000 + rand(86400000, appendRows)
t = table(ints as int, symbols as symbol, dates as date, floats as float, times as time)
t.saveText(filePath)
别离应用 loadText 和 ploadText 来导入文件,该节点是 4 核 8 线程的 CPU。
timer loadText(filePath);
//Time elapsed: 39728.393 ms
timer ploadText(filePath);
//Time elapsed: 10685.838 ms
结果显示,ploadText 的性能差不多是 loadText 的 4 倍。
1.3 loadTextEx
语法:loadTextEx(dbHandle, tableName, [partitionColumns], fileName, [delimiter=’,’], [schema])
参数:
_dbHandle_是数据库句柄。
_tableName_是保留数据的分布式表的表名。
_partitionColumns_、_delimiter_和_schema_是可选参数。
当分区计划不是程序分区时,须要指定_partitionColumns_,示意分区列。
_fileName_示意导入文件的名称。
_delimiter_用于指定不同字段的分隔符,默认是“,”。
_schema_用于数据导入后每个字段的数据类型,它是一个 table 类型。
loadText 函数总是把数据导入到内存,当数据文件十分宏大时,工作机的内存很容易成为瓶颈。loadTextEx 能够很好地解决这个问题,它通过边导入边保留的形式,把动态的 CSV 文件以较为平缓的数据流的形式“另存为”DolphinDB 的分布式表,而不是采纳全副导入内存再存为分区表的形式,大大降低了内存的应用需要。
首先创立用于保留数据的分布式表:
dataFilePath = "/home/data/candle_201801.csv"
tb = loadText(dataFilePath)
db=database("dfs://dataImportCSVDB",VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")
而后将文件导入分布式表:
loadTextEx(db, "cycle", "tradingDay", dataFilePath)
当须要应用数据做剖析的时候,通过 loadTable 函数将分区元数据先载入内存,在理论执行查问的时候,DolphinDB 会按需加载数据到内存。
tb = database("dfs://dataImportCSVDB").loadTable("cycle")
2. 通过 HDF5 文件导入
HDF5 是一种比 CSV 更高效的二进制数据文件格式,在数据分析畛域宽泛应用。DolphinDB 也反对通过 HDF5 格式文件导入数据。
DolphinDB 通过 HDF5 插件来拜访 HDF5 文件,插件提供了以下办法:
- hdf5::ls : 列出 h5 文件中所有 Group 和 Dataset 对象。
- hdf5::lsTable:列出 h5 文件中所有 Dataset 对象。
- hdf5::hdf5DS:返回 h5 文件中 Dataset 的元数据。
- hdf5::loadHdf5:将 h5 文件导入内存表。
- hdf5::loadHdf5Ex:将 h5 文件导入分区表。
- hdf5::extractHdf5Schema:从 h5 文件中提取表构造。
调用插件办法时须要在办法后面提供 namespace,比方调用 loadHdf5 时 hdf5::loadHdf5,如果不想每次调用都应用 namespace,能够应用 use 关键字:
use hdf5
loadHdf5(filePath,tableName)
要应用 DolphinDB 的插件,首先须要下载 HDF5 插件,再将插件部署到节点的 plugins 目录下,在应用插件之前须要先加载,应用上面的脚本:
loadPlugin("plugins/hdf5/PluginHdf5.txt")
HDF5 文件的导入与 CSV 文件大同小异,比方咱们要将示例 HDF5 文件 candle_201801.h5 导入,它蕴含一个 Dataset:candle_201801,那么最简略的导入形式如下:
dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
tmpTB = hdf5::loadHdf5(dataFilePath,datasetName)
如果须要指定数据类型导入能够应用 hdf5::extractHdf5Schema,脚本如下:
dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
schema=hdf5::extractHdf5Schema(dataFilePath,datasetName)
update schema set type=`LONG where name=`volume
tt=hdf5::loadHdf5(dataFilePath,datasetName,schema)
如果 HDF5 文件十分宏大,工作机内存无奈反对全量载入,能够应用 hdf5::loadHdf5Ex 形式来载入数据。
首先创立用于保留数据的分布式表:
dataFilePath = "/home/data/candle_201801.h5"
datasetName = "candle_201801"
dfsPath = "dfs://dataImportHDF5DB"
tb = hdf5::loadHdf5(dataFilePath,datasetName)
db=database(dfsPath,VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")
而后将 HDF5 文件通过 hdf5::loadHdf5Ex 函数导入:
hdf5::loadHdf5Ex(db, "cycle", "tradingDay", dataFilePath,datasetName)
3. 通过 ODBC 接口导入
DolphinDB 反对 ODBC 接口连贯第三方数据库,从数据库中间接将表读取成 DolphinDB 的内存数据表。应用 DolphinDB 提供的 ODBC 插件能够不便地从 ODBC 反对的数据库中迁徙数据至 DolphinDB 中。
ODBC 插件提供了以下四个办法用于操作第三方数据源数据:
- odbc::connect : 开启连贯。
- odbc::close : 敞开连贯。
- odbc::query : 依据给定的 SQL 语句查问数据并返回到 DolphinDB 的内存表。
- odbc::execute : 在第三方数据库内执行给定的 SQL 语句,不返回数据。
在应用 ODBC 插件前,须要先装置 ODBC 驱动,请参考 ODBC 插件应用教程。
上面以连贯 SQL Server 作为实例,现有数据库的具体配置为:
server:172.18.0.15
默认端口:1433
连贯用户名:sa
明码:123456
数据库名称:SZ_TAQ
数据库表选 2016 年 1 月 1 日的数据,表名 candle_201801,字段与 CSV 文件雷同。
要应用 ODBC 插件连贯 SQL Server 数据库,首先第一步是下载插件解压并拷贝 pluginsodbc 目录下所有文件到 DolphinDB Server 的 plugins/odbc 目录下,通过上面的脚本实现插件初始化:
// 载入插件
loadPlugin("plugins/odbc/odbc.cfg")
// 连贯 SQL Server
conn=odbc::connect("Driver=ODBC Driver 17 for SQL Server;Server=172.18.0.15;Database=SZ_TAQ;Uid=sa;
Pwd=123456;")
在导入数据之前,先创立分布式磁盘数据库用于保留数据:
// 从 SQL Server 中取到表构造作为 DolphinDB 导入表的模板
tb = odbc::query(conn,"select top 1 * from candle_201801")
db=database("dfs://dataImportODBC",VALUE,2018.01.01..2018.01.31)
db.createPartitionedTable(tb, "cycle", "tradingDay")
从 SQL Server 中导入数据并保留成 DolphinDB 分区表:
data = odbc::query(conn,"select * from candle_201801")
tb = database("dfs://dataImportODBC").loadTable("cycle")
tb.append!(data);
通过 ODBC 导入数据防止了文件导出导入的过程,而且通过 DolphinDB 的定时作业机制,它还能够作为时序数据定时同步的数据通道。
4. 金融数据导入案例
上面以证券市场日 K 线图数据文件导入作为示例,数据以 CSV 文件格式保留在磁盘上,共有 10 年的数据,按年度分目录保留,一共大概 100G 的数据,门路示例如下:
2008
—- 000001.csv
—- 000002.csv
—- 000003.csv
—- 000004.csv
—- …
2009
…
2018
每个文件的构造都是统一的,如图所示:
4.1 分区规划
要导入数据之前,首先要做好数据的分区规划,这波及到两个方面的考量:
- 确定分区字段。
- 确定分区的粒度。
首先依据日常的查问语句执行频率,咱们采纳 trading 和 symbol 两个字段进行组合范畴 (RANGE) 分区,通过对罕用检索字段分区,能够极大的晋升数据检索和剖析的效率。
接下来要做的是别离定义两个分区的粒度。
现有数据的时间跨度是从 2008-2018 年,所以这里依照年度对数据进行工夫上的划分,在布局工夫分区时要思考为后续进入的数据留出足够的空间,所以这里把工夫范畴设置为 2008-2030 年。
yearRange =date(2008.01M + 12*0..22)
这里股票代码有几千个,如果对股票代码按值 (VALUE) 分区,那么每个分区只是几兆大小,而分区数量则很多。分布式系统在执行查问时,会将查问语句分成多个子工作散发到不同的分区执行,所以按值分区形式会导致工作数量十分多,而工作执行工夫极短,导致系统在治理工作上破费的工夫反而大于工作自身的执行工夫,这样的分区形式显著是不合理的。这里咱们依照范畴将所有股票代码均分成 100 个区间,每个区间作为一个分区,最终分区的大小约 100M 左右。思考到前期有新的股票数据进来,所以减少了一个虚构的代码 999999,跟最初一个股票代码组成一个分区,用来保留后续新增股票的数据。
通过上面的脚本失去 symbol 字段的分区范畴:
// 遍历所有的年度目录,去重整顿出股票代码清单,并通过 cutPoint 分成 100 个区间
symbols = array(SYMBOL, 0, 100)
yearDirs = files(rootDir)[`filename]
for(yearDir in yearDirs){
path = rootDir + "/" + yearDir
symbols.append!(files(path)[`filename].upper().strReplace(".CSV",""))
}
// 去重并减少扩容空间:symbols = symbols.distinct().sort!().append!("999999");
// 均分成 100 份
symRanges = symbols.cutPoints(100)
通过下述脚本定义两个维度组合 (COMPO) 分区,创立 Database 和分区表:columns=`symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime
types = [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]
dbDate=database("", RANGE, yearRange)
dbID=database("", RANGE, symRanges)
db = database(dbPath, COMPO, [dbDate, dbID])
pt=db.createPartitionedTable(table(1000000:0,columns,types), tableName, `tradingDay`symbol)
须要留神的是,分区是 DolphinDB 存储数据的最小单位,DolphinDB 对分区的写入操作是独占式的,当工作并行进行的时候,须要防止多任务同时向一个分区写入数据。本案例中每年的数据交给一个独自工作去做,各工作操作的数据边界没有重合,所以不可能产生多任务写入同一分区的状况。
4.2 导入数据
数据导入脚本的次要思路很简略,就是通过循环目录树,将所有的 CSV 文件一一读取并写入到分布式数据库表 dfs://SAMPLE_TRDDB 中,然而具体导入过程中还是会有很多细节问题。
首先碰到的问题是,CSV 文件中保留的数据格式与 DolphinDB 外部的数据格式存在差别,比方 time 字段,文件里是以“9390100000”示意准确到毫秒的工夫,如果间接读入会被辨认成数值类型,而不是 time 类型,所以这里须要用到数据转换函数 datetimeParse 联合格式化函数 format 在数据导入时进行转换。要害脚本如下:
datetimeParse(format(time,"000000000"),"HHmmssSSS")
尽管通过循环导入实现起来非常简单,然而实际上 100G 的数据是由极多的 5M 左右的细碎文件组成,如果单线程操作会期待很久,为了充分利用集群的资源,所以咱们依照年度把数据导入拆分成多个子工作,轮流发送到各节点的工作队列并行执行,进步导入的效率。这个过程分上面两步实现:
(1)定义一个自定义函数,函数的次要性能是导入指定年度目录下的所有文件:
// 循环解决年度目录下的所有数据文件
def loadCsvFromYearPath(path, dbPath, tableName){symbols = files(path)[`filename]
for(sym in symbols){
filePath = path + "/" + sym
t=loadText(filePath)
database(dbPath).loadTable(tableName).append!(select symbol, exchange,cycle, tradingDay,date,datetimeParse(format(time,"000000000"),"HHmmssSSS"),open,high,low,close,volume,turnover,unixTime from t )
}
}
(2)通过 rpc 函数联合 submitJob 函数把下面定义的函数提交到各节点去执行:
nodesAlias="NODE" + string(1..4)
years= files(rootDir)[`filename]
index = 0;
for(year in years){
yearPath = rootDir + "/" + year
des = "loadCsv_" + year
rpc(nodesAlias[index%nodesAlias.size()],submitJob,des,des,loadCsvFromYearPath,yearPath,dbPath,tableName)
index=index+1
}
数据导入过程中,能够通过 pnodeRun(getRecentJobs)来察看后台任务的实现状况。
案例残缺脚本