关于python:干货丨Orca写数据教程

4次阅读

共计 12145 个字符,预计需要花费 31 分钟才能阅读完成。

Orca 我的项目在 DolphinDB 之上实现了 pandas API,使用户能更高效地剖析解决海量数据。在数据存储方面,与 pandas 相比,Orca 具备以下显著劣势:

  • 更灵便的抉择

Orca 不仅能像 pandas 一样在内存中进行计算,将 DatFrame 中的数据导出到磁盘,也能随时将 DataFrame 的数据以及计算结果追加到 DolphinDB 的数据表中,为后续的数据查问、剖析提供参考。

  • 更优异的性能

当数据量十分大而又须要保留数据时,在 pandas 中能够将整个 DataFrame 的数据保留到磁盘,在下一次运行 Python 程序时,用户再从新将磁盘上的数据加载到内存,这一做法无疑须要在导入和导出操作上消耗大量工夫。而 Orca 对数据的存储与计算过程均进行了优化,用户只需在程序完结前将数据写入到 DolphinDB 数据表,在下一次运行 Python 程序时,用户毋庸从新将整个表的数据加载到内存,也能够立即进行剖析和计算操作。

本文将介绍如何通过 Orca 保留数据。

1 将数据导出到磁盘

Orca 的 Series 和 DataFrame 均反对 to_csvto_excel 等将数据导出为固定格局的文件保留到指定门路的办法。上面对 to_csv 进行非凡阐明。

  • to_csv函数

pandas 的 to_csv 函数的 engine 参数的取值能够是‘c’或者‘python’,示意应用哪一种引擎进行导入。

Orca 的 to_csv 函数的 engine 参数的取值能够是 {‘c’,‘python’,‘dolphindb’},且该参数默认取值为‘dolphindb’。当取值为‘dolphindb’时,to_csv 函数会将数据导出到 DolphinDB 服务器目录下,且只反对 sep 和 append 两个参数;当取值为‘python’或‘c’时,to_csv函数会将数据导出到 python 客户端的目录下,并反对 pandas 所反对的所有参数。

示例

调用 to_csv 函数导出数据,并通过 read_csv 函数再将数据导入。以下的脚本中 ’YOUR_DIR’ 示意用户保留 csv 文件的门路。因为数据是随机生成的,每次执行生成的表数据都不雷同,上面的输入后果仅供参考。

>>> YOUR_DIR = "/dolphindb/database" # e.g. data_dir
>>> odf = orca.DataFrame({"type": np.random.choice(list("abcde"),10), "value": np.random.sample(10)*100})
>>> odf.to_csv(path_or_buf=YOUR_DIR + "/demo.csv")
>>> df1 = orca.read_csv(path=YOUR_DIR + "/demo.csv")
>>> df1
# output
  type      value
0    c  93.697510
1    c  64.533273
2    e  11.699053
3    c  46.758312
4    d   0.262836
5    e  30.315109
6    a  72.641846
7    e  60.980473
8    c  89.597063
9    d  25.223624

2 将数据保留到 DolphinDB 数据表

应用 Orca 的一个重要场景是,用户从其余数据库系统或是第三方 Web API 中获得数据后存入 DolphinDB 数据表中。本节将介绍通过 Orca 将取到的数据上传并保留到 DolphinDB 的数据表中。

Orca 数据表按存储形式分为三种:

  • 内存表:数据仅保留在内存中,存取速度最快,然而节点敞开后数据就不存在了。
  • 本地磁盘表:数据保留在本地磁盘上。能够从磁盘加载到内存。
  • 分布式表:数据存储在 DolphinDB 服务端,并未加载到内存,客户端只是取得了数据库和表的信息,通过 DolphinDB 的分布式计算引擎,依然能够像本地表一样做对立查问。

上面以例子的模式解释这三种表的区别。

  • 内存表

能够通过 read_csv 函数导入或者通过 DataFrame 函数创立。

  • read_csv函数导入

以第 1 节例子中的 csv 文件为例,像这样导入后可能间接拜访表内数据的表,咱们称之为 Orca 内存表。

>>> df1 = orca.read_csv(path=YOUR_DIR + "/demo.csv")
>>> df1
# output
  type      value
0    c  93.697510
1    c  64.533273
2    e  11.699053
3    c  46.758312
4    d   0.262836
5    e  30.315109
6    a  72.641846
7    e  60.980473
8    c  89.597063
9    d  25.223624
  • DataFrame函数创立

通过 orca.DataFrame 函数创立的内存表,也可能间接拜访表内数据:

