1 Star 11 Fork 9

shuai7boy / TrafficBySparkAndKafka

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Readme.md 4.75 KB
一键复制 编辑 原始数据 按行查看 历史
shuai7boy 提交于 2020-06-09 14:31 . update Readme.md.

一、项目简介

TrafficBySparkAndKafka是一个车辆监控项目。主要实现了三个功能:

1.计算每一个区域车流量最多的前3条道路。

2.计算道路转换率

3.实时统计道路拥堵情况(当前时间,卡口编号,车辆总数,速度总数,平均速度)

二、项目结构


├─TrafficBySparkAndKafka
├─data
└─src
   ├─main
   │  ├─java
   │  │  └─vip
   │  │      ├─producedate2hive(模拟数据到文件和Hive)
   │  │      ├─shuai7boy
   │  │      │  └─trafficTemp
   │  │      │      ├─areaRoadFlow(每个区域top3道路速度统计。道路转换率。)
   │  │      │      ├─conf (获取配置文件帮助类)
   │  │      │      ├─constant (接口静态类,防止硬编码)
   │  │      │      ├─dao
   │  │      │      │  ├─factory (工厂类)
   │  │      │      │  └─impl (接口实现类)
   │  │      │      ├─domain (属性定义类)
   │  │      │      ├─jdbc (jdbc帮助类)
   │  │      │      ├─rtmroad(实时统计道路拥堵情况)
   │  │      │      ├─skynet 
   │  │      │      └─util (帮助类)
   │  │      └─spark
   │  │          └─spark  
   │  │              └─test (模拟实时数据)
   │  ├─resources
   │  └─scala
   │      └─top
   │          └─shuai7boy
   │              └─trafficTemp
   │                  └─areaRoadFlow (利用scala和java互调用,实现top3道路速度统计)

   └─test
       └─java

三、数据源

数据源类型:

monitor_flow_action(每个摄像头的监控数据)

当天日期 卡口编号 摄像头编号 车牌号 拍摄时间 车速 道路编号 区域编号

2020-05-08  0001   34287  京M80025    2020-05-08 05:35:58    57 25 03
2020-05-08 0005   99132  京M80025    2020-05-08 05:51:28    149    50 04

monitor_camera_info(卡口和摄像头对应编号)

0006    00443
0006   25745
0006   98681
0006   36400

存储介质:

如果在本地运行的话,这里读取的是本地文件。

如果在集群运行,对于批处理读取的是Hive,对于流处理这里读取Kafka。

四、数据转换流程

1.计算每一个区域车流量最多的前3条道路。

  • 从表traffic.monitor_flow_action根据日期获取车流量监控日志信息。

    挡在集群中时,traffic.monitor_flow_action代表的是hive中的表,当在本地运行时,traffic.monitor_flow_action是本地创建的临时表。

  • 从area_info表中获取区域信息。

    area_info是MySql中的表。

  • 根据步骤二获取的区域信息,补全监控日志名称。根据join,map即可拼接一个新的RDD,并将RDD转换为DataFrame的临时表tmp_car_flow_basic。

  • 统计各个区域的道路车流量。

    使用Spark SQL根据区域名称,道路ID进行分组。即可统计出每个区域,每条道路对应的车流量。

  • 统计每个区域top3车流量。

    利用开窗函数进行统计。row_number() over(partition by area_name order by road_id desc)

用到的技术:Hive,Spark SQL,临时表,MySql,JDBC,join,map,RDD转换DataFrame。

2.计算道路转换率

  • 从MySql拿出我们要对比的转换路段

  • 从日志拿出指定日期的监控数据

  • 将监控数据转换为键值对(car,row)格式

  • 计算每个路段的匹配情况。

    逻辑:将第三步拿到的数据,根据car进行分组,映射键值对。将轨迹信息根据时间进行排序,然后拼接。

    将我们指定的路段(第一步获取到的)和上面拼接的数据进行比对,得出匹配情况。(路段,匹配次数)

  • 因为上面求的是多辆车的 (路段,匹配次数)。这步使用reduceByKey进行聚合,将相同路段进行汇总。

  • 获取转化率。

    转换率=(这次路段的匹配度)/(上次路段的匹配度)即可得到。、

    这次路段的匹配度=(聚合数据.get(路段))

用到的技术:mapToPair,groupByKey,flatMapToPair(进来一辆车,出去多个对应路段信息),reduceByKey。

3.实时统计道路拥堵情况(根据车辆和车速判断)

  • 根据日志获取(卡口ID,汽车速度)格式数据
  • 获取(卡口ID,(汽车速度,1))格式数据,后面的1代表车辆数
  • 获取(卡口ID,(汽车总速度,总车辆数))
  • 打印车辆(卡口,总速度,总车辆,平均速度)

用到的技术:map,mapToPair,mapValues(仅仅针对value进行map,(key,(value,1))格式数据),reduceByKeyAndWindow。

大数据系列传送门

1
https://gitee.com/shuai7boy/TrafficBySparkAndKafka.git
git@gitee.com:shuai7boy/TrafficBySparkAndKafka.git
shuai7boy
TrafficBySparkAndKafka
TrafficBySparkAndKafka
master

搜索帮助