7 Star 3 Fork 3

浙江智臾科技有限公司 / api-cplusplus

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
Apache-2.0

本教程主要介绍以下内容:

DolphinDB C++ API

DolphinDB C++ API

DolphinDB C++ API支持以下开发环境:

  • Linux
  • Windows Visual Studio
  • Windows GNU(MinGW)

1. 项目编译

1.1 在Linux环境下编译项目

1.1.1 环境配置

C++ API需要使用g++ 4.8.5及以上版本。

1.1.2 下载bin文件和头文件

从本GitHub项目中下载以下文件:

  • bin (libDolphinDBAPI.so)
  • include (DolphinDB.h Exceptions.h SmartPointer.h SysIO.h Types.h Util.h)

1.1.3 编译main.cpp

在bin和include的同级目录中创建project目录。进入project目录,并创建文件main.cpp:

#include "DolphinDB.h"
#include "Util.h"
#include <iostream>
#include <string>
using namespace dolphindb; 
using namespace std; 

int main(int argc, char *argv[]){

    DBConnection conn;
    bool ret = conn.connect("111.222.3.44", 8503);
    if(!ret){
        cout<<"Failed to connect to the server"<<endl;
        return 0;
    }
    ConstantSP vector = conn.run("`IBM`GOOG`YHOO");
    int size = vector->rows();
    for(int i=0; i<size; ++i)
        cout<<vector->getString(i)<<endl;
    return 0;
}

1.1.4 编译

为了兼容旧的编译器,libDolphinDBAPI.so提供了2个版本,一个版本在编译时使用了-D_GLIBCXX_USE_CXX11_ABI=0的选项,放在bin/linux_x64/ABI0目录下,另一个版本未使用-D_GLIBCXX_USE_CXX11_ABI=0,放在bin/linux_x64/ABI1目录下。下面是使用第一个动态库版本的g++编译命令:

g++ main.cpp -std=c++11 -DLINUX -D_GLIBCXX_USE_CXX11_ABI=0 -DLOGGING_LEVEL_2 -O2 -I../include   -lDolphinDBAPI -lpthread -lssl -L../bin/linux_x64/ABI0  -Wl,-rpath,.:../bin/linux_x64/ABI0 -o main

下面是使用另一个动态库版本的g++编译命令:

g++ main.cpp -std=c++11 -DLINUX -D_GLIBCXX_USE_CXX11_ABI=1 -DLOGGING_LEVEL_2 -O2 -I../include   -lDolphinDBAPI -lpthread -lssl -L../bin/linux_x64/ABI1  -Wl,-rpath,.:../bin/linux_x64/ABI1 -o main

1.1.5 运行

编译成功后,启动DolphinDB,运行main程序并连接到DolphinDB,连接时需要指定IP地址和端口号,如以上程序中的111.222.3.44:8503。

1.2 Windows环境下编译

1.2.1 环境配置

本教程使用了Visual Studio 2017 64位版本。

1.2.2 下载bin文件和头文件

将本GitHub项目下载到本地。

1.2.3 创建Visual Studio项目

创建windows console project,导入include目录下头文件,创建1.1.3节中的main.cpp文件,导入libDolphinDBAPI.lib,并且配置lib目录。注意:

由于VS里默认定义了min/max两个宏,会与头文件中 minmax 函数冲突。为了解决这个问题,在预处理宏定义中需要加入 __NOMINMAX__ 。 api源代码中用宏定义LINUX、WINDOWS等区分不同平台,因此在预处理宏定义中需要加入 WINDOWS

1.2.4 编译和运行

启动编译,将对应的libDolphinDBAPI.dll拷贝到可执行程序的输出目录,即可运行。

Windows gnu开发环境与Linux相似,可以参考上一章的Linux编译。

2. 建立DolphinDB连接

DolphinDB C++ API 提供的最核心的对象是DBConnection。C++应用可以通过它在DolphinDB服务器上执行脚本和函数,并在两者之间双向传递数据。DBConnection类提供如下主要方法:

方法名 详情
connect(host, port, [username, password]) 将会话连接到DolphinDB服务器
login(username,password,enableEncryption) 登陆服务器
run(script) 将脚本在DolphinDB服务器运行
run(functionName,args) 调用DolphinDB服务器上的函数
upload(variableObjectMap) 将本地数据对象上传到DolphinDB服务器
initialize() 初始化连接信息
close() 关闭当前会话