>>> df = orca.DataFrame({"date":orca.date_range("20190101", periods=10),"price":np.random.sample(10)*100})
>>> df
# output
         date      price
0 2019-01-01  35.218404
1 2019-01-02  24.066378
2 2019-01-03   6.336181
3 2019-01-04  24.786319
4 2019-01-05  35.021376
5 2019-01-06  14.014935
6 2019-01-07   7.454209
7 2019-01-08  86.430214
8 2019-01-09  80.033767
9 2019-01-10  45.410883
  • 磁盘表

磁盘表分为本地磁盘表和磁盘分区表,其中本地磁盘表与内存表的区别就在于本地磁盘表是保留在磁盘上的内存表,不须要进行分区。而磁盘分区表则是保留在磁盘上的分区表,以下具体解释本地磁盘表。

通过 read_table 函数能够在 Orca 中加载本地磁盘表。
Orca 提供 read_table 函数,通过该函数指定 DolphinDB 数据库和表名来加载 DolphinDB 数据表的数据,该函数反对的参数如下:

  • database:数据库名称
  • table:表名
  • partition:须要导入的分区, 可选参数

请留神: read_table函数要求所要导入的数据库和表在 DolphinDB 服务器上曾经存在,若只存在数据库和没有创立表,则不能将数据胜利导入到 Python 中。

从函数定义能够看出,read_table 函数能够用于导入 Orca 的分区表, 然而当导入的表是 DolphinDB 的磁盘表时,Orca 会将表数据全都加载到内存,作为 Orca 内存表以供拜访。

示例

假如 DolphinDB Server 上已有数据库和表如下,以下的脚本中 ’YOUR_DIR’ 示意用户保留磁盘表的门路。

rows=10
tdata=table(rand(`a`b`c`d`e, rows) as type, rand(100.0, rows) as value)
saveTable(YOUR_DIR + "/testOnDiskDB", tdata, `tb1)

脚本中创立的数据库的门路为 YOUR_DIR + “/testOnDiskDB”,存储的表名为 ”tb1″。在 Python 客户端中,咱们能够通过 read_table 函数将这个磁盘表表加载到内存中,寄存在一个 Orca 的 DataFrame 对象里。

>>> df = orca.read_table(YOUR_DIR + "/testOnDiskDB", "tb1")

将上述过程整合成的 Python 中可执行的脚本如下:

>>> s = orca.default_session()
>>> data_dir = "/dolphindb/database" # e.g. data_dir
>>> tableName = "tb1"
>>> create_onDisk_table = """
rows=10
tdata=table(rand(`a`b`c`d`e, rows) as type, rand(100.0, rows) as value)
saveTable("{YOUR_DIR}" + "/testOnDiskDB", tdata, `{tbName})
""".format(YOUR_DIR=data_dir, tbName=tableName)
>>> s.run(create_onDisk_table)
>>> df = orca.read_table(data_dir + "/testOnDiskDB", tableName)
>>> df
# output
  type      value
0    e  42.537911
1    b  44.813589
2    d  28.939636
3    a  73.719393
4    b  66.576416
5    c  36.265364
6    a  43.936593
7    e  56.951759
8    e   4.290316
9    d  29.229366

上述脚本中,咱们应用的 defalut_session 实际上就是通过 orca.connect 函数创立的会话,在 Python 端,咱们能够通过这个会话与 DolphinDB 服务端进行交互。对于更多功能,请参见 DolphinDB Python API。

  • 分布式表

分布式表是 DolphinDB 举荐在生产环境下应用的数据存储形式,它反对快照级别的事务隔离,保证数据一致性。分布式表反对多正本机制,既提供了数据容错能力,又能作为数据拜访的负载平衡。在 Orca 中,能够通过 read_csv 函数指定分布式表导入数据并加载分布式表信息,或者通过 read_table 函数加载分布式表信息。

  • read_csv函数

Orca 在调用 read_csv 函数时指定 db_handle, table_name 以及 partition_columns 参数,能够间接将数据间接导入 DolphinDB 的 DFS 表,对于 read_csv 函数将数据导入分区表的具体介绍请参考 Orca 的分区表。

示例

请留神只有启用 enableDFS= 1 的集群环境或者 DolphinDB 单例模式能力应用分布式表。

以第 1 节例子中的 csv 文件为例,咱们在 DolphinDB 服务端创立一个 DFS 数据库,将 demo.csv 导入数据库:

dbPath="dfs://demoDB"
login('admin', '123456')
if(existsDatabase(dbPath))
      dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)

