txt文件上传网站源码分享()

大家好,关于txt文件上传网站源码分享很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮助!

一个量化策略在生产(交易)环境中运行时,实时数据的处理通常是由事件驱动的。为确保研发和生产使用同一套代码,通常在研发阶段需将历史数据,严格按照事件发生的时间顺序进行回放,以此模拟交易环境。在DolphinDB中,用户通过replay函数可以实现对静态数据的回放,即将历史数据按照时间顺序以“实时数据”的方式注入流数据表中。对相同时间戳的数据还可以指定额外排序列,使数据回放顺序更接近实时交易场景。

在历史数据回放、股票行情回放两篇教程中已经介绍了DolphinDB的回放功能,本教程更加侧重于回放功能的工程化实践本教程将介绍如何基于DolphinDB分布式数据库、回放功能以及DolphinDBAPI搭建一个行情数据回放服务,该服务支持多个用户同时通过C++、Python等客户端提交数据回放请求

1.基于DolphinDB的行情回放服务

本教程实现的行情回放服务基于3类国内A股行情数据源:逐笔委托数据、逐笔成交数据、Level2快照数据,支持以下功能与特性:

C++、Python客户端提交回放请求(指定回放股票列表、回放日期、回放速率、回放数据源)多个用户同时回放多个数据源同时有序回放在时间有序的基础上支持排序列有序(如:针对逐笔数据中的交易所原始消息记录号排序)发布回放结束信号对回放结果订阅消费

1.1行情回放服务架构

本教程示例DolphinDB搭建的行情回放服务架构如下图所示:

行情回放服务架构

行情数据接入:实时行情数据和历史行情数据可以通过DolphinDBAPI或插件存储到DolphinDB分布式时序数据库中。函数模块封装:数据查询和回放过程可以通过DolphinDB函数视图封装内置,仅暴露股票列表、回放日期、回放速率、回放数据源等关键参数给行情服务用户。行情用户请求:需要进行行情回放的用户可以通过DolphinDB客户端软件(如DolphinDBGUI工具、DolphinDBVSCode插件、DolphinDBAPI等)调用封装好的回放函数对存储在数据库中的行情数据进行回放,同时,用户还可以在客户端对回放结果进行实时订阅消费。此外,支持多用户并发回放。

1.2回放服务搭建步骤

本教程示例DolphinDB搭建行情回放服务的具体操作步骤如下图所示:

搭建步骤

Step1:服务提供者设计合理分区的数据库表,在DolphinDB集成开发环境中执行对应的建库建表、数据导入等脚本,以将历史行情数据存储到DolphinDB分布式数据库中作为回放服务的数据源。在第二章将给出本教程涉及的3类行情数据的分区存储方案及建库建表脚本。Step2:服务提供者在DolphinDB集成开发环境中将回放过程中的操作封装成函数视图,通过封装使得行情服务用户不需要关心DolphinDB回放功能的细节,只需要指定简单的回放参数(股票、日期、回放速率、数据源)即可提交回放请求。在第三章将给全部函数视图的定义脚本。Step3:服务提供者在外部程序中通过DolphinDBAPI调用上述函数视图实现提交回放的功能。在第四章将给出API端提交回放任务的C++实现和Python实现。此外,在第四章提交回放的基础上,在第五章将介绍对回放结果的API端订阅与消费的代码实现。

在第六章将给出多用户多表回放、多天回放的性能测试结果。最后两章为开发环境配置与总结。

2.行情数据分区存储方案

本教程的回放服务基于3类国内A股行情数据源:逐笔成交数据、逐笔委托数据、快照数据,均使用TSDB存储引擎存储在DolphinDB分布式数据库中。

数据源

代码样例中的分区数据库路径

代码样例中的表名

分区机制

排序列

建库建表脚本

逐笔委托

dfs://Test_order

order

VALUE:交易日,HASH:[SYMBOL,25]

股票ID,交易时间

附录逐笔委托建库建表脚本

逐笔成交

dfs://Test_transaction

transaction

VALUE:交易日,HASH:[SYMBOL,25]

股票ID,交易时间

附录逐笔成交建库建表脚本

快照

dfs://Test_snapshot

snapshot

VALUE:交易日,HASH:[SYMBOL,20]

股票ID,交易时间

附录快照建库建表脚本