C++ API通过TCP/IP协议连接到DolphinDB。使用 connect 方法创建连接时,需要提供DolphinDB server的IP和端口。

DBConnection conn;
bool ret = conn.connect("127.0.0.1", 8848);

我们创建连接时也可以使用用户名和密码登录,默认的管理员名称为"admin",密码是"123456"。

DBConnection conn; 
bool ret = conn.connect("127.0.0.1", 8848, "admin", "123456"); 

若未使用用户名及密码连接成功,则脚本在Guest权限下运行。后续运行中若需要提升权限,可以使用 conn.login('admin','123456',true) 登录获取权限。

声明connection变量的时候,有两个可选参数: enableSSL(支持SSL), enableAYSN(支持一部分).这两个参数默认值为false.

下面例子是,建立支持SSL而非支持异步的connection,同时服务器端应该添加参数enableHTTPS=true。

DBConnection conn(true,false)

下面建立即不支持SSL,但支持异步的connection。异步情况下,步只能执行DolphinDB脚本和函数, 且不再有返回值,该功能适用于异步写入数据。

DBConnection conn(false,true)

3. 运行DolphinDB脚本

通过 run 方法运行DolphinDB脚本:

ConstantSP v = conn.run(" `IBM` GOOG`YHOO");
cout<<v->getString()<<endl;

输出结果为:

["IBM", "GOOG", "YHOO"]

需要注意的是,脚本的最大长度为65, 535字节。

4. 运行DolphinDB函数

除了运行脚本之外,run命令还可以直接在远程DolphinDB服务器上执行DolphinDB内置或用户自定义函数。若 run 方法只有一个参数,则该参数为脚本;若 run 方法有两个参数,则第一个参数为DolphinDB中的函数名,第二个参数是该函数的参数,为ConstantSP类型的向量。

下面的示例展示C++程序通过 run 调用DolphinDB内置的 add 函数。 add 函数有两个参数 x 和 y。参数的存储位置不同,也会导致调用方式的不同。可能有以下三种情况:

  • 所有参数都在DolphinDB server端

若变量 x 和 y 已经通过C++程序在服务器端生成,

conn.run("x = [1, 3, 5]; y = [2, 4, 6]"); 

那么在C++端要对这两个向量做加法运算,只需直接使用 run 即可。

ConstantSP result = conn.run("add(x,y)");
cout<<result->getString()<<endl;

输出结果为:

[3, 7, 11]

  • 仅有一个参数在DolphinDB server端存在

若变量 x 已经通过C++程序在服务器端生成,

conn.run("x = [1, 3, 5]"); 

而参数 y 要在C++客户端生成,这时就需要使用“部分应用”方式,把参数 x 固化在 add 函数内。具体请参考部分应用文档

vector<ConstantSP> args;
ConstantSP y = Util::createVector(DT_DOUBLE, 3);
double array_y[] = {1.5, 2.5, 7};
y->setDouble(0, 3, array_y);
args.push_back(y);
ConstantSP result = conn.run("add{x,}", args);
cout<<result->getString()<<endl;

输出结果为:

[2.5, 5.5, 12]

  • 两个参数都待由C++客户端赋值
vector<ConstantSP> args; 
ConstantSP x = Util::createVector(DT_DOUBLE, 3); 
double array_x[] = {1.5, 2.5, 7}; 
x->setDouble(0, 3, array_x); 
ConstantSP y = Util::createVector(DT_DOUBLE, 3); 
double array_y[] = {8.5, 7.5, 3}; 
y->setDouble(0, 3, array_y); 
args.push_back(x); 
args.push_back(y); 
ConstantSP result = conn.run("add", args); 
cout<<result->getString()<<endl; 

输出结果为:

[10, 10, 10]

5. 上传数据对象

C++ API提供 upload 方法,将本地对象上传到DolphinDB。

下面的例子在C++定义了一个 createDemoTable 函数,该函数创建了一个本地的表对象。

TableSP createDemoTable(){
    vector<string> colNames = {"name", "date"," price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10000, indexCapacity=10000;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    for(unsigned int i = 0  i < rowNum; ++i){
        columnVecs[0]->set(i, Util::createString("name_"+std::to_string(i)));
        columnVecs[1]->set(i, Util::createDate(2010, 1, i+1));
        columnVecs[2]->set(i, Util::createDouble((rand()%100)/3.0));
    }
    return table;
}