请留神:以上脚本须要在 DolphinDB 服务端执行,在 Python 客户端中则能够通过 DolphinDB Python API 执行脚本。

在 Python 客户端中调用 Orca 的 read_csv 函数,指定数据库 db_handle 为 DFS 数据库 ”dfs://demoDB”,指定表名 table_name 为 ”tb1″ 和进行分区的列 partition_columns 为 ”type”,将数据导入到 DolphinDB 分区表,这时,read_csv 函数返回的是一个示意 DolphinDB 分区表的对象,客户端并不能间接拜访表内的数据。在后续的计算中,Orca 才会从服务端下载计算所需数据。

>>> df = orca.read_csv(path=YOUR_DIR + "/demo.csv", dtype={"type": "SYMBOL", "value": np.float64}, db_handle="dfs://demoDB", table_name="tb1", partition_columns="type")
>>> df
# output
<'dolphindb.orca.core.frame.DataFrame' object representing a column in a DolphinDB segmented table>

若须要查看 df 内的数据,能够调用 to_pandas 函数查看,因为分区表的数据分布在各个分区上,调用 to_pandas 函数会将所有数据下载到客户端,且依照分区的程序输入数据。

>>> df.to_pandas()
# output
   type      value
 0    a  72.641846
 1    c  93.697510
 2    c  64.533273
 3    c  46.758312
 4    c  89.597063
 5    d   0.262836
 6    d  25.223624
 7    e  11.699053
 8    e  30.315109
 9    e  60.980473

将上述过程整合成的 Python 中可执行的脚本如下:

>>> YOUR_DIR = "/dolphindb/database" # e.g. data_dir
>>> s = orca.default_session()
>>> dfsDatabase = "dfs://demoDB"
>>> create_database = """dbPath='{dbPath}'login('admin','123456')
if(existsDatabase(dbPath))
    dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)
""".format(dbPath=dfsDatabase)
>>> s.run(create_database)
>>> df=orca.read_csv(path=YOUR_DIR +"/demo.csv", dtype={"type": "SYMBOL", "value": np.float64},
                     db_handle=dfsDatabase, table_name="tb1", partition_columns="type")

请留神:在通过 read_csv 函数指定数据库导入数据之前,须要确保在 DolphinDB 服务器上曾经创立了对应的数据库。read_csv 函数依据指定的数据库,表名和分区字段导入数据到 DolphinDB 数据库中,若表存在则追加数据,若表不存在则创立表并且导入数据。

  • read_table函数加载分区表信息

若 Orca 调用 read_table 函数加载的是磁盘分区表或者 dfs 分区表,则数据不会在加载的时候被下载,以上述例子中创立的 dfs 分区表为例:

>>> df = orca.read_table("dfs://demoDB", "tb1")
>>> df
# output
<'orca.core.frame.DataFrame' object representing a column in a DolphinDB segmented table>

对 df 进行计算,则下载数据进行计算:

>>> df.groupby("type").mean()
# output
          value
type           
 a     72.641846
 c     73.646539
 d     12.743230
 e     34.331545

上面介绍向 Orca 的数据表写数据的过程。

2.1 保留数据到 Orca 内存表

pandas 提供的 append 函数用于将一个 DataFrame 追加到另一个 Dataframe,并返回一个新的 DataFrame,不会对原有的 DataFrame 进行批改。在 Orca 中,append 函数还反对 inplace 参数,当它为 True 时,会将追加的数据保留到 Dataframe 中,对原有的 DataFrame 进行了批改,这个过程就是将数据追加到 Orca 的内存表中。

>>> df1 = orca.DataFrame({"date":orca.date_range("20190101", periods=10),
                          "price":np.random.sample(10)*100})
>>> df1
# output
        date      price
0 2019-01-01  17.884136
1 2019-01-02  57.840625
2 2019-01-03  29.781247
3 2019-01-04  89.968203
4 2019-01-05  19.355847
5 2019-01-06  74.684634
6 2019-01-07  91.678632
7 2019-01-08  93.927549
8 2019-01-09  47.041906
9 2019-01-10  96.810450

>>> df2 = orca.DataFrame({"date":orca.date_range("20190111", periods=3),
                          "price":np.random.sample(3)*100})
>>> df2
# output 
        date      price
0 2019-01-11  26.959939
1 2019-01-12  75.922693
2 2019-01-13  93.012894

>>> df1.append(df2, inplace=True)
>>> df1
# output
        date      price