回放的原理是从数据库中读取需要的行情数据,并根据时间列排序后写入到相应的流数据表。因此,读数据库并排序的性能对回放速度有很大的影响,合理的分区机制将有助于提高数据加载速度。基于用户通常按照日期和股票提交回放请求的特点,设计了上表的分区方案。

此外,本教程的历史数据存储在三节点双副本的DolphinDB集群中,集群和副本同样可以提升读取性能,同时可以增加系统的可用性,分区的副本通常是存放在不同的物理节点的,所以一旦某个分区不可用,系统依然可以调用其它副本分区来保证回放服务的正常运转。

附录将提供部分原始数据的csv文件(原始行情数据文件)以及对应的示例导入脚本(逐笔委托示例导入脚本、逐笔成交示例导入脚本、快照示例导入脚本),以便读者快速体验搭建本教程所示的行情回放服务。

3.行情回放自定义函数

本章介绍回放过程中的主要函数功能及其实现,最后将函数封装成视图以便通过API等方式调用。

本章的开发工具采用DolphinDBGUI,完整脚本见附录:行情回放函数。

函数名

函数入参

函数功能

stkReplay

stkList:回放股票列表startDate:回放开始日期endDate:回放结束日期replayRate:回放速率replayUuid:回放用户标识replayName:回放数据源名称

行情回放服务主函数

dsTb

timeRS:数据源时间划分startDate:回放开始日期endDate:回放结束日期stkList:回放股票列表replayName:回放数据源名称

构造回放数据源

createEnd

tabName:回放输出表名称sortColumn:相同时间戳时额外排序列名

构造回放结束信号

replayJob

inputDict:回放输入数据源tabName:回放输出表名称dateDict:回放排序时间列timeDict:回放排序时间列replayRate:回放速率sortColumn:相同时间戳时额外排序列名

定义回放任务内容

3.1stkReplay函数:行情回放服务主函数

函数定义代码:

defstkReplay(stkList,mutablestartDate,mutableendDate,replayRate,replayUuid,replayName)\n{\nmaxCnt=50\nreturnBody=dict(STRING,STRING)\nstartDate=datetimeParse(startDate,&34;)\nendDate=datetimeParse(endDate,&34;)+1\nsortColumn=&34;\nif(stkList.size()>maxCnt)\n{\nreturnBody[&34;]=&34;\nreturnBody[&34;]=&34;+string(maxCnt)\nreturnreturnBody\n}\nif(size(replayName)!=0)\n{\nfor(nameinreplayName)\n{\nif(notnamein[&34;,&34;,&34;])\n{\nreturnBody[&34;]=&34;\nreturnBody[&34;]=&34;+name\nreturnreturnBody\n}\n}\n}\nelse\n{\nreturnBody[&34;]=&34;\nreturnBody[&34;]=&34;\nreturnreturnBody\n}\ntry\n{\nif(size(replayName)==1&&replayName[0]==&34;)\n{\ncolName=[&34;,&34;,&34;]\ncolType=[TIMESTAMP,SYMBOL,BLOB]\nsortColumn=&34;\n}\nelse\n{\ncolName=[&34;,&34;,&34;,sortColumn]\ncolType=[TIMESTAMP,SYMBOL,BLOB,LONG]\n}\nmsgTmp=streamTable(10000000:0,colName,colType)\ntabName=&34;+replayUuid\nenableTableShareAndPersistence(table=msgTmp,tableName=tabName,asynWrite=true,compress=true,cacheSize=10000000,retentionMinutes=60,flushMode=0,preCache=1000000)\n\ntimeRS=cutPoints(09:30:00.000..15:00:00.000,23)\n\ninputDict=dict(replayName,each(dsTb{timeRS,startDate,endDate,stkList},replayName))\ndateDict=dict(replayName,take(`MDDate,replayName.size()))\ntimeDict=dict(replayName,take(`MDTime,replayName.size()))\n\njobId=&34;+replayUuid\njobDesc=&34;\nsubmitJob(jobId,jobDesc,replayJob{inputDict,tabName,dateDict,timeDict,replayRate,sortColumn})\nreturnBody[&34;]=&34;\nreturnBody[&34;]=&34;\nreturnreturnBody\n}\ncatch(ex)\n{\nreturnBody[&34;]=&34;\nreturnBody[&34;]=&34;+ex\nreturnreturnBody\n}\n}

函数功能:

自定义函数stkReplay是整个回放的主体函数,用户传入的参数在stkReplay里会进行有效性判断及格式处理,可以根据实际需求更改。