需要注意的是,上述例子中采用的 set 方法作为一个虚函数,会产生较大的开销,调用 set 方法对表的列向量逐个赋值,在数据量很大的情况下会导致效率低下。此外, createString , createDate , createDouble 等构造方法要求操作系统分配内存,反复调用同样会产生很大的开销。

相对合理的做法是定义一个相应类型的数组,通过诸如 setInt(INDEX start, int len, const int* buf) 的方式一次或者多次地将数据批量传给列向量。

当表对象的数据量较小时,可以采用上述例子中的方式生成 TableSP 对象的数据,但是当数据量较多时,建议采用如下方式来生成数据。

TableSP createDemoTable(){

    vector<string> colNames = {"name", "date", "price"};
    vector<DATA_TYPE> colTypes = {DT_STRING, DT_DATE, DT_DOUBLE};
    int colNum = 3, rowNum = 10000, indexCapacity=10000;
    ConstantSP table = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
    vector<VectorSP> columnVecs;
    for(int i = 0; i < colNum; ++i)
        columnVecs.push_back(table->getColumn(i));

    int array_dt_buf[Util::BUF_SIZE]; //定义date列缓冲区数组
    double array_db_buf[Util::BUF_SIZE]; //定义price列缓冲区数组

    int start = 0;
    int no=0;
    while (start < rowNum) {
        size_t len = std::min(Util::BUF_SIZE, rowNum - start);
        int *dtp = columnVecs[1]->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
        double *dbp = columnVecs[2]->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
        for (int i = 0; i < len; ++i) {
            columnVecs[0]->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式
            dtp[i] = 17898+i;
            dbp[i] = (rand()%100)/3.0;
        }
        columnVecs[1]->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组
        columnVecs[2]->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组
        start += len;
    }
    return table;
}

上述例子采用的诸如 getIntBuffer 等方法能够直接获取一个可读写的缓冲区,写完后使用 setInt 将缓冲区写回数组,这类函数会检查给定的缓冲区地址和变量底层储存的地址是否一致,如果一致就不会发生数据拷贝。在多数情况下,用 getIntBuffer 获得的缓冲区就是变量实际的存储区域,这样能减少数据拷贝,提高性能。

以下利用自定义的 createDemoTable 函数创建表对象之后,通过 upload 方法把它上传到DolphinDB,再从DolphinDB获取这个表的数据,保存到本地对象result并打印。

TableSP table = createDemoTable();
conn.upload("myTable", table);
string script = "select * from myTable;";
ConstantSP result = conn.run(script);
cout<<result->getString()<<endl;

输出结果为:

name    date       price
------- ---------- ---------
name_1  2019.01.02 27.666667
name_2  2019.01.03 28.666667
name_3  2019.01.04 25.666667
name_4  2019.01.05 5
name_5  2019.01.06 31
...

6. 读取数据示例

DolphinDB C++ API 不仅支持Int, Float, String, Date, DataTime等多种数据类型,也支持向量(VectorSP), 集合(SetSP), 矩阵(MatrixSP), 字典(DictionarySP), 表(TableSP)等多种数据形式。下面介绍如何通过DBConnection对象,读取并操作DolphinDB的各种形式的对象。

首先加上必要的头文件:

#include "DolphinDB.h"
#include "Util.h"

6.1 向量

创建INT类型的向量:

VectorSP v = conn.run("1..10");
int size = v->size();
for(int i = 0; i < size; ++i)
    cout<<v->getInt(i)<<endl;

创建DATE类型的向量:

VectorSP v = conn.run("2010.10.01..2010.10.30"); 
int size = v->size(); 
for(int i = 0; i < size; ++i)
    cout<<v->getString(i)<<endl;

6.2 集合

创建一个集合:

SetSP set = conn.run("set(4 5 5 2 3 11 6)");
cout<<set->getString()<<endl;

6.3 矩阵

创建一个矩阵:

ConstantSP matrix = conn.run("1..6$2:3"); 
cout<<matrix->getString()<<endl; 

6.4 字典

创建一个字典:

DictionarySP dict = conn.run("dict(1 2 3, `IBM` MSFT`GOOG)");
cout << dict->get(Util::createInt(1))->getString()<<endl;

上例通过 Util::createInt 创建Int类型的值,并使用 get 方法来获得key为1对应的值。

6.5 表

在C++客户端中执行以下脚本创建一个表:

string sb; 
sb.append("n=200\n"); 
sb.append("syms= `IBM`C`MS`MSFT`JPM`ORCL`BIDU`SOHU`GE`EBAY`GOOG`FORD`GS`PEP`USO`GLD`GDX`EEM`FXI`SLV`SINA`BAC`AAPL`PALL`YHOO`KOH`TSLA`CS`CISO`SUN\n"); 
sb.append("mytrades=table(09:30:00+rand(18000, n) as timestamp, rand(syms, n) as sym, 10*(1+rand(100, n)) as qty, 5.0+rand(100.0, n) as price); \n"); 
sb.append("select * from mytrades"); 
TableSP table = conn.run(sb); 

6.5.1 getString()方法获取表的内容

cout<<table->getString()<<endl; 

6.5.2 getColumn()方法按列获取表的内容

下面的脚本中,首先定义一个VectorSP类型的动态数组columnVecs,用于存放从表中获取的列,然后依次访问columnVecs处理数据。

对于表的各列,我们可以通过getString()方法获得每一列的字符串类型数组,再通过C++的数据类型转换函数将数值类型的数据转换成对应的数据类型,从而进行计算。对于时间类型的数据,则需要以字符串的形式存储。

vector<VectorSP> columnVecs;
int qty[200],sum[200];
double price[200];
for(int i=0; i<200;++i){
    qty[i]=atoi(columnVecs[2]->getString(i).c_str());
    price[i]=atof(columnVecs[3]->getString(i).c_str());
    sum[i]=qty[i]*price[i];
}

for(int i = 0; i < 200; ++i){
    cout<<columnVecs[0]->getString(i)<<", "<<columnVecs[1]->getString(i)<<", "<<sum[i]<<endl;
}

6.5.3 getRow()方法按照行获取表的内容

例如,打印table的第一行,返回的结果是一个字典。

cout<<table->getRow(0)->getString()<<endl; 

// output
price->37.811678
qty->410
sym->IBM
timestamp->13:45:15

若要先按行获取table的内容,再对其中的数据进行操作,则需要调用getMember()方法来获取对应列的数据。其中,getMember()函数的参数不是C++内置的string类型对象,而是DolphinDB C++ API的string类型Constant对象。

cout<<table->getRow(0)->getMember(Util::createString("price"))->getDouble()<<endl;

// output
37.811678

需要注意的是,按行访问table并逐一进行计算实际上非常低效,为了达到更好的性能,建议参考6.5.2小节的方式按列访问table并批量计算。

6.5.4 使用BlockReaderSP对象分段读取表数据

对于大数据量的表,API提供了分段读取方法。(此方法仅适用于DolphinDB 1.20.5, 1.10.16及其以上版本)

在C++客户端中执行以下脚本创建一个大数据量的表:

string script; 
script.append("n=20000\n"); 
script.append("syms= `IBM`C`MS`MSFT`JPM`ORCL`BIDU`SOHU`GE`EBAY`GOOG`FORD`GS`PEP`USO`GLD`GDX`EEM`FXI`SLV`SINA`BAC`AAPL`PALL`YHOO`KOH`TSLA`CS`CISO`SUN\n"); 
script.append("mytrades=table(09:30:00+rand(18000, n) as timestamp, rand(syms, n) as sym, 10*(1+rand(100, n)) as qty, 5.0+rand(100.0, n) as price); \n"); 
conn.run(script); 

分段读取数据并用getString()方法获取表的内容, 需要注意的是fetchSize必须不小于8192。

string sb = "select * from mytrades";
int fetchSize = 8192;
BlockReaderSP reader = conn.run(sb,4,2,fetchSize);//priority=4, parallelism=2
ConstantSP table;
int total = 0;
while(reader->hasNext()){
    table=reader->read();
    total += table->size();
    cout<< "read" <<table->size()<<endl;
    cout<<table->getString()<<endl; 
}

6.6 AnyVector

