# dataBridge **Repository Path**: qylcode/real-data-processing-system ## Basic Information - **Project Name**: dataBridge - **Description**: 实时解析binlog日志,将mysql数据 同步到不同的存储介质上,同时可以通过规则预设,对流式数据加工和处理。 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 1 - **Created**: 2023-03-10 - **Last Updated**: 2023-04-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # data-birdge实时数据加工处理系统 #### 介绍 解析binlog日志,将mysql数据 同步到不同的存储介质上,同时可以通过规则预设,对流式数据加工和处理。 #### 软件架构 软件架构 ![image](https://foruda.gitee.com/images/1678974932890812045/3b7fbc05_10855182.png) #### 安装教程 1. 开启mysql binlog 在mysql5.7版本中,my.ini文件中添加 ``` log_bin=mysql-bin binlog-format=Row server-id=1 ``` 2. 安装 zookeeper和kafka 因为kafka依赖zookeeper,所以安装kafka前要安装好zk,启动kafka前要启动zk。 同时在接收到binlog消息后转发成MQ时,为了方便以后扩展,使用工厂模式获取MQ客户端,计划支持rocket mq/ rabbit mq 发送,目前暂时支持kafka客户端。 #### todo列表 1、bridge 目前一共2个查询用到了,一个是查询topicMapping-getBridgeRopicMapping,一个是查询表列名集合,后续查询列明 需要优化。 2、监控binlog 目前是使用 数据库地址,没有具体到库。 3、操作数据目标表 使用单独的connection,需要初始化,有独立标识。 4、getTargetDataBaseConnection 待优化 参数。 ### 数据库表 其中 在元数据配置data_syn_cal库中配置好 数据源库表和 加工转移后的库表。元数据库硬依赖,数据源库表和 转移的目标库表 按实际业务进行配置。 1、元数据配置库 1) 库名:mysql data_syn_cal 2) topic_mapping 监控配置表 ``` CREATE TABLE `topic_mapping` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `topic_name` varchar(50) DEFAULT NULL, `topic_type` varchar(50) DEFAULT NULL COMMENT '1原单表 2加工表', `database_code` varchar(50) DEFAULT NULL COMMENT '数据源code', `database_name` varchar(200) DEFAULT NULL COMMENT '数据源名称', `database_table` varchar(50) DEFAULT NULL COMMENT '数据源表', `primary_id` varchar(50) DEFAULT NULL COMMENT '目标表业务主键', `foreign_id` varchar(50) DEFAULT NULL COMMENT '原表业务主键', `date_fields` varchar(200) DEFAULT NULL COMMENT '日期字段', `fields_mapping` varchar(200) DEFAULT NULL COMMENT '属性对应关系', `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `bu_id` tinyint(4) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ``` 属性解释: ![img.png](img.png) 1)topic_name等于 kafka_topic_.students 监听的是students表的binlog,同理,topic_name等于 kafka_topic_.scores 监听的是scores表的binlog。 2)database_table 表示要转移的库表 3)primary_id 表示转移目标表中的业务主键,例如student_id 表示 目标表 以student_id为维度,进行添加和更新。名称可能日后会优化。 4)foreign_id 表示数据源表 的业务主键。例如students表 业务主键是id,学生成绩表的业务主键是student_id 2、监听的测试库 1)库名 students_manage 2)表1 scores 学生成绩表 CREATE TABLE `scores` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `student_id` varchar(50) DEFAULT NULL, `score` tinyint(4) DEFAULT NULL, `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8 3)表2 students 学生信息表 CREATE TABLE `students` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(50) DEFAULT NULL, `age` tinyint(4) DEFAULT NULL, `sex` tinyint(4) DEFAULT NULL, `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=105 DEFAULT CHARSET=utf8 3、数据加工和转存的目标库 1)库名 unified_data_warehous 2)students_manage_students_scores 学生信息和成绩汇总表 CREATE TABLE `students_manage_students_scores` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `student_id` varchar(50) DEFAULT NULL, `name` varchar(50) DEFAULT NULL, `age` varchar(50) DEFAULT NULL, `score` tinyint(4) DEFAULT NULL, `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `sex` tinyint(4) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=23 DEFAULT CHARSET=utf8 #### 其他 https://www.jianshu.com/p/1674f8b6aeaf 提交远程分支方法