首先,用maxCnt来控制用户一次回放股票数量的最大上限,本例中设置的是50。returnBody构造了信息字典,返回给用户以提示执行错误或执行成功。回放开始日期startDate和回放结束日期endDate利用datetimeParse函数进行格式处理。replayRate是回放速率,replayUuid是回放表名名称,replayName是回放数据源列表,sortColumn是数据源同回放时间戳排序列列名。

当输入参数无误后,便初始化回放结果流表,结果流表为异构流数据表,字段类型为BLOB的字段包含了一条原始记录的全部信息,同时结果流表为持久化流表,enableTableShareAndPersistence函数把流数据表共享并把它持久化到磁盘上,使用持久化流表可以避免内存占用过大。当回放数据源包含逐笔成交(transaction)或逐笔委托(order)时,本例实现了对相同时间戳的逐笔数据按交易所原始消息记录号(ApplSeqNum)进行排序(具体实现见3.3replayJob函数),所以结果流表中必须冗余一列来存放排序列。若回放数据源仅包含快照(snapshot)时,则不需要冗余一列排序列。

定义回放需要的其他参数。inputDict构造了回放数据源列表字典,利用each函数和部分应用可以对多个数据源进行简洁的定义。dateDict和timeDict构造了回放数据源时间戳字典。最后通过submitJob提交后台回放任务。

3.2dsTb函数:构造回放数据源

函数定义代码:

defdsTb(timeRS,startDate,endDate,stkList,replayName)\n{\nif(replayName==&34;){\ntab=loadTable(&34;,&34;)\n\t}\n\telseif(replayName==&34;){\n\t\ttab=loadTable(&34;,&34;)\n\t}\n\telseif(replayName==&34;){\n\t\ttab=loadTable(&34;,&34;)\n\t}\n\telse{\n\t\treturnNULL\n\t}\nds=replayDS(sqlObj=<select*fromtabwhereMDDate>=startDateandMDDate<endDateandHTSCSecurityIDinstkList>,dateColumn=&39;,timeColumn=&39;,timeRepartitionSchema=timeRS)\nreturnds\n}

函数功能:

自定义函数dsTb返回符合用户回放需求的数据源划分结果。主要是对内置replayDS函数进行了进一步的封装,函数首先对用户端输入的replayName进行判断,选择从数据库加载对应的表对象。利用replayDS对交易日期在startDate至endDate之间、股票代号在stkList内的数据按照timeRS进行数据源划分,返回某一个数据源的列表。

timeRS参数对应replayDS函数中的timeRepartitionSchema参数,是时间类型的向量,可用于将数据源划分为更小粒度的多个数据源,以保证查询DFS表中数据的效率以及控制内存大小。本例在3.1stkReplay函数中构造了timeRS变量,其意为对于一天的数据在有效时间段内平均分为23份。

执行如下代码查看dsTb函数返回的结果:

timeRS=cutPoints(09:30:00.000..15:00:00.000,3)\nstartDate=2021.12.01\nendDate=2021.12.02\nstkList=[&39;]\nreplayName=[&34;]\nds=dsTb(timeRS,startDate,endDate,stkList,replayName)

ds为一个向量,其中每一个元素如下,数据源被划分为多个小的SQL查询语句,具体原理参考replayDS函数。

DataSource<select[4]*fromtabwheretime(MDTime)<09:30:00.000,nanotime(MDTime)>=00:00:00.000000000,date(MDDate)==2021.12.01,MDDate>=2021.12.01andMDDate<2021.12.02andSecurityIDin[&34;]orderbyMDDateasc,MDTimeasc>\nDataSource<select[4]*fromtabwheretime(MDTime)<11:20:00.001,time(MDTime)>=09:30:00.000,date(MDDate)==2021.12.01,MDDate>=2021.12.01andMDDate<2021.12.02andSecurityIDin[&34;]orderbyMDDateasc,MDTimeasc>\nDataSource<select[4]*fromtabwheretime(MDTime)<13:10:00.001,time(MDTime)>=11:20:00.001,date(MDDate)==2021.12.01,MDDate>=2021.12.01andMDDate<2021.12.02andSecurityIDin[&34;]orderbyMDDateasc,MDTimeasc>\nDataSource<select[4]*fromtabwheretime(MDTime)<15:00:00.001,time(MDTime)>=13:10:00.001,date(MDDate)==2021.12.01,MDDate>=2021.12.01andMDDate<2021.12.02andSecurityIDin[&34;]orderbyMDDateasc,MDTimeasc>\nDataSource<select[4]*fromtabwherenanotime(MDTime)<=23:59:59.999999999,time(MDTime)>=15:00:00.001,date(MDDate)==2021.12.01,MDDate>=2021.12.01andMDDate<2021.12.02andSecurityIDin[&34;]orderbyMDDateasc,MDTimeasc>