AnyVector是DolphinDB中一种特殊的数据形式,与常规的向量不同,它的每个元素可以是不同的数据类型或数据形式。

ConstantSP result = conn.run("{1, 2, {1,3,5}, {0.9, 0.8}}");
cout<<result->getString()<<endl;

使用 get 方法获取第三个元素:

VectorSP v = result->get(2); 
cout<<v->getString()<<endl; 

结果是一个Int类型的向量[1,3,5]。

7. 保存数据到DolphinDB数据表

DolphinDB数据表按存储方式分为三种:

  • 内存表: 数据仅保存在内存中,存取速度最快,但是节点关闭后数据就不存在了。
  • 本地磁盘表:数据保存在本地磁盘上。可以从磁盘加载到内存。
  • 分布式表:数据分布在不同的节点,通过DolphinDB的分布式计算引擎,仍然可以像本地表一样做统一查询。

7.1 保存数据到DolphinDB内存表

DolphinDB提供多种方式来保存数据到内存表:

下面分别介绍三种方式保存数据的实例,在例子中使用到的数据表有3个列,分别是STRING, DATE, DOUBLE类型,列名分别为name, date和price。 在DolphinDB中执行以下脚本创建内存表:

t = table(100:0, `name` date`price, [STRING, DATE, DOUBLE]); 
share t as tglobal; 

上面的例子中,我们通过table函数来创建表,指定了表的容量和初始大小、列名和数据类型。由于内存表是会话隔离的,所以普通内存表只有当前会话可见。为了让多个客户端可以同时访问t,我们使用share在会话间共享内存表。

7.1.1 使用insert into语句保存数据

我们可以采用如下方式保存单条数据。

char script[100];
sprintf(script, "insert into tglobal values(%s, date(timestamp(%ld)), %lf)", "`a", 1546300800000, 1.5);
conn.run(script);

也可以使用insert into 语句保存多条数据,实现如下:

string script; 
int rowNum=10000, indexCapacity=10000; 
VectorSP names = Util::createVector(DT_STRING, rowNum, indexCapacity); 
VectorSP dates = Util::createVector(DT_DATE, rowNum, indexCapacity); 
VectorSP prices = Util::createVector(DT_DOUBLE, rowNum, indexCapacity); 

int array_dt_buf[Util:: BUF_SIZE]; //定义date列缓冲区数组
double array_db_buf[Util:: BUF_SIZE]; //定义price列缓冲区数组

int start = 0; 
int no=0; 
while (start < rowNum) {

    size_t len = std::min(Util::BUF_SIZE, rowNum - start);
    int *dtp = dates->getIntBuffer(start, len, array_dt_buf); //dtp指向每次通过 `getIntBuffer` 得到的缓冲区的头部
    double *dbp = prices->getDoubleBuffer(start, len, array_db_buf); //dbp指向每次通过 `getDoubleBuffer` 得到的缓冲区的头部
    for (int i = 0; i < len; i++) {
        names->setString(i+start, "name_"+std::to_string(++no)); //对string类型的name列直接进行赋值,不采用getbuffer的方式
        dtp[i] = 17898+i;
        dbp[i] = (rand()%100)/3.0;
    }
    dates->setInt(start, len, dtp); //写完后使用 `setInt` 将缓冲区写回数组
    prices->setDouble(start, len, dbp); //写完后使用 `setDouble` 将缓冲区写回数组
    start += len;

}
vector<string> allnames = {"names", "dates", "prices"}; 
vector<ConstantSP> allcols = {names, dates, prices}; 
conn.upload(allnames, allcols); 

script += "insert into tglobal values(names, dates, prices); tglobal"; 
TableSP table = conn.run(script); 

7.1.2 使用tableInsert函数批量保存多条数据

在这个例子中,我们利用索引指定TableSP对象的多行数据,将它们批量保存到DolphinDB server上。

vector<ConstantSP> args;
TableSP table = createDemoTable();
VectorSP range = Util::createPair(DT_INDEX);
range->setIndex(0, 0);
range->setIndex(1, 10);
cout<<range->getString()<<endl;
args.push_back(table->get(range));
conn.run("tableInsert{tglobal}", args);

7.1.3 使用tableInsert函数保存TableSP对象

vector<ConstantSP> args; 
TableSP table = createDemoTable(); 
args.push_back(table); 
conn.run("tableInsert{tglobal}", args); 

把数据保存到内存表,还可以使用append!函数,它可以把一张表追加到另一张表。但是,一般不建议通过append!函数保存数据,因为append!函数会返回一个表的schema,增加通信量。

vector<ConstantSP> args;
TableSP table = createDemoTable();
args.push_back(table);
conn.run("append!(tglobal);", args);

7.2 保存数据到本地磁盘表

本地磁盘表通用用于静态数据集的计算分析,既可以用于数据的输入,也可以作为计算的输出。它不支持事务,也不持支并发读写。

在DolphinDB中使用以下脚本创建一个本地磁盘表,使用database函数创建数据库,调用saveTable命令将内存表保存到磁盘中:

t = table(100:0, `name` date`price, [STRING,DATE,DOUBLE]);
db=database("/home/dolphindb/demoDB");
saveTable(db, t, `dt);
share t as tDiskGlobal;

使用tableInsert函数是向本地磁盘表追加数据最为常用的方式。这个例子中,我们使用tableInsert向共享的内存表tDiskGlobal中插入数据,接着调用saveTable把插入的数据保存到磁盘上。

TableSP table = createDemoTable(); 
vector<ConstantSP> args; 
args.push_back(table); 
conn.run("tableInsert{tDiskGlobal}", args); 
conn.run("saveTable(db, tDiskGlobal, `dt); "); 

本地磁盘表支持使用append!函数把数据追加到表中:

TableSP table = createDemoTable();
conn.upload("mt", table);
string script;
script += "db=database(\"/home/demoTable1\");";
script += "tDiskGlobal.append!(mt);";
script += "saveTable(db,tDiskGlobal,`dt);";
conn.run(script);

注意:

  1. 对于本地磁盘表,append!函数只把数据追加到内存,如果要保存到磁盘上,必须再次执行saveTable函数。
  2. 除了使用share让表在其他会话中可见,也可以在C++ API中使用loadTable来加载磁盘表,使用append!来追加数据。但是,我们不推荐这种方法,因为loadTable函数从磁盘加载数据,会消耗大量时间。如果有多个客户端都使用loadTable ,内存中会有多个表的副本,造成数据不一致。

7.3 保存数据到分布式表

分布式表是DolphinDB推荐在生产环境下使用的数据存储方式,它支持快照级别的事务隔离,保证数据一致性。分布式表支持多副本机制,既提供了数据容错能力,又能作为数据访问的负载均衡。下面的例子通过C++ API把数据保存至分布式表。

7.3.1 使用tableInsert函数保存TableSP对象

在DolphinDB中使用以下脚本创建分布式表。database函数用于创建数据库,对于分布式数据库,路径必须以 dfs 开头。createPartitionedTable函数用于创建分区表。

login( `admin, ` 123456)
dbPath = "dfs://SAMPLE_TRDDB";
tableName = `demoTable
db = database(dbPath, VALUE, 2010.01.01..2010.01.30)
pt=db.createPartitionedTable(table(1000000:0, `name` date `price, [STRING,DATE,DOUBLE]), tableName, ` date)

使用loadTable方法加载分布式表,通过tableInsert方式追加数据:

TableSP table = createDemoTable(); 
vector<ConstantSP> args; 
args.push_back(table); 
conn.run("tableInsert{loadTable('dfs://SAMPLE_TRDDB', `demoTable)}", args); 

