# mymes **Repository Path**: amor19/mymes ## Basic Information - **Project Name**: mymes - **Description**: mymes 是一个消息推送的服务平台,能够有效地聚合消息推送业务,加快业务开发流程 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2022-05-08 - **Last Updated**: 2022-09-27 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 项目介绍 **核心功能**:以统一的接口发送各种类型消息,并对消息生命周期全链路追踪。 **项目意义**:只要公司或者组织有发送消息的需求,就可以有类似`mymes`的项目,对各类消息进行统一发送处理,有利于功能的收拢、提高业务需求开发效率。 ![image-20220529115033051](https://test-1300999732.cos.ap-guangzhou.myqcloud.com/typora/20220529115034.png) ## 项目系统架构 ``` ├── mymes-common ├── mymes-cron ├── mymes-handler ├── mymes-service-api ├── mymes-service-api-impl ├── mymes-support ├── mymes-web └── static ``` 这是目前为止,mymes 项目的结构,各个模块的大致内容如下: - mymes-common:支撑整个模块的枚举类、常量、视图对象…… - mymes-cron:对 xxl-job 的封装以及对定时批处理任务的处理 - mymes-handler:从 MQ 中获取消息,执行短信、邮件消息的具体发送 - mymes-service-api:服务调用的 API,供后续分布式调用使用 - mymes-service-api-impl:服务 API 的具体实现,负责向 MQ 中发送消息以及发送消息前的一些处理 - mymes-support:系统的功能、工具支撑,包括责任链模式的具体实现、线程池的处理 - mymes-web:进行系统调试、可视化后台管理和使用的接口 **核心流程**:`mymes-service-api` 接收到发送消息请求,直接将请求进 `MQ`。`mymes-handler` 消费 `MQ` 消息后由各类消息的 Handler 进行发送处理。 ![image-20220529115949730](https://test-1300999732.cos.ap-guangzhou.myqcloud.com/typora/20220529115951.png) **使用 MQ**:发送消息实际上是调用各个服务提供的 API,假设某消息的服务超时,`mymes-service-api` 如果是直接调用服务,那存在**超时**风险,可能会拖垮整个接口的性能。MQ 在这是为了做异步和解耦,并且在一定程度上抗住业务流量。 **消息统一接入层的逻辑**: ![image-20220529161051946](README.assets/20220529161053.png) **PreParamCheckAction**:参数前置检查,最主要检查参数中给定的模板 ID 是否存在于数据库中,消息接收方是否不为空 **AssembleAction**: **AfterParamCheckAction**: **SendMqAction**: ## 核心设计与实现 ### 数据库设计 系统的数据库较为简单,仅由两张表构成,一张是存储模板的 `message_template` 表: ```mysql CREATE TABLE `message_template` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `name` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题', `audit_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '当前消息审核状态: 10.待审核 20.审核成功 30.被拒绝', `flow_id` varchar(50) COLLATE utf8mb4_unicode_ci COMMENT '工单ID', `msg_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '当前消息状态:10.新建 20.停用 30.启用 40.等待发送 50.发送中 60.发送成功 70.发送失败', `cron_task_id` bigint(20) COMMENT '定时任务Id (xxl-job-admin返回)', `cron_crowd_path` varchar(500) COMMENT '定时发送人群的文件路径', `expect_push_time` varchar(100) COLLATE utf8mb4_unicode_ci COMMENT '期望发送时间:0:立即发送 定时任务以及周期任务:cron表达式', `id_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '消息的发送ID类型:10. userId 20.did 30.手机号 40.openId 50.email', `send_channel` tinyint(4) NOT NULL DEFAULT '0' COMMENT '消息发送渠道:10.IM 20.Push 30.短信 40.Email 50.公众号 60.小程序', `template_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '10.运营类 20.技术类接口调用', `msg_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '10.通知类消息 20.营销类消息 30.验证码类消息', `msg_content` varchar(600) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '消息内容 占位符用{$var}表示', `send_account` tinyint(4) NOT NULL DEFAULT '0' COMMENT '发送账号 一个渠道下可存在多个账号', `creator` varchar(45) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '创建者', `updator` varchar(45) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '更新者', `auditor` varchar(45) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '审核人', `team` varchar(45) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '业务方团队', `proposer` varchar(45) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '业务方', `is_deleted` tinyint(4) NOT NULL DEFAULT '0' COMMENT '是否删除:0.不删除 1.删除', `created` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间', `updated` int(11) NOT NULL DEFAULT '0' COMMENT '更新时间', PRIMARY KEY (`id`), KEY `idx_channel` (`send_channel`) ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT ='消息模板信息'; +------------------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+---------+----------------+ | id | bigint(20) | NO | PRI | NULL | auto_increment | | name | varchar(100) | NO | | | | | audit_status | tinyint(4) | NO | | 0 | | | flow_id | varchar(50) | YES | | NULL | | | msg_status | tinyint(4) | NO | | 0 | | | cron_task_id | bigint(20) | YES | | NULL | | | cron_crowd_path | varchar(500) | YES | | NULL | | | expect_push_time | varchar(100) | YES | | NULL | | | id_type | tinyint(4) | NO | | 0 | | | send_channel | tinyint(4) | NO | MUL | 0 | | | template_type | tinyint(4) | NO | | 0 | | | msg_type | tinyint(4) | NO | | 0 | | | msg_content | varchar(600) | NO | | | | | send_account | tinyint(4) | NO | | 0 | | | creator | varchar(45) | NO | | | | | updator | varchar(45) | NO | | | | | auditor | varchar(45) | NO | | | | | team | varchar(45) | NO | | | | | proposer | varchar(45) | NO | | | | | is_deleted | tinyint(4) | NO | | 0 | | | created | int(11) | NO | | 0 | | | updated | int(11) | NO | | 0 | | +------------------+--------------+------+-----+---------+----------------+ ``` 另一张是存储发送记录的日志表: ```mysql CREATE TABLE `sms_record` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `message_template_id` bigint(20) NOT NULL DEFAULT '0' COMMENT '消息模板ID', `phone` bigint(20) NOT NULL DEFAULT '0' COMMENT '手机号', `supplier_id` tinyint(4) NOT NULL DEFAULT '0' COMMENT '发送短信渠道商的ID', `supplier_name` varchar(40) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '发送短信渠道商的名称', `msg_content` varchar(600) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '短信发送的内容', `series_id` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '下发批次的ID', `charging_num` tinyint(4) NOT NULL DEFAULT '0' COMMENT '计费条数', `report_content` varchar(50) NOT NULL DEFAULT '' COMMENT '回执内容', `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '短信状态: 10.发送 20.成功 30.失败', `send_date` int(11) NOT NULL DEFAULT '0' COMMENT '发送日期:20211112', `created` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间', `updated` int(11) NOT NULL DEFAULT '0' COMMENT '更新时间', PRIMARY KEY (`id`), KEY `idx_send_date` (`send_date`) ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT ='短信记录信息'; +---------------------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +---------------------+--------------+------+-----+---------+----------------+ | id | bigint(20) | NO | PRI | NULL | auto_increment | | message_template_id | bigint(20) | NO | | 0 | | | phone | bigint(20) | NO | | 0 | | | supplier_id | tinyint(4) | NO | | 0 | | | supplier_name | varchar(40) | NO | | | | | msg_content | varchar(600) | NO | | | | | series_id | varchar(100) | NO | | | | | charging_num | tinyint(4) | NO | | 0 | | | report_content | varchar(50) | NO | | | | | status | tinyint(4) | NO | | 0 | | | send_date | int(11) | NO | MUL | 0 | | | created | int(11) | NO | | 0 | | | updated | int(11) | NO | | 0 | | +---------------------+--------------+------+-----+---------+----------------+ ``` ### 消息录入和定时发送——后端以及 xxl-job 封装 ![image-20220906145825007](README.assets/image-20220906145825007.png) 业务方可以通过前端系统或者消息统一接入层来使用这个平台,前端系统面向一般业务方(不需要掌握 IT 技术),只需要在系统中设置好模板,提供发送消息的参数文件即可实现。 #### xxl-job 封装 我们当然可以直接使用 xxl-job 来规划定时任务,但是很显然,不懂 IT 技术的业务人员很难根据自身需求来使用 xxl-job,因此,我们需要对 xxl-job 进行再一次的封装,将它集成到 mymes 平台的后端中去。 xxl-job 的封装网上比比皆是,这里不做过多说明,大致原理就是 http 消息报文的发送和接收,首先要先记录下 xxl-job 所提供的几个接口: ```java /** * 任务信息接口路径 */ public static final String LOGIN_URL = "/login"; public static final String INSERT_URL = "/jobinfo/add"; public static final String UPDATE_URL = "/jobinfo/update"; public static final String DELETE_URL = "/jobinfo/remove"; public static final String RUN_URL = "/jobinfo/start"; public static final String STOP_URL = "/jobinfo/stop"; public static final String JOB_GROUP_PAGE_LIST = "/jobgroup/pageList"; public static final String JOB_GROUP_INSERT_URL = "/jobgroup/save"; ``` 然后就是封装我们要使用的那几个接口: ```java /** * 新增/修改 定时任务 * * @return 新增时返回任务Id,修改时无返回 */ BasicResultVO saveCronTask(XxlJobInfo xxlJobInfo); /** * 删除定时任务 * * @param taskId */ BasicResultVO deleteCronTask(Integer taskId); /** * 启动定时任务 * * @param taskId */ BasicResultVO startCronTask(Integer taskId); /** * 暂停定时任务 * * @param taskId */ BasicResultVO stopCronTask(Integer taskId); /** * 得到执行器Id * * @return */ BasicResultVO getGroupId(String appName, String title); /** * 创建执行器 */ BasicResultVO createGroup(XxlJobGroup xxlJobGroup); ``` 具体的实现在这里不做赘述,大都是填写表单的简单游戏。不过有一个问题就是要记录登录状态,这里是通过在每次向接口发送请求前都执行一次登录请求然后获取返回的 cookie 来解决的: ```java // 这个方法会在其它的每一个请求发送之前被调用一次,然后将返回的 cookie 附在新的请求上,以确保登录状态 private String getCookie() { Map params = MapUtil.newHashMap(); params.put("userName", xxlUserName); params.put("password", xxlPassword); params.put("randomCode", IdUtil.fastSimpleUUID()); String path = xxlAddresses + XxlJobConstant.LOGIN_URL; HttpResponse response = null; try { response = HttpRequest.post(path).form(params).execute(); if (response.isOk()) { List cookies = response.getCookies(); StringBuilder sb = new StringBuilder(); for (HttpCookie cookie : cookies) { sb.append(cookie.toString()); } return sb.toString(); } } catch (Exception e) { log.error("CronTaskService#createGroup getCookie,e:{},param:{},response:{}", Throwables.getStackTraceAsString(e) , JSON.toJSONString(params), JSON.toJSONString(response)); } return null; } ``` #### 后端接口 这里介绍几个主要的后端接口。 ##### 测试接口 ```java @PostMapping("test") @ApiOperation("/测试发送接口") public BasicResultVO test(@RequestBody MessageTemplateParam messageTemplateParam) { Map variables = JSON.parseObject(messageTemplateParam.getMsgContent(), Map.class); MessageParam messageParam = MessageParam.builder().receiver(messageTemplateParam.getReceiver()).variables(variables).build(); SendRequest sendRequest = SendRequest.builder().code(BusinessCode.COMMON_SEND.getCode()).messageTemplateId(messageTemplateParam.getId()).messageParam(messageParam).build(); SendResponse response = sendService.send(sendRequest); if (response.getCode() != RespStatusEnum.SUCCESS.getCode()) { return BasicResultVO.fail(response.getMsg()); } return BasicResultVO.success(response); } ``` 这个接口将根据前端传入的测试消息信息直接执行一次实时的发送。 ##### 存储/更新定时任务 ```java /** * 如果Id存在,则修改 * 如果Id不存在,则保存 */ @PostMapping("/save") @ApiOperation("/插入数据") public BasicResultVO saveOrUpdate(@RequestBody MessageTemplate messageTemplate) { MessageTemplate info = messageTemplateService.saveOrUpdate(messageTemplate); return BasicResultVO.success(info); } @Override public MessageTemplate saveOrUpdate(MessageTemplate messageTemplate) { if (messageTemplate.getId() == null) { initStatus(messageTemplate); } else { resetStatus(messageTemplate); } messageTemplate.setUpdated(Math.toIntExact(DateUtil.currentSeconds())); return messageTemplateDao.save(messageTemplate); } ``` ##### 启动模板的定时任务 启动模板定时任务就用到了之前由我们封装好的 xxl-job。 ```java /** * 启动模板的定时任务 */ @PostMapping("start/{id}") @ApiOperation("/启动模板的定时任务") public BasicResultVO start(@RequestBody @PathVariable("id") Long id) { return messageTemplateService.startCronTask(id); } @Override public BasicResultVO startCronTask(Long id) { // 1.修改模板状态 MessageTemplate messageTemplate = messageTemplateDao.findById(id).get(); // 2.动态创建或更新定时任务 XxlJobInfo xxlJobInfo = xxlJobUtils.buildXxlJobInfo(messageTemplate); // 3.获取taskId(如果本身存在则复用原有任务,如果不存在则得到新建后任务ID) Integer taskId = messageTemplate.getCronTaskId(); BasicResultVO basicResultVO = cronTaskService.saveCronTask(xxlJobInfo); if (taskId == null && RespStatusEnum.SUCCESS.getCode().equals(basicResultVO.getStatus()) && basicResultVO.getData() != null) { taskId = Integer.valueOf(String.valueOf(basicResultVO.getData())); } // 4. 启动定时任务 if (taskId != null) { cronTaskService.startCronTask(taskId); MessageTemplate clone = ObjectUtil.clone(messageTemplate).setMsgStatus(MessageStatus.RUN.getCode()).setCronTaskId(taskId).setUpdated(Math.toIntExact(DateUtil.currentSeconds())); messageTemplateDao.save(clone); return BasicResultVO.success(); } return BasicResultVO.fail(); } ``` ##### 上传批发送文件 批发送文件中保存的是发送对象以及发送的消息占位符所对应的内容。 ```java /** * 上传人群文件 */ @PostMapping("upload") @ApiOperation("/上传人群文件") public BasicResultVO upload(@RequestParam("file") MultipartFile file) { String fileName = IdUtil.fastSimpleUUID() + file.getOriginalFilename(); File dest = new File(new File(dataPath).getAbsolutePath()+ "/" + fileName); // 构造父层文件夹 if (!dest.getParentFile().exists()) { dest.getParentFile().mkdirs(); } try { file.transferTo(dest); // 保存文件 } catch (Exception e) { log.error("MessageTemplateController#upload fail! e:{},params{}", Throwables.getStackTraceAsString(e), JSON.toJSONString(file)); return BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR); } return BasicResultVO.success(MapUtil.of(new String[][]{{"value", dest.getAbsolutePath()}})); } ``` ### 消息封装——责任链模式的实现 责任链的实现有许多种方式。本项目中,责任链模式用于将一条要发送的消息封装成为 `TaskInfo` 投递到 MQ 中,其处的位置如下: ![image-20220906125611879](README.assets/image-20220906125611879.png) 1、定义在责任链各个节点中被流传和处理的数据(这个数据包含一个标志位): ```java public class ProcessContext { /** * 标识责任链的code */ private String code; /** * 存储责任链上下文数据的模型 */ private ProcessModel processModel; /** * 责任链中断的标识 */ private Boolean needBreak = false; /** * 流程处理的结果 */ BasicResultVO response = BasicResultVO.success(); } // ProcessContext 也不过是一个简单的接口 public interface ProcessModel { } ``` 2、定义一个接口来规定责任链中每一个节点的行为: ```java public interface BusinessProcess { /** * 真正处理逻辑 * @param context */ void process(ProcessContext context); } ``` 3、定义“装下”责任链的容器: ```java public class ProcessController { /** * 模板映射,根据 Code 获取相应的责任链 */ private Map templateConfig = null; public ProcessContext process(ProcessContext context) { if (!preCheck(context)) { return context; } // 遍历某个流程节点,出现异常往外抛,遇到终止就结束 List processList = templateConfig.get(context.getCode()).getProcessList(); for (BusinessProcess businessProcess : processList) { businessProcess.process(context); if (context.getNeedBreak()) { break; } } return context; } private Boolean preCheck(ProcessContext context) { // 上下文 if (context == null) { context.setResponse(BasicResultVO.fail(RespStatusEnum.CONTEXT_IS_NULL)); return false; } // 业务代码 String businessCode = context.getCode(); if (StrUtil.isBlank(businessCode)) { context.setResponse(BasicResultVO.fail(RespStatusEnum.BUSINESS_CODE_IS_NULL)); return false; } // 执行模板 ProcessTemplate processTemplate = templateConfig.get(businessCode); if (processTemplate == null) { context.setResponse(BasicResultVO.fail(RespStatusEnum.PROCESS_TEMPLATE_IS_NULL)); return false; } // 执行模板列表 List processList = processTemplate.getProcessList(); if (CollUtil.isEmpty(processList)) { context.setResponse(BasicResultVO.fail(RespStatusEnum.PROCESS_LIST_IS_NULL)); return false; } return true; } } ``` 4、责任链的形成 `ProcessController` 中存在一个类型为 `Map` 的 map,它的作用就是存储不同的责任链,每一个 Code 都可以对应一条 `ProcessTemplate`: ![image-20220906111323183](README.assets/image-20220906111323183.png) 存储行为链的容器: ```java public class ProcessTemplate { private List processList; public List getProcessList() { return processList; } public void setProcessList(List processList) { this.processList = processList; } } ``` 5、行为实例的生成和成链由一个 Config 完成: ```java public class PipelineConfig { @Bean("commonSendTemplate") public ProcessTemplate commonSendTemplate() { // 将各种行为连接起来 ProcessTemplate processTemplate = new ProcessTemplate(); ArrayList processList = new ArrayList<>(); processList.add(preParamCheckAction()); processList.add(assembleAction()); processList.add(afterParamCheckAction()); processList.add(sendMqAction()); processTemplate.setProcessList(processList); return processTemplate; } // 创建 ProcessController @Bean public ProcessController processController() { ProcessController processController = new ProcessController(); Map templateConfig = new HashMap<>(4); // 根据代码存入责任链 templateConfig.put(BusinessCode.COMMON_SEND.getCode(), commonSendTemplate()); processController.setTemplateConfig(templateConfig); return processController; } // 生成具体的行为实例 @Bean public AssembleAction assembleAction() { return new AssembleAction(); } @Bean public PreParamCheckAction preParamCheckAction() { return new PreParamCheckAction(); } @Bean public AfterParamCheckAction afterParamCheckAction() { return new AfterParamCheckAction(); } @Bean public SendMqAction sendMqAction() { return new SendMqAction(); } } ``` 6、具体的行为实例 ![image-20220906112524624](README.assets/image-20220906112524624.png) 这些具体的行为只需要实现 `BusinessProcess` 接口即可成为责任链中的一员。 ### 消息入队——责任链的具体处理 ![image-20220906150002334](README.assets/image-20220906150002334.png) 在调用 send 方法后,消息将会被送入到责任链中进行举例的处理,目前责任链只进行 4 种处理: 1. 前置参数检查 2. 将消息封装成任务 3. 后置参数检查 4. 将封装好的任务发送到 MQ 文章之前已经介绍了责任处理过程中由 `ProcessContext` 来作为责任链处理的上下文环境,而 `ProcessContext` 中还存储着一些标志位,所以还需要进一步的将我们的数据封装在 `ProcessModel` 中,这个接口就是一个空接口,而要被发送的消息就将会先被封装成 `sendTaskModel`,`sendTaskModel` 实现了 `ProcessModel` 接口: ```java public class SendTaskModel implements ProcessModel { /** * 消息模板Id */ private Long messageTemplateId; /** * 请求参数,包含接受者是谁,替换占位符的具体内容 */ private List messageParamList; /** * 发送任务的信息 */ private List taskInfo; } ``` 再来看看 send 方法的实现: ```java public SendResponse send(SendRequest sendRequest) { // 将消息封装成 sendTaskModel SendTaskModel sendTaskModel = SendTaskModel.builder() .messageTemplateId(sendRequest.getMessageTemplateId()) .messageParamList(Collections.singletonList(sendRequest.getMessageParam())) .build(); // 将封装好的 sendTaskModel 进一步封装成 ProcessContext ProcessContext context = ProcessContext.builder() .code(sendRequest.getCode()) .processModel(sendTaskModel) .needBreak(false) .response(BasicResultVO.success()).build(); // 放入责任链进行处理 ProcessContext process = processController.process(context); // 获取责任链处理的结果 return new SendResponse(process.getResponse().getStatus(), process.getResponse().getMsg()); } ``` #### 前置参数检查 前置参数检查主要做几件事: 1. 检查这条消息有没有消息模板ID 2. 检查这条消息有没有接受者,并且过滤掉为 null 的接收者(接受者可能不止一个,接受者之间用逗号分割) 3. 检查消息有没有消息内容 只有当上述三者都有之后,这个节点的检查就算过去了。 #### 参数组装 参数组装就是将前置参数检查中的三个参数和根据消息模板 ID 查询得到的其它字段关联起来,并且根据模板消息的数据数进一步细化消息类型,细化后的消息类型包括: 1. sms 2. email 根据模板消息 ID 查询得到的字段中有一个指名发送渠道的字段,叫做 `sendChannel`,而系统中存在 `ChannelType` 枚举类,使用反射将消息封装成对应发送渠道的消息类型: ```java public enum ChannelType { SMS(30, "sms(短信)", SmsContentModel.class, "sms"), EMAIL(40, "email(邮件)", EmailContentModel.class, "email"); private Integer code; private String description; private Class contentModelClass; private String codeEn; // 通过 sendChannel 字段可以获取到具体消息类型的类型信息 public static Class getChanelModelClassByCode(Integer code) { ChannelType[] values = values(); for (ChannelType value : values) { if (value.getCode().equals(code)) { return value.getContentModelClass(); } } return null; } } ``` 进一步封装成一个 `TaskInfo`,这个对象中主要包含这么几个重要字段: 1. 消息模板的 ID 2. 接收者的集合 3. 发送渠道 4. 消息内容 #### 后置参数检查 在前置参数检查中,我们只检查了消息接收者是否存在,但是忽略了消息接收者的合法性问题,比如错误的邮箱,错误的手机号,这一步骤主要是用来检查和移除不合法的接收者: ```java // 1. 过滤掉不合法的手机号 filterIllegalPhoneNum(taskInfo); // 2. 过滤不合法的邮箱 filterIllegalEmail(taskInfo); ``` #### 将封装好的任务发送到 MQ 打上 topic,经由 kafka 将任务列表发送到 MQ 中。 ### 定时批量消息懒处理 所谓的定时消息就是当到达指定的时间后,系统自动将消息入队。批量消息就意味着有许多消息接收者,现实生活中可能是百万、千万级别的。这就需要我们系统对批量消息的发送做一些优化了。 这是整个懒处理系统的框架: ![image-20220906170830073](README.assets/image-20220906170830073.png) ### 消费者数据隔离 消费者负责从 MQ 中获取数据进行消费(也就是开始真正地发送消息),