3.3replayJob函数:定义回放任务内容

函数定义代码:

defreplayJob(inputDict,tabName,dateDict,timeDict,replayRate,sortColumn)\n{\nif(sortColumn==&34;)\n{\nreplay(inputTables=inputDict,outputTables=objByName(tabName),dateColumn=dateDict,timeColumn=timeDict,replayRate=int(replayRate),absoluteRate=false,parallelLevel=23)\n}\nelse\n{\nreplay(inputTables=inputDict,outputTables=objByName(tabName),dateColumn=dateDict,timeColumn=timeDict,replayRate=int(replayRate),absoluteRate=false,parallelLevel=23,sortColumns=sortColumn)\n}\ncreateEnd(tabName,sortColumn)\n}

函数功能:

自定义函数replayJob提交用户数据回放并调用createEnd函数,这里首先通过内置replay函数回放用户需求数据,此处回放模式为N对1异构回放。replay函数返回后即表示需要的数据已经全部回放结束,再执行自定义createEnd函数以构造结束信号写入回放结果里。调用createEnd函数是可选的,其功能是在回放结束时再发布一条特殊的记录,以标识回放结束。

参数sortColumn用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),对于这种特殊情况需要指定replayJob函数的回放时间戳排序列参数sortColumn为NULL,则调用内置的replay函数时不加入sortColumn参数。

3.4createEnd函数:构造回放结束信号

函数定义代码:

defcreateEnd(tabName,sortColumn)\n{\ndbName=&34;\ntbName=&34;\nif(notexistsDatabase(dbName))\n{\ndb=database(directory=dbName,partitionType=VALUE,partitionScheme=2023.04.03..2023.04.04)\nendTb=table(2200.01.01T23:59:59.000asDateTime,`ENDaspoint,long(0)asApplSeqNum)\nendLine=db.createPartitionedTable(table=endTb,tableName=tbName,partitionColumns=`DateTime)\nendLine.append!(endTb)\n}\n\nds=replayDS(sqlObj=<select*fromloadTable(dbName,tbName)>,dateColumn=&39;,timeColumn=&39;)\n\ninputEnd=dict([&34;],[ds])\ndateEnd=dict([&34;],[`DateTime])\ntimeEnd=dict([&34;],[`DateTime])\nif(sortColumn==&34;)\n{\nreplay(inputTables=inputEnd,outputTables=objByName(tabName),dateColumn=dateEnd,timeColumn=timeEnd,replayRate=-1,absoluteRate=false,parallelLevel=1)\n}\nelse\n{\nreplay(inputTables=inputEnd,outputTables=objByName(tabName),dateColumn=dateEnd,timeColumn=timeEnd,replayRate=-1,absoluteRate=false,parallelLevel=1,sortColumns=sortColumn)\n}\n}

函数功能:

自定义函数createEnd是在回放结束时给用户提供一条回放结束信号,利用了replay回放模式中的N对1异构回放构造回放信号,会往参数tabName指定的异构流表中写入一条消息类型列为end的记录。为方便后期异构消费解析以及复用,此处为结束信号独立建一个数据库并创建分区表,表内必须有时间列,其他字段可选。并向该表写入一条模拟数据,其数据内容没有任何强制要求。DolphinDB建库建表相关知识可参考分区数据库。inputEnd、dateEnd、timeEnd字典的key按需设置为字符串end,将对应replay函数指定的输出表中的第二个字典,即消息类型。

参数sortColumn用于指定额外的排序列,如果用户回放的数据源为仅仅包含快照(snapshot),则调用内置的replay函数时不加入sortColumn参数,结果流表中的结束信号记录如下。

3.5封装函数视图

将以上函数利用addFunctionView封装成函数视图,具体代码如下。API端仅需要调用主函数stkReplay,第四章回放全部基于该函数视图。

addFunctionView(dsTb)\naddFunctionView(createEnd)\naddFunctionView(replayJob)\naddFunctionView(stkReplay)

