十分钟验证一个高性能车联网数据平台解决方案
导读 本文将分享车联网大数据平台的解决方案。
全文目录:
-
高性能车联网大数据平台要具备的能力
-
每秒 1.8 亿写入的车联网大数据平台应用实例
-
基于 DolphinDB 的车联网大数据处理架构
-
代码附录
分享嘉宾|DolphinDB
智能网联汽车在车联网的应用上,通常是以智能传感器、物联网、GIS 技术为基础,结合大数据、人工智能技术,通过 OT (Operation Technology)和 IT (Information Technology) 融合的方式,实现智能车辆的辅助驾驶、状态监控、远程管理、数据分析及决策等功能。同时,通过对云端大数据的实时分析,还可以对运营车辆实现行程报警、路径规划、电子围栏、订单跟踪等企业级功能。
车联网云端大数据最重要的工作之一,是处理海量的 GPS 轨迹数据。GPS 轨迹数据本质上是带时间标签的时序数据(time series data),市面上很多时序数据库都能够满足时序数据的简单存储和简单查询需求。但在完整的车联网应用场景中,绝大部分时序数据库是无法直接输出最终业务所需结果的,也无法将时序数据与业务数据进行关联查询。
因此通常做法是在时序数据库的基础上,配合复杂的系统架构来支撑业务需求。例如,当我们想要将 GPS 轨迹跟车辆识别码、订单关联时,需要将 GPS 轨迹数据提取到应用端,使用关系数据库和编程工具进行二次处理。这种方式虽然能解决业务查询问题,但是在一定程度上增加了系统的复杂性,并且在性能、开发难度、数据挖掘等方面受到架构限制。
车联网一站式大数据处理平台是一个综合性系统,用于收集、存储、处理和分析车辆产生的庞大数据量。它提供了全面的解决方案,能够以更智能、高效的方式管理整个车联网生态系统。
那么一个优异的高性能车联网大数据平台需要具备哪些能力呢?
- 强大的数据对接能力:
强大的数据对接能力是车联网大数据平台的核心要素之一。该平台需具备多协议写入、乱序数据写入等先进技术,以实现端到端的无缝数据写入。先进的数据对接能力使得车联网大数据平台在数据流入时无需经过繁琐的二次处理,直接将数据安全、高效地入库存储。这不仅提高了数据写入的效率,还确保了数据质量,为后续的数据分析和挖掘奠定基础。
- 多模数据的存储能力:
平台能够集中管理车辆各种类型的数据,如状态、位置、驾驶行为和传感器信息等,使数据更易于访问和监控。通过支持多模数据存储,平台极大地简化了数据的访问和监控过程。无论是进行实时监控、历史回溯还是深度分析,用户可以轻松地获取所需的数据。此外,平台还提供了灵活而强大的数据检索工具,使用户能够迅速定位并提取感兴趣的信息,进一步提高了数据的可用性和实用性。
- 高性能、低延时、高吞吐的流式计算能力:
通过高性能的流式计算,平台能够处理大规模、高密度的实时数据流,确保信息的即时性和精准性。低延时使系统能够实现实时响应,有效提升了监控和管理的效能。同时,高吞吐的流式计算能力能够轻松处理多源、高频的数据输入,满足车联网系统对大规模数据处理的需求,为实时监控车辆状态、及时跟进车辆的实时情况和驾驶员行为提供底层架构支撑。
- 强大的数据分析能力:
平台具备强大的数据分析能力,涵盖完备的数据分析工具、卓越的海量数据处理能力以及便捷的可视化工具。这一综合性能有助于平台处理庞大的车辆数据集,从预测性维护、驾驶习惯、燃油效率等多个方面为用户提供深刻洞察。这种深度的数据解读不仅能够提升车辆的维护效率,还有助于制定更智能、高效的运营策略,从而全面提升整个车联网系统的性能和管理水平。
每秒1.8亿写入、单车7000个测点......一个真实的车联网大数据平台应用场景案例:
某新能源车企的数据写入与诊断十分复杂,面临两大痛点与挑战:一方面是每秒1.8亿 测点大数据量的写入,在保障大数据量的写入下,需要满足查询性能要求;另一方面,在车辆诊断场景中,单车测点达7000,需要大宽表存储支持,并对车辆诊断场景的海量数据进行计算与分析。
首先,在数据写入及查询 系统方面,这个系统需要处理来自数以十万计车辆的海量实时数据。每辆车定期产生并上传数据,每条数据包含着多达7000个测点 的信息,预计每秒数据量将达到1.5G。这意味着这一系统必须具备高效的数据写入和查询能力,以确保数据的实时性和可用性。
其次,在车辆诊断场景中,应用系统负责对部分车辆进行诊断,并对这些车辆的七天历史数据进行分析。每辆车都拥有7000个测点,这些测点记录着车辆的各种运行参数和性能指标。这样的数据量和复杂性要求强大的数据处理和分析能力,以便及时发现车辆的故障和问题,并进行有效诊断和处理。
经过前期调研与综合考量,该车企挑选了 DB-Engines 上国内排名第一的时序数据库 DolphinDB 进行测试。经过测试后,DolphinDB 为该车企提供了如下特性:
- 满足每秒1.8亿测点的写入,且资源使用率较低,cpu、内存都稳定在40%左右,保证了系统的稳定性和可靠性。
- 写入过程中,单点查询平均耗时在 100ms 以内,为用户提供了快速响应的数据查询体验。
- 对于车辆诊断系统,DolphinDB 能满足 7000 项指标的大宽表模型,无需分库分表,进一步简化了系统架构,提高了数据管理的效率。
- 历史数据的批量导入性能可达2.2亿点每秒。
- 提供 1600+ 内置函数,提供强大的数据分析与计算能力。
- 提供实时计算能力,能够满足边缘端计算 需求,可实现云边端架构部署,进一步提高了系统的可扩展性和适应性。(向下滑至文末,加群了解完整解决方案)
基于 DolphinDB 的车联网大数据处理架构:
接下来我们给大家详细介绍上文提到的车联网大数据处理架构,并提供了轻量化版本(完整脚本代码在附件中),大家可以用10分钟左右快速验证。
在这一架构中,时间信息、车牌、经纬度、速度等多数据源的海量数据从采集层进入 DolphinDB 大数据平台,注入流数据表中。DolphinDB 通过订阅流数据表 ,并与订单业务、车辆配置等数据进行关联查询,实现分析与监测预警。输出的结果进入应用层,对接业务系统、消息中间件,或通过多种接口进行可视化展示。架构图如下所示:
基于 DolphinDB 的车联网大数据处理架构图
作为一个基于时序数据库管理系统,支持数据分析、流计算的低延时平台,DolphinDB 具有轻量化、一站式的特点,不仅可以高速存储海量结构化数据,还能在库内直接进行复杂计算,内置的高性能流数据处理框架满足了实时流计算的需求,且脚本语言对标准 SQL 高度兼容,简单易上手。
使用这一架构可以实现海量轨迹数据的存储,车辆、订单的关联聚合查询,以及结果直接输出的完整流程。下面我们给出了8个 查询案例场景,完整的脚本代码在附件中,任何开发人员都可以花10分钟左右的时间进行复现。
以场景7 为例,将轨迹表(8.6亿)和订单表(100w)进行关联,返回某个配送订单的全部车辆运行轨迹,耗时在112毫秒左右:
场景8中,将某个订单的数据,按60倍速持续写入一个新表中,读取新表数据并输出到 GIS 系统的地图中,就可以非常方便地实现某个订单车辆配送轨迹的实时播放,轻松回放行驶路径,用于异常排查。
SQL 语句及参考耗时
|----|-------------------|-------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| 序号 | 场景 | 耗时 | SQL语句 |
| 1 | 统计车辆经纬数据总数 | 1ms | select count(*) from drives |
| 2 | 按车牌+时间,查询车辆经纬数据 | 4ms | select * from drives where ts=2022.07.01 22:10:10.000 , code="浙A100207" |
| 3 | 按车牌,统计数据总数 | 5ms | select count(*) from drives where code="浙A100207" |
| 4 | 按车牌,查看车辆与总部距离 | 3ms | select ts,code,string(long(distance(poi,point(lng, lat)))/1000)+"km" as distance from drives where ts=2022.07.01 22:10:10.000 , code="浙A105207" |
| 5 | 按车牌,查询一天的所有数据 | 3ms | select * from drives where code="浙A165207" and ts between 2023.01.01 00:00:00.000:2023.01.01 23:59:59.999 |
| 6 | 按车牌按每小时统计平均车速 | 12ms | select avg(velocity) from drives where code="浙A165207" group by bar(ts,1H) |
| 7 | 按订单ID,查询该订单所有路径 | 112ms | //定义存储过程orderQueryorderQuery(1000006) |
| 8 | 以60倍速回放某订单的车辆行驶轨迹 | - | replay函数 |
10分钟轻松验证( Windows 版)
|----|------------------|------|------------------------------------------------------------------|
| 步骤 | 任务 | 预计耗时 | 操作描述 |
| 1 | 部署DolphinDB大数据环境 | 1分钟 | 下载 DolphinDB,并解压(免安装) |
| 2 | 运行 | 1秒 | 双击 dolphindb.exe 文件,开启实例 |
| 3 | 运行开发环境 | 10秒 | 打开 http://localhost:8848,网页上可执行 SQL 等脚本 |
| 4 | 模拟生成8.64亿数据 | 8分钟 | 复制《data.txt》脚本,执行。(注意,此处模拟的是仿真数据,即每一条数据都是单独生成的,而不是简单地把一份数据重复复制。) |
| 5 | 验证查询性能 | 3分钟 | 复制《query.txt》脚本,依次执行,观察耗时 |
注:以上脚本可在附录中查找获取
安装部署
- 下载官网社区最新版,建议2.00.9及以上版本。
传送门:
https://www.dolphindb.cn/downloads/DolphinDB_Win64_V2.00.9.3.zip
- windows 解压路径,不能有空格,避免安装到 Program Files 路径下。
官网教程:standalone_server.md · dolphindb/Tutorials_CN - Gitee
- 本次测试使用免费的社区版,企业版 license 可申请免费试用。
获取方式:http://dolphindb.cn
- 安装及测试过程中,有任何问题,可添加小助手微信(dolphindb1)咨询。
欢迎大家动手尝试,一起来验证一下吧!
想要了解更多物联网行业资讯与方案?动手扫描下方二维码,即可加入物联网行业交流群 并获得电力、能源、车联网等行业解决方案资料,捕捉行业热点讯息,与近百位同行交流心得~
关注公众号 DolphinDB 物联网 了解工业制造、智能运维、能源电力等更多物联网场景解决方案!
DolphinDB 是一款拥有自主知识产权的国产自研 数据库软件,支持主流国产操作系统和芯片。目前已广泛应用于诸多行业和场景,包括车联网、电力、能源、化工、工业制造、智慧矿山等行业的数据存储、分析、设备监控预警 场景,以及银行、券商、公募基金、私募基金、交易所等金融机构的投研、交易和风控场景。
附录
数据集
|--------|---------|-------|
| 表描述 | 表名 | 数据量 |
| 车辆信息表 | t_car | 10万 |
| 订单信息表 | t_order | 100万 |
| GPS轨迹表 | t_drive | 8.64亿 |
环境配置
|------|---------------------------------------------------------|
| 项目 | 参数 |
| 操作系统 | DELL Latitude 5420 笔记本电脑 windows 11 (22621.521) |
| CPU | 11th Gen Intel(R) Core(TM) i5-1145G7 @ 2.60GHz 1.50 GHz |
| 内存 | 16G |
| 磁盘 | SSD 512G |
| 服务端 | DolphinDB 2.00.9 |
验证说明
-
统计耗时使用 timer 函数,即排除网络传输和序列化影响,仅统计服务端全部数据处理完成的时间。
-
性能受磁盘 IO、CPU、网络等系统资源的影响,如测试环境不同,表格中的性能实测数据可能会有差异。
-
web 端的交互编程执行方式,可以框选单条脚本,按 Ctrl-E 执行。也可以全选,按 Ctrl-E 执行。
-
模拟车辆轨迹写入的性能接近200万条/秒(1000万点/秒),可以作为真实数据写入性能的参考(排除协议连接、网络传输、序列化等耗时)。
-
性能测试优先保障性能,配置文件 dolphindb.cfg 中可以限制资源(核数、内存等)。
《data.txt》:建库建表,模拟数据生成
//步骤一:登录
login(`admin,`123456)
//步骤二:建库、建表
//1.车辆信息表:t_car
if(existsDatabase("dfs://t_car")){dropDatabase("dfs://t_car")}
create database "dfs://t_car" partitioned by VALUE([`code])
create table "dfs://t_car"."car" (
code SYMBOL, //车牌
model SYMBOL, //型号
emissions SYMBOL, //排量
brand SYMBOL //品牌
)
//2.配送订单表:t_order
if(existsDatabase("dfs://t_order")){dropDatabase("dfs://t_order")}
create database "dfs://t_order" partitioned by VALUE([date(now())]), engine="TSDB"
create table "dfs://t_order"."order" (
orderid LONG, //订单号
ts TIMESTAMP, //下单时间
btime TIMESTAMP, //配送起始时间
etime TIMESTAMP, //配送截止时间
code SYMBOL, //车牌
blng DOUBLE, //起始经度
blat DOUBLE, //起始纬度
elng DOUBLE, //目的地经度
elat DOUBLE //目的地纬度
)
partitioned by ts
sortColumns=[`code,`ts],
sortKeyMappingFunction=[hashBucket{,50}]
//3.车辆行驶路径表:dfs_drive
if(existsDatabase("dfs://dfs_drive")){dropDatabase("dfs://dfs_drive")}
create database "dfs://dfs_drive" partitioned by VALUE([date(now())]),HASH([SYMBOL,60]),engine="TSDB"
create table "dfs://dfs_drive"."drive" (
ts TIMESTAMP, //时间戳
code SYMBOL, //车牌
lng DOUBLE, //经度
lat DOUBLE, //纬度
velocity INT, //速度
altitude INT, //海拔
direction INT //方向
)
partitioned by ts,code
sortColumns=[`code,`ts],
sortKeyMappingFunction=[hashBucket{,50}]
//步骤三:模拟写入仿真数据
//写入车辆信息表:t_car(1万条)
n=100000
code=100001..200000 //产生序列数据
code="浙A"+string(code)
model=rand(`搅拌车`泵车`砂石车,n) //rand随机函数,用于产生数量为 n 的向量值
emissions=string(rand(5..10,n))+`升
brand=rand(`SANY`ZOOMLION`XCMG`LOXA`FANGYUAN`RJST,n)
t=table(code,model,emissions,brand)
t_car=loadTable("dfs://t_car",`car)
t_car.append!(t)
select count(*) from t_car //数据检查
select top 10 * from t_car
//写入订单信息表:t_order(100万条)
n=1000000
orderid=1000001..2000000 //产生序列数据
ts=take(2023.01.01..2023.01.10,n) //产生10天的订单
ts=sort(ts) //向量结构排序:10w条1月1日+10w条1月2日...+10w条1月10日
codes=select code from loadTable("dfs://t_car",`car) //获取1万车牌号码
code=take(codes.code,n) //向量结构:10w条车牌序列 x 10天=100w
btime=temporalAdd(datetime(ts),rand(14400,n)+32400,"s") //开始配送时间:9点~13点随机
etime=temporalAdd(datetime(btime),rand(18000,n)+3600,"s") //配送时间:1小时~5小时随机
blng=103.60972+rand(1.0,n)-0.5
blat=30.81841+rand(1.0,n)-0.5
elng=103.60972+rand(1.0,n)-0.5
elat=30.81841+rand(1.0,n)-0.5
t=table(orderid,ts,btime,etime,code,blng,blat,elng,elat)
t_order=loadTable("dfs://t_order",`order)
t_order.append!(t)
select count(*) from t_order //数据检查
select top 10 * from t_order
//写入车辆轨迹数据,8.64亿/天
def write_data(){
for(ts in 2023.01.01..2023.01.01){
//将10w车牌拆分成50份,写入50次(可通过降低拆分数量,进一步提高速度。如内存不支持,可能会Out Of Memory)
for(i in 1..50){
n=8640
j=(i-1)*2000
codes=select code from loadTable("dfs://t_car",`car) limit j , 2000*i
time = datetime(ts)+ 10*(0..(n-1))
lng=103.60972+rand(1.0,n)-0.5
lat=30.81841+rand(1.0,n)-0.5
velocity=rand(100,n)
altitude=rand(300,n)
direction=rand(360,n)
t=table(time as ts,lng,lat,velocity,altitude,direction)
tt = cj(t,codes) //关联车牌和数据,每次写入量:2000*8640
reorderColumns!(tt,loadTable("dfs://dfs_drive",`drive).schema().colDefs.name)
loadTable("dfs://dfs_drive",`drive).append!(tt)
tt=NULL
}
}
}
submitJob(`write_data,`write_data,write_data) //后台执行写入操作
drives=loadTable("dfs://dfs_drive",`drive) //数据检查
select count(*) from drives
《query.txt》:性能测试
// 步骤四:数据准备工作
//检查作业状态(预计执行8分钟)
select jobId,startTime as 开始时间,endTime as 结束时间,(endTime-startTime)/1000 as 执行秒数 from getRecentJobs(1)
//确定作业完成后,执行刷盘,LevelFile合并,清除缓存,确保性能测试的准确。
//因为短时间导入了大量数据,部分数据还在内存(CacheEngine)中,并逐步写入磁盘。为确保性能测试时,数据是从磁盘中读取,需要进行刷盘操作。
flushTSDBCache()
//LevelFile合并:优化历史数据的查询性能
chunkIds = exec chunkId from getChunksMeta() where type=1
for (x in chunkIds) {
triggerTSDBCompaction(x)
}
//清除缓存,确保测试性能准确
clearAllCache()
// 步骤五:查询统计
//全量数据检查:
/*1. 统计车辆经纬数据总数*/
drives=loadTable("dfs://dfs_drive",`drive)
timer t=select count(*) from drives
t
select top 10 * from drives
/*2. 按车牌+时间,查询车辆经纬数据*/
timer t=select * from drives where ts=2023.01.01 22:10:10.000 , code="浙A100207"
t
/*3 按车牌,统计数据总数*/
timer t=select count(*) from drives where code="浙A100207"
t
/*4 按车牌,查看车辆与总部距离*/
poi=point(104.102683,30.482596) //总部经纬度
timer t=select ts,code,string(long(distance(poi,point(lng, lat)))/1000)+`km as distance from drives where ts=2023.01.01 22:10:10.000 , code="浙A105207"
t
/*5 按车牌,查询一天的所有数据*/
timer t=select * from drives where code="浙A165207" and ts between 2023.01.01 00:00:00.000:2023.01.01 23:59:59.999
t
/*6 按车牌查询每小时的平均车速 */
timer t=select avg(velocity) from drives where code="浙A165207" group by bar(ts,1H)
t
/*7 按订单ID,查询某订单所有路径*/
//新建自定义函数,用来查询订单(100w)的轨迹(8.6亿)
def orderQuery(oid){
t=select code,btime,etime from loadTable("dfs://t_order",`order) where orderid=oid
carcode = t.code[0]
tt=select * from loadTable("dfs://dfs_drive",`drive) where code=carcode,ts between t.btime[0]:t.etime[0]
return tt
}
//执行订单查询
timer t=orderQuery(1000006)
t
//添加存储过程(函数视图):执行后,可通过api调用此函数
try{dropFunctionView(`orderQuery)}catch(x){}
addFunctionView(orderQuery)
/*8 以60倍速(每秒钟播放真实时间1分钟的轨迹数据)的速率,播放某订单的车辆行驶轨迹*/
rate=60 //回放倍速
t=orderQuery(1000006) //需要回放的数据
show=table(1:0,t.schema().colDefs.name,t.schema().colDefs.typeInt)
submitJob("replay_drive","回放订单轨迹", replay, t,show, `ts, `ts, rate,false)
//持续执行(可通过share函数将表共享,以输出到GIS系统的可视化地图)
select * from show order by ts desc
有关文内所示函数及详细参数、教程与脚本代码可点击文末「链接」获取~