append!函数也能向分布式表追加数据,但是性能与tableInsert相比要差,建议不要轻易使用:

TableSP table = createDemoTable();
conn.upload("mt", table);
conn.run("loadTable('dfs://SAMPLE_TRDDB', `demoTable).append!(mt);");
conn.run(script);

7.3.2 分布式表的并发写入

DolphinDB的分布式表支持并发读写,下面展示如何在C++客户端中将数据并发写入DolphinDB的分布式表。

首先,在DolphinDB服务端执行以下脚本,创建分布式数据库"dfs://natlog"和分布式表"natlogrecords"。其中,数据库按照VALUE-HASH-HASH的组合进行三级分区。

dbName="dfs://natlog"
tableName="natlogrecords"
db1 = database("", VALUE, datehour(2019.09.11T00:00:00)..datehour(2019.12.30T00:00:00) )//starttime,  newValuePartitionPolicy=add
db2 = database("", HASH, [IPADDR, 50]) //source_address 
db3 = database("", HASH,  [IPADDR, 50]) //destination_address
db = database(dbName, COMPO, [db1,db2,db3])
data = table(1:0, ["fwname","filename","source_address","source_port","destination_address","destination_port","nat_source_address","nat_source_port","starttime","stoptime","elapsed_time"], [SYMBOL,STRING,IPADDR,INT,IPADDR,INT,IPADDR,INT,DATETIME,DATETIME,INT])
db.createPartitionedTable(data,tableName,`starttime`source_address`destination_address)

请注意:DolphinDB不允许多个writer同时将数据写入到同一个分区,因此在客户端多线程并行写入数据时,需要确保每个线程分别写入不同的分区。

对于按哈希值进行分区的分布式表, DolphinDB C++ API 提供了getHash函数来数据的hash值。在客户端设计多线程并发写入分布式表时,根据哈希分区字段数据的哈希值分组,每组指定一个写线程。这样就能保证每个线程同时将数据写到不同的哈希分区。

ConstantSP spIP = Util::createConstant(DT_IP);
int key = spIP->getHash(BUCKETS);

开启生产数据和消费数据的线程,下面的genData用于生成模拟数据,writeData用于写数据。

for (int i = 0; i < tLong; ++i) {
    arg[i].index = i;
    arg[i].count = tLong;
    arg[i].nLong = nLong;
    arg[i].cLong = cLong;
    arg[i].nTime = 0;
    arg[i].nStarttime = sLong;
    genThreads[i] = std::thread(genData, (void *)&arg[i]);
    writeThreads[i] = std::thread(writeData, (void *)&arg[i]);
}

每个生产线程首先生成数据,其中createDemoTable函数用于产生模拟数据,并返回一个TableSP对象。

void *genData(void *arg) {
  struct parameter *pParam;
  pParam = (struct parameter *)arg;
  long partitionCount = BUCKETS / pParam->count;

  for (unsigned int i = 0; i < pParam->nLong; i++) {
    TableSP table =
        createDemoTable(pParam->cLong, partitionCount * pParam->index, partitionCount, pParam->nStarttime, i * 5);
    tableQueue[pParam->index]->push(table);
  }
  return NULL;
}

每个消费线程开始向DolphinDB并行写入数据。

void *writeData(void *arg) {
    struct parameter *pParam;
    pParam = (struct parameter *)arg;

    TableSP table;
    for (unsigned int i = 0; i < pParam->nLong; i++) {
        tableQueue[pParam->index]->pop(table);
        long long startTime = Util::getEpochTime();
        vector<ConstantSP> args;
        args.push_back(table);
        conn[pParam->index].run("tableInsert{loadTable('dfs://natlog', `natlogrecords)}", args);
        pParam->nTime += Util::getEpochTime() - startTime;
    }
    printf("Thread %d,insert %ld rows %ld times, used %ld ms.\n", pParam->index, pParam->cLong, pParam->nLong, pParam->nTime);
    return NULL;
}