0 2019-01-01  17.884136
1 2019-01-02  57.840625
2 2019-01-03  29.781247
3 2019-01-04  89.968203
4 2019-01-05  19.355847
5 2019-01-06  74.684634
6 2019-01-07  91.678632
7 2019-01-08  93.927549
8 2019-01-09  47.041906
9 2019-01-10  96.810450
0 2019-01-11  26.959939
1 2019-01-12  75.922693
2 2019-01-13  93.012894

请留神:当设置 inplace 参数为 True 时,index_ignore 参数的值不容许设置,只能为 False。

2.2 保留数据到 Orca 磁盘表

Orca 提供两种形式批改磁盘表的数据:

  • save_table函数
  • append函数

2.2.1 保留数据到 Orca 本地磁盘表

Orca 提供 save_table 函数,用于保留数据到磁盘表和分布式表,该函数参数如下:

  • db_path:数据库门路
  • table_name:表名
  • df:须要保留的表
  • ignore_index:是否疏忽 index 追加数据

首先通过 read_table 函数导入上文中创立的磁盘表。

>>> df = orca.read_table(YOUR_DIR + "/testOnDiskDB", "tb1")
>>> df
# output 
  type      value
0    e  42.537911
1    b  44.813589
2    d  28.939636
3    a  73.719393
4    b  66.576416
5    c  36.265364
6    a  43.936593
7    e  56.951759
8    e   4.290316
9    d  29.229366

生成要追加的数据,追加数据到 df,并通过 save_table 保留数据。

>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> df.append(df2, inplace=True)
>>> df
# output
  type      value
0    e  42.537911
1    b  44.813589
2    d  28.939636
3    a  73.719393
4    b  66.576416
5    c  36.265364
6    a  43.936593
7    e  56.951759
8    e   4.290316
9    d  29.229366
0    d  20.702066
1    c  21.241707
2    a  97.333201
>>> orca.save_table(YOUR_DIR + "/testOnDiskDB", "tb1", df)

须要留神的是,对于磁盘表,若该指定的表名不存在于数据库中,save_table会创立对应的表;若数据库中已有同名的表,save_table会笼罩该表。

2.2.2 保留数据到 Orca 磁盘分区表

磁盘分区表与分布式表的差别就在于分布式表的数据库门路以 ”dfs://” 结尾,而磁盘分区表的数据库门路是本地的一个绝对路径。

  • 通过 save_table 函数将数据保留到磁盘分区表

间接调用 save_table 函数,能够将一个内存表以分区的模式保留到磁盘上,与磁盘非分区表相似,若表已存在,会笼罩该表。

>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> orca.save_table(YOUR_DIR + "/testOnDisPartitionedkDB", "tb1", df2)
>>> df = orca.read_table(YOUR_DIR + "/testOnDisPartitionedkDB", "tb1")
>>> df
# output
  type      value
0    d  86.549417
1    e  61.852710
2    d  28.747059
  • 通过 append 函数追加数据到磁盘分区表

对于磁盘分区表,调用 append 函数能够向磁盘分区表追加数据。

首先,在 DolphinDB 中创立磁盘分区表:

dbPath=YOUR_DIR + "/testOnDisPartitionedkDB"
login('admin', '123456')
if(existsDatabase(dbPath))
   dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)

在 Python 客户端中导入第 1 节例子中的 csv 文件

>>> df = orca.read_csv(path=YOUR_DIR + "/demo.csv", dtype={"type": "SYMBOL", "value": np.float64}, db_handle=YOUR_DIR + "/testOnDisPartitionedkDB", table_name="tb1", partition_columns="type")
>>> df.to_pandas()
# output
type      value
0    a  72.641846
1    c  93.697510
2    c  64.533273
3    c  46.758312
4    c  89.597063
5    d   0.262836
6    d  25.223624
7    e  11.699053
8    e  30.315109
9    e  60.980473

调用 append 函数向 df 表追加数据,从新加载该磁盘分区表,发现数据曾经追加:

>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> df.append(df2,inplace=True)
>>> df = orca.read_table(YOUR_DIR + "/testOnDisPartitionedkDB", "tb1")
>>> df.to_pandas()
# output
   type      value
0     a  72.641846
1     c  93.697510
2     c  64.533273
3     c  46.758312
4     c  89.597063
5     c  29.233253
6     c  38.753028
7     d   0.262836
8     d  25.223624
9     d  55.085909
10    e  11.699053
11    e  30.315109
12    e  60.980473

将上述过程整合成 Python 端的可执行脚本如下:

>>> YOUR_DIR = "/dolphindb/database" # e.g. data_dir
>>> s = orca.default_session()
>>> create_database = """dbPath='{dbPath}'login('admin','123456')
if(existsDatabase(dbPath))
   dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)
""".format(dbPath=YOUR_DIR +"/testOnDisPartitionedkDB")
>>> s.run(create_database)
>>> df = orca.read_csv(path=YOUR_DIR + "/demo.csv", dtype={"type": "SYMBOL", "value": np.float64}, db_handle=YOUR_DIR + "/testOnDisPartitionedkDB", table_name="tb1", partition_columns="type")
>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> df.append(df2,inplace=True)
>>> df = orca.read_table(YOUR_DIR + "/testOnDisPartitionedkDB", "tb1")

2.3 保留数据到 Orca 分布式表

  • 通过 append 函数追加数据到分布式表

对于分布式表,能够间接通过 append 函数追加数据。

首先,在 DolphinDB 中创立分布式表:

dbPath="dfs://demoDB"
login('admin', '123456')
if(existsDatabase(dbPath))
   dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)

在 Python 客户端中导入第 1 节例子中的 csv 文件:

>>> df = orca.read_csv(path=YOUR_DIR + "/demo.csv", dtype={"type": "SYMBOL", "value": np.float64}, db_handle="dfs://demoDB", table_name="tb1", partition_columns="type")
>>> df.to_pandas()
# output
type      value
0    a  72.641846
1    c  93.697510
2    c  64.533273
3    c  46.758312
4    c  89.597063
5    d   0.262836
6    d  25.223624
7    e  11.699053
8    e  30.315109
9    e  60.980473

调用 append 函数向 df 表追加数据,从新加载该分布式表,发现数据曾经追加:

>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> df.append(df2,inplace=True)
>>> df = orca.read_table("dfs://demoDB", "tb1")
>>> df.to_pandas()
# output
   type      value
0     a  72.641846
1     a  55.429765
2     a  51.230669
3     c  93.697510
4     c  64.533273
5     c  46.758312
6     c  89.597063
7     c  71.821263
8     d   0.262836
9     d  25.223624
10    e  11.699053
11    e  30.315109
12    e  60.980473

将上述过程整合成 Python 端的可执行脚本如下:

>>> YOUR_DIR = "/dolphindb/database" # e.g. data_dir
>>> s = orca.default_session()
>>> create_database = """dbPath='{dbPath}'login('admin','123456')
if(existsDatabase(dbPath))
   dropDatabase(dbPath)
db=database(dbPath, VALUE, `a`b`c`d`e)
""".format(dbPath="dfs://demoDB")
>>> s.run(create_database)
>>> df = orca.read_csv(path=YOUR_DIR + "/demo.csv", dtype={"type": "SYMBOL", "value": np.float64}, db_handle="dfs://demoDB", table_name="tb1", partition_columns="type")
>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> df.append(df2,inplace=True)
>>> df = orca.read_table("dfs://demoDB", "tb1")
  • 通过 save_table 函数追加数据到分布式表

与磁盘表不同的是,对分布式表调用 save_table 函数,能够间接追加数据,而不是笼罩数据。且与 append 函数相比,save_table函数无需先在客户端通过 read_table 取得将要追加的表信息,就间接在 DolphinDB 服务端上追加数据的操作。

上面的例子中,通过 save_table 函数间接将内存表的数据追加到指定表:

>>> df2 = orca.DataFrame({"type": np.random.choice(list("abcde"),3), 
                          "value": np.random.sample(3)*100})
>>> orca.save_table("dfs://demoDB", "tb1", df2)
>>> df = orca.read_table("dfs://demoDB", "tb1")
>>> df.to_pandas()
# output
   type      value
0     a  72.641846
1     a  55.429765
2     a  51.230669
3     b  40.724064
4     c  93.697510
5     c  64.533273
6     c  46.758312
7     c  89.597063
8     c  71.821263
9     c  93.533380
10    d   0.262836
11    d  25.223624
12    d  47.238962
13    e  11.699053
14    e  30.315109
15    e  60.980473

3 小结

  1. Orca 的 to_csv 函数在 engine=’dolphindb’ 的默认状态下只反对 sep 和 append 两个参数。
  2. 对于一般磁盘表以外的表,inplce 参数置为 True 时,append 办法将追加数据。
  3. save_table 函数,对于本地磁盘表会笼罩原表;对于 dfs 表,数据会被追加到表中
正文完
 0