4.API提交回放

4.1C++API

本例程序运行在Linux系统DolphinDBC++API环境下,编译成可执行文件以命令行输入参数执行。完整脚本见附录C++源码。

4.1.1C++API调用回放服务

C++API调用回放服务代码如下:

DictionarySPresult=conn.run(&34;,args);\nstringerrorCode=result->get(Util::createString(&34;))->getString();\nif(errorCode!=&34;)\n{\nstd::cout<<result->getString()<<endl;\nreturn-1;\n}

conn是DolphinDBC++APIDBConnection对象,C++应用可以通过它在DolphinDB服务器上执行脚本和函数并在两者之间双向传递数据。conn通过run方法调用stkReplay自定义函数,args容器内包含了stkReplay函数所需的参数。执行结果返回到字典result中,可以通过get方法来获得key为errorCode对应的值,如果错误代号不为1,则代表回放执行错误,返回错误信息并结束程序。

多用户进行回放时,需要对用户回放的流表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如“Eq8Jk8Dd0Tw5Ej8D”的识别字符串,代码如下:

stringuuid(intlen)\n{\nchar*str=(char*)malloc(len+1);\nsrand(getpid());\nfor(inti=0;i<len;++i)\n{\nswitch(i%3)\n{\ncase0:\nstr[i]=&39;+std::rand()%26;\nbreak;\ncase1:\nstr[i]=&39;+std::rand()%26;\nbreak;\ndefault:\nstr[i]=&39;+std::rand()%10;\nbreak;\n}\n}\nstr[len]=&39;;\nstd::stringrst=str;\nfree(str);\nreturnrst;\n}

4.2PythonAPI

本例程序运行在Windows系统DolphinDBPythonAPI环境下,可以在任何具备该环境的客户端执行。完整脚本见附录Python源码。

4.2.1PythonAPI调用回放服务

PythonAPI调用回放服务代码如下:

stk_list=[&39;,&39;]\nstart_date=&39;\nend_date=&39;\nreplay_rate=-1\nreplay_name=[&39;]\ns.upload({&39;:stk_list,&39;:start_date,&39;:end_date,&39;:replay_rate,&39;:uuidStr,&39;:replay_name})\ns.run(&34;)

stk_list是回放股票列表,start_date是回放开始日期,end_date是回放结束日期,replay_rate是回放速率,replay_uuid是回放流表名称,replay_name是回放数据源列表。s是DolphinDB会话,Python应用通过会话在DolphinDB服务器上执行脚本和函数以及在两者之间双向传递数据。使用s的upload方法上传Python对象,使用run方法执行stkReplay函数。

多用户进行回放时,需要对用户回放的流表进行命名,为了不重复命名以避免冲突,对每个回放用户生成唯一识别号,会生成形如“Eq8Jk8Dd0Tw5Ej8D”的识别字符串,代码如下:

defuuid(length):\nstr=&34;\nforiinrange(length):\nif(i%3==0):\nstr+=chr(ord(&39;)+random.randint(0,os.getpid()+1)%26)\nelif(i%3==1):\nstr+=chr(ord(&39;)+random.randint(0,os.getpid()+1)%26)\nelse:\nstr+=chr(ord(&39;)+random.randint(0,os.getpid()+1)%10)\nreturnstr\n\nuuidStr=uuid(16)

5.API订阅消费

5.1C++API

5.1.1反序列化器构造

为使异构流数据表反序列化输出,构造输出表结构,代码如下:

DictionarySPsnap_full_schema=conn.run(&34;dfs://Test_snapshot\\&34;snapshot\\&34;);\nDictionarySPorder_full_schema=conn.run(&34;dfs://Test_order\\&34;order\\&34;);\nDictionarySPtransac_full_schema=conn.run(&34;dfs://Test_transaction\\&34;transaction\\&34;);\nDictionarySPend_full_schema=conn.run(&34;dfs://End\\&34;endline\\&34;);\n\nunordered_map<string,DictionarySP>sym2schema;\nsym2schema[&34;]=snap_full_schema;\nsym2schema[&34;]=order_full_schema;\nsym2schema[&34;]=transac_full_schema;\nsym2schema[&34;]=end_full_schema;\nStreamDeserializerSPsdsp=newStreamDeserializer(sym2schema);

上述前四行分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。unordered_map创建了key->schema的映射表,可以将回放数据源和结束信号按照schema进行结构解析。通过StreamDeserializer对象来构造异构流表反序列化器。

5.1.2C++API订阅回放服务

在DolphinDBC++API中订阅异构流表,代码如下:

intlistenport=10260;\nThreadedClientthreadedClient(listenport);\nstringtableNameUuid=&34;+uuidStr;\nautothread=threadedClient.subscribe(hostName,port,myHandler,tableNameUuid,&34;,0,true,nullptr,true,500000,0.001,false,&34;,&34;,sdsp);\nstd::cout<<&34;+tableNameUuid<<endl;\nthread->join();

listenport是单线程客户端的订阅端口号,tableNameUuid是用户回放的流表名称,通过threadedClient.subscribe可以订阅回放的流表,详情用法参考C++API订阅。thread指向循环调用myHandler的线程的指针,线程在topic被取消订阅后会退出。

5.1.3消费函数构造

调用订阅函数threadedClient.subscribe订阅异构流数据表时,在最后一个参数指定相应的流数据反序列化实例StreamDeserializerSP,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数myHandler,用户可以在回调函数内自定义消费,本文仅实现了简单的输出消费,当消息msg的标识为end时,通过threadedClient.unsubscribe取消订阅,代码如下:

longsumcount=0;\nlonglongstarttime=Util::getNanoEpochTime();\nautomyHandler=[&](vector<Message>msgs)\n{\nfor(auto&msg:msgs)\n{\nstd::cout<<msg.getSymbol()<<&34;<<msg->getString()<<endl;\nif(msg.getSymbol()==&34;)\n{\nthreadedClient.unsubscribe(hostName,port,tableNameUuid,&34;);\n}\nsumcount+=msg->get(0)->size();\n}\nlonglongspeed=(Util::getNanoEpochTime()-starttime)/sumcount;\nstd::cout<<&34;<<speed<<&34;<<endl;\n};

5.1.4程序执行

将如上C++程序代码写入main.cpp中,在DolphinDBC++API环境下编译成可执行文件main,按照如下命令格式输入,回车,即可开始“回放-订阅-消费”过程。

单支股票最大速度回放“order“表一天数据命令:

$./main000616.SZ2021120120211201-1order

两支(多支)股票最大速度回放三张表一天数据命令:

$./main000616.SZ,000681.SZ2021120120211201-1snapshot,order,transaction

5.1.5消费输出

根据5.1.4构造的消费函数,两支股票(“000616.SZ”&“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:

消费输出结果

5.2PythonAPI

5.2.1反序列化器构造

为使异构流数据表反序列化输出,PythonAPI通过streamDeserializer类来构造异构流表反序列化器,同时定义输出表结构,代码如下:

sd=ddb.streamDeserializer({\n&39;:[&34;,&34;],\n&39;:[&34;,&34;],\n&39;:[&34;,&34;],\n&39;:[&34;,&34;],\n},s)

上述主体部分分别是快照、逐笔委托、逐笔成交、结束信号的表结构构造,回放根据需要选择对应的表结构,结束信号表必须有。

5.2.2PythonAPI订阅回放服务

在DolphinDBPythonAPI中订阅异构流表,代码如下:

s.enableStreaming(0)\ns.subscribe(host=hostname,port=portname,handler=myHandler,tableName=&34;+uuidStr,actionName=&34;,offset=0,resub=False,msgAsTable=False,streamDeserializer=sd,userName=&34;,password=&34;)\nevent.wait()

enableStreaming函数启用流数据功能,函数参数指定开启数据订阅的端口。s使用subscribe函数来订阅DolphinDB中的流数据表,详情用法参考PythonAPI订阅。订阅是异步执行的,event.wait()保持主线程不退出。

5.2.3消费函数构造

调用订阅函数s.subscribe订阅异构流数据表时,在参数指定相应的流数据反序列化实例streamDeserializer,在订阅时会对收到的数据进行反序列化再传递给用户自定义的回调函数myHandler,用户可以在回调函数内自定义消费,仅输出消费代码如下:

defmyHandler(lst):\niflst[-1]==&34;:\nprint(&34;,lst)\neliflst[-1]==&39;:\nprint(&34;,lst)\neliflst[-1]==&39;:\nprint(&34;,lst)\nelse:\nprint(&34;,lst)\nevent.set()

5.2.4消费输出

根据5.2.3构造的消费函数,两支股票(“000616.SZ”&“000681.SZ”)最大速度回放三张表一天数据输出如下图所示:

消费输出结果

6.性能测试

测试的交易日范围从2021年12月1日至12月9日共7个连续交易日。测试脚本见附录:C++测试源码。

6.1测试服务器配置

CPU类型

Intel(R)Xeon(R)Silver4216CPU@2.10GHz

逻辑CPU总数

64

内存

503GB

硬盘

SSD

OS

CentOSLinuxrelease7.9.2009(Core)

DolphinDBServer版本

2.00.9.32023.03.29

6.250支深交所股票一天全速并发回放性能测试

测试方法:选取深交所2021年12月1日50支交易股票数据,linux后台同时启动多个第四章C++API提交回放程序,提交多个并发回放任务。

回放耗时=所有回放任务最晚结束时间-所有回放任务最早收到时间回放速率=所有用户回放数据量总和/回放耗时平均单个用户回放耗时=(所有回放任务结束时间和-所有回放任务开始时间和)/并发数量平均单个用户回放速率=单个用户回放数据量/平均单个用户回放耗时

并发数量

数据源

数据量

回放耗时

回放速率

单个用户回放耗时(平均)

单个用户回放速率(平均)

1

snapshottransactionorder

2,775,129行

约2.669s

约104w/s

约2.669s

约104w/s

10

snapshottransactionorder

27,751,290行

约10.161s

约273w/s

约8.482s

约32w/s

20

snapshottransactionorder

55,502,580行

约18.761s

约296w/s

约16.728s

约16w/s

40

snapshottransactionorder

111,005,160行

约35.537s

约312w/s

约19.091s

约14w/s

1

snapshottransaction

1,416,548行

约2.003s

约70w/s

约2.003s

约70w/s

10

snapshottransaction

14,165,480行

约7.155s

约198w/s

约6.382s

约22w/s

20

snapshottransaction

28,330,960行

约13.115s

约216w/s

约11.436s

约12w/s

40

snapshottransaction

56,661,920行

约27.003s

约210w/s

约13.740s

约10w/s

1

snapshot

194,045行

约1.387s

约14w/s

约1.387s

约14w/s

10

snapshot

1,940,450行

约6.428s

约30w/s

约5.128s

约4w/s

20

snapshot

3,880,900行

约11.782s

约33w/s

约10.539s

约2w/s

40

snapshot

7,761,800行

约23.274s

约33w/s

约12.393s

约1w/s

6.350支深交所股票跨天全速回放性能测试

测试方法:选取深交所50支交易股票数据,linux后台启动第四章C++API提交回放程序,提交多天回放任务。

回放耗时=回放任务结束时间-回放任务收到时间回放速率=回放数据量/回放耗时

数据源

数据量

回放耗时

回放速率

snapshottransactionorder

一天:2,775,129行

约2.925s

约95w/s

snapshottransactionorder

一周:17,366,642行

约16.393s

约106w/s

snapshottransaction

一天:1,416,548行

约1.912s

约74w/s

snapshottransaction

一周:8,899,562行

约9.142s

约97w/s

snapshot

一天:194,045行

约1.267s

约15w/s

snapshot

一周:1,279,869行

约6.659s

约19w/s

7.开发环境配置

部署DolphinDBServer

Server版本:2.00.9.32023.03.29Server部署模式:单服务器集群配置文件:cluster.cfg

maxMemSize=128\nmaxConnections=5000\nworkerNum=24\nlocalExecutors=23\nwebWorkerNum=2\nchunkCacheEngineMemSize=16\nnewValuePartitionPolicy=add\nlogLevel=INFO\nmaxLogSize=512\nnode1.volumes=/ssd/ssd3/pocTSDB/volumes/node1,/ssd/ssd4/pocTSDB/volumes/node1\nnode2.volumes=/ssd/ssd5/pocTSDB/volumes/node2,/ssd/ssd6/pocTSDB/volumes/node2\nnode3.volumes=/ssd/ssd7/pocTSDB/volumes/node3,/ssd/ssd8/pocTSDB/volumes/node3\ndiskIOConcurrencyLevel=0\nnode1.redoLogDir=/ssd/ssd3/pocTSDB/redoLog/node1\nnode2.redoLogDir=/ssd/ssd4/pocTSDB/redoLog/node2\nnode3.redoLogDir=/ssd/ssd5/pocTSDB/redoLog/node3\nnode1.chunkMetaDir=/ssd/ssd3/pocTSDB/metaDir/chunkMeta/node1\nnode2.chunkMetaDir=/ssd/ssd4/pocTSDB/metaDir/chunkMeta/node2\nnode3.chunkMetaDir=/ssd/ssd5/pocTSDB/metaDir/chunkMeta/node3\nnode1.persistenceDir=/ssd/ssd6/pocTSDB/persistenceDir/node1\nnode2.persistenceDir=/ssd/ssd7/pocTSDB/persistenceDir/node2\nnode3.persistenceDir=/ssd/ssd8/pocTSDB/persistenceDir/node3\nmaxPubConnections=128\nsubExecutors=24\nsubThrottle=1\npersistenceWorkerNum=1\nnode1.subPort=8825\nnode2.subPort=8826\nnode3.subPort=8827\nmaxPartitionNumPerQuery=200000\nstreamingHAMode=raft\nstreamingRaftGroups=2:node1:node2:node3\nnode1.streamingHADir=/ssd/ssd6/pocTSDB/streamingHADir/node1\nnode2.streamingHADir=/ssd/ssd7/pocTSDB/streamingHADir/node2\nnode3.streamingHADir=/ssd/ssd8/pocTSDB/streamingHADir/node3\nTSDBCacheEngineSize=16\nTSDBCacheEngineCompression=false\nnode1.TSDBRedoLogDir=/ssd/ssd3/pocTSDB/TSDBRedoLogDir/node1\nnode2.TSDBRedoLogDir=/ssd/ssd4/pocTSDB/TSDBRedoLogDir/node2\nnode3.TSDBRedoLogDir=/ssd/ssd5/pocTSDB/TSDBRedoLogDir/node3\nTSDBLevelFileIndexCacheSize=20\nTSDBLevelFileIndexCacheInvalidPercent=0.6\nlanCluster=0\nenableChunkGranularityConfig=true

路径配置参数需要开发人员根据实际环境配置

有关单服务器集群部署教程,请参考:单服务器集群部署。

DolphinDBclient开发环境

CPU类型:Intel(R)Core(TM)i5-11500@2.70GHz2.71GHz逻辑CPU总数:12内存:16GBOS:Windows11专业版DolphinDBGUI版本:1.30.20.1

DolphinDBGUI安装教程:DolphinDBGUI使用手册

DolphinDBC++API安装

C++API版本:release200.9

C++API版本建议按DolphinDBserver版本选择,如2.00.9版本的server安装release200.9分支的API

C++API教程:C++API教程

DolphinDBPythonAPI安装

PythonAPI版本:1.30.21.1

PythonAPI教程:PythonAPI教程

8.路线图(Roadmap)

在后续版本中,replay函数将支持原速回放,即严格以两条记录的时间戳之差为输出间隔向下游注入数据;在后续版本中,replay函数将支持原速倍速回放,即严格以两条记录的时间戳之差为输出间隔进行倍速再向下游注入数据。

9.总结

本教程基于高性能分布式时序数据库、数据回放工具、异构流数据表、C++API订阅消费、PythonAPI订阅消费等DolphinDB特性,给出了行情数据最佳分区存储方案,提供了多用户多支股票多天多表交易所行情数据回放最佳实践方案,意在使用户可以在此基础上快速搭建自己的行情回放系统。

10.附录

附录中的压缩包包含以下文件:

原始行情数据文件https://gitee.com/dolphindb/Tutorials_CN/blob/master/data/appendices_market_replay_bp逐笔委托建库建表脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/order_create.txt逐笔成交建库建表脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/transac_create.txt快照建库建表脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/snap_create.txt逐笔委托示例导入脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/order_upload.txt逐笔成交示例导入脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/transac_upload.txt快照示例导入脚本https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/snap_upload.txt行情回放函数https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/replay.txtC++源码https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/cpp_replay.cppPython源码https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/python_replay.pyC++测试源码https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/appendices_market_replay_bp/cpp_replay_test.cpp

注意:由于原始行情数据文件较大,故采用分卷压缩方式。解压时:

下载所有的压缩分卷至同一目录。使用7z等解压缩软件提取名为data_pack.zip.001分卷,得到data_pack文件夹。在data_pack文件夹中,解压data.zip即可得到原始行情数据文件。

文章分享结束,txt文件上传网站源码分享和的答案你都知道了吗?欢迎再次光临本站哦!

Published by

风君子

独自遨游何稽首 揭天掀地慰生平