更多分布式表的并发写入案例可以参考样例MultiThreadDFSWriting.cpp

8. 注意事项

  1. DBConnection类的所有函数都不是线程安全的,不可以并行调用,否则可能会导致程序崩溃。

关于C++ API的更多信息,可以参考C++ API 头文件dolphindb.h

C++ Streaming API

C++ API处理流数据的方式有三种:ThreadedClient, ThreadPooledClient 和 PollingClient。

三种实现方式可以参考test/StreamingThreadedClientTester.cpp, test/StreamingThreadPooledClientTester.cpptest/StreamingPollingClientTester.cpp

9 编译

9.1 Linux 64位

9.1.1 通过cmake

安装cmake:

sudo apt-get install cmake

编译:

mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release ../path_to_api-cplusplus/
make -j `nproc` 

编译成功后,会生成三个可执行文件。

10 在Windows中使用MinGW编译

安装MinGWcmake:

mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=Release `path_to_api-cplusplus` -G "MinGW Makefiles"
mingw32-make -j `nproc` 

编译成功后,会生成三个可执行文件。

注意:

  1. 编译前,需要把libDolphinDBAPI.dll复制到编译目录。
  2. 执行例子前,需要把libDolphinDBAPI.dll和libgcc_s_seh-1.dll复制到可执行文件的相同目录下。

11. API

11.1 ThreadedClient

ThreadedClient 产生一个线程。每次新数据从流数据表发布时,该线程去获取和处理数据。

11.1.1 定义线程客户端

ThreadedClient::ThreadClient(int listeningPort);
  • listeningPort 是单线程客户端的订阅端口号。

11.1.2 调用订阅函数

ThreadSP ThreadedClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr);
  • host是发布端节点的主机名。

  • port是发布端节点的端口号。

  • handler是用户自定义的回调函数,用于处理每次流入的消息。函数的参数是流入的消息,每条消息就是六数据表的一行。函数的结果必须是void。

  • tableName是字符串,表示发布端上共享流数据表的名称。

  • actionName是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。

  • offset是整数,表示订阅任务开始后的第一条消息所在的位置。消息是流数据表中的行。如果没有指定offset,或它为负数或超过了流数据表的记录行数,订阅将会从流数据表的当前行开始。offset与流数据表创建时的第一行对应。如果某些行因为内存限制被删除,在决定订阅开始的位置时,这些行仍然考虑在内。

  • resub是布尔值,表示订阅中断后,是否会自动重订阅。

  • filter是一个向量,表示过滤条件。流数据表过滤列在filter中的数据才会发布到订阅端,不在filter中的数据不会发布。

ThreadSP 指向循环调用handler的线程的指针。该线程在此topic被取消订阅后会退出。

示例:

auto t = client.subscribe(host, port, [](Message msg) {
    // user-defined routine
    }, tableName);
t->join();

11.1.3 取消订阅

void ThreadClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);
  • host 是发布端节点的主机名。

  • port 是发布端节点的端口号。

  • tableName 是字符串,表示发布端上共享流数据表的名称。

  • actionName 是字符串,表示订阅任务的名称。它可以包含字母、数字和下划线。

该函数用于停止向发布者订阅数据。

11.2 ThreadPooledClient

ThreadPooledClient 产生用户指定数量的多个线程。每次新数据从流数据表发布时,这些线程同时去获取和处理数据。当数据到达速度超过单个线程所能处理的限度时,ThreadPooledClient 比 ThreadedClient 有优势。

11.2.1 定义多线程客户端

ThreadPooledClient::ThreadPooledClient(int listeningPort, int threadCount);
  • listeningPort 是多线程客户端节点的订阅端口号。

  • threadCount 是线程池的大小。

11.2.2 调用订阅函数

vector<ThreadSP> ThreadPooledClient::subscribe(string host, int port, MessageHandler handler, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1, bool resub = true, VectorSP filter = nullptr);

参数参见2.1.2节。

返回一个指针向量,每个指针指向循环调用handler的线程。这些线程在此topic被取消订阅后会退出。

示例:

auto vec = client.subscribe(host, port, [](Message msg) {
    // user-defined routine
    }, tableName);
for(auto& t : vec) {
    t->join();
}

11.2.3 取消订阅

void ThreadPooledClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);

参数参见2.1.3节。

11.3 PollingClient

订阅数据时,会返回一个消息队列。用户可以从其中获取和处理数据。

11.3.1 定义客户端

PollingClient::PollingClient(int listeningPort);
  • listeningPort 是客户端节点的订阅端口号。

11.3.2 订阅

MessageQueueSP PollingClient::subscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME, int64_t offset = -1);

参数参见2.1.2节。

该函数返回指向消息队列的指针。

示例:

auto queue = client.subscribe(host, port, handler, tableName);
Message msg;
while(true) {
    if(queue->poll(msg, 1000)) {
        if(msg.isNull()) break;
        // handle msg
    }
}

11.3.3 取消订阅

void PollingClient::unsubscribe(string host, int port, string tableName, string actionName = DEFAULT_ACTION_NAME);

参数参见2.1.3节。

注意,对于这种订阅模式,取消订阅时,会返回一个空指针,用户需要自行处理。

Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

简介

暂无描述 展开 收起
C++ 等 6 种语言
Apache-2.0
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
1
https://gitee.com/dolphindb/api-cplusplus.git
git@gitee.com:dolphindb/api-cplusplus.git
dolphindb
api-cplusplus
api-cplusplus
main

搜索帮助