# long-polling **Repository Path**: devine/long-polling ## Basic Information - **Project Name**: long-polling - **Description**: 长轮询demo - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-05-07 - **Last Updated**: 2021-05-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 长轮询 ## 实时通信的几种方式 - socket > 实时通信,能够做到双方通信 - 轮训 > 实现简单,调用次数频繁,消息通知会产生延迟 - 长轮询 > 服务端实现复杂,消息实时性高,能够做到实时通信 ## MQ中采用的通信方式 > 在MQ中,多数会采用长轮询的方式获取消息,我们来分析一下这几种方式各自有什么优势 | 通信方式 | 实现复杂度 | 消息实时性 | 缺点 | | -------- | ---------- | ---------- | ------------------------------------------------------------ | | socket | 一般 | 实时性高 | 需要维持链接,服务端推送消息,无法感知客户端的消费速率,容易造成客户端消息积压 | | 轮训 | 简单 | 实时性低 | 消息容易产生延迟,频繁调用,网络开销大 | | 长轮询 | 复杂 | 实时性高 | 实现起来较为复杂 | ## 长轮询的核心思想 > 用户发起一个请求,服务端将请求异步,并持有这个请求,当服务端数据发生变更的时候,找到相应的请求,并将数据返回,没有数据发送变更时,服务端通过延时的定时任务,将请求返回,客户端在收到请求后,重新发起新的请求。长轮询减少了请求次数,并提高了获取变更数据的实时性。 ## 长轮询代码实现 ### `LongPollingController` > 提供http接口,供外部调用,并发起长轮询 ```java @RestController @RequestMapping("longPolling") public class LongPollingController { @Autowired private LongPollingAdapter longPollingAdapter; @PostMapping("listen") public void listen(@Validated @RequestBody LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { longPollingAdapter.doLongPolling(longPollingDto, request, response); } } ``` ### `LongPollingAdapter` > 兼容短轮训,对于支持长轮询的请求,开始执行长轮询 ```java public class LongPollingAdapter { @Autowired private LongPollingService longPollingService; public void doLongPolling(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { // 判断是否支持长轮询 if (isSupportLongPolling(request)) { longPollingService.addLongPollingClient(longPollingDto, request, response); return; } // 兼容短轮训 ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode()); if (configData == null) { ResponseUtils.generatorResponse(response, Response.builder().code(-1).message("appCode不存在").build()); return; } if (longPollingDto.getMd5().equals(configData.getMd5())) { // 数据未发生变化 ResponseUtils.generatorResponse(response, Response.builder().success()); } ResponseUtils.generatorResponse(response, Response.builder().data(configData)); } private boolean isSupportLongPolling(HttpServletRequest request) { return request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT) != null; } } ``` ### `LongPollingService` > - 继承`SingleEventListener`,用于触发`LocalDataChangeEvent`事件 > - `LongPollingClient`,hold住请求,并开启延时定时任务,当定时执行时,返回改变的数据,并释放连接 > - `DataChangeTask`,用于触发`LocalDataChangeEvent`事件时,返回改变的数据,并释放连接 ```java @Service public class LongPollingService extends SingleEventListener { @Value("${pool.with.fix.rate:false}") private boolean isFixPoll; private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); private static final Queue longPollingClientQueue = new ConcurrentLinkedQueue<>(); public void addLongPollingClient(LongPollingDto longPollingDto, HttpServletRequest request, HttpServletResponse response) { String timeoutStr = request.getHeader(CommonConstant.LONG_POLLING_TIMEOUT); int timeout = Math.max(10000, Integer.valueOf(timeoutStr) - 500); if (isFixPolling()) { timeout = 10000; } else { // 检测数据是否发生变化 ConfigData configData = ConfigDataPersistent.getConfigData(longPollingDto.getAppCode()); if (configData != null && !configData.getMd5().equals(longPollingDto.getMd5())) { // 数据发生变化,直接返回 ResponseUtils.generatorResponse(response, Response.builder().data(configData)); return; } } // 开启异步 AsyncContext asyncContext = request.startAsync(); // 通过线程池异步请求 executor.execute(new LongPollingClient(longPollingDto.getAppCode(), longPollingDto.getMd5(), asyncContext, timeout)); } /** * 是否以固定的频率返回数据 */ private boolean isFixPolling() { return isFixPoll; } @Override public Class interest() { return LocalDataChangeEvent.class; } /** * 触发事件 * * @param event */ @Override public void onEvent(Event event) { if (isFixPolling()) { // 忽略,待定时执行时返回变更的数据 } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent localDataChangeEvent = (LocalDataChangeEvent) event; executor.execute(new DataChangeTask(localDataChangeEvent.getAppCode())); } } } @Data class LongPollingClient implements Runnable { private final String appCode; private final String md5; private final AsyncContext asyncContext; private final int timeout; Future asyncTimeoutFuture; public LongPollingClient(String appCode, String md5, AsyncContext asyncContext, int timeout) { this.appCode = appCode; this.md5 = md5; this.asyncContext = asyncContext; this.timeout = timeout; } @Override public void run() { asyncTimeoutFuture = executorService.schedule(() -> { longPollingClientQueue.remove(this); // 固定时间的模式通过此定时返回变更的数据 if (isFixPolling()) { ConfigData configData = ConfigDataPersistent.getConfigData(appCode); if (configData != null && !md5.equals(configData.getMd5())) { sendResponse(configData); } else { sendResponse(null); } } else { // 无数据变更,只是使用此方式hold请求, sendResponse(null); } }, timeout, TimeUnit.MILLISECONDS); longPollingClientQueue.add(this); } void sendResponse(ConfigData configData) { // 取消定时 if (asyncTimeoutFuture != null) { asyncTimeoutFuture.cancel(false); } generatorResponse(configData); } void generatorResponse(ConfigData configData) { if (configData != null) { ResponseUtils.generatorResponse((HttpServletResponse) asyncContext.getResponse(), Response.builder().data(configData)); } // 结束hold住的请求 asyncContext.complete(); } } class DataChangeTask implements Runnable { private final String appCode; public DataChangeTask(String appCode) { this.appCode = appCode; } @Override public void run() { if (CollectionUtils.isEmpty(longPollingClientQueue)) { return; } Iterator iterator = longPollingClientQueue.iterator(); while (iterator.hasNext()) { LongPollingClient longPollingClient = iterator.next(); if (appCode.equals(longPollingClient.getAppCode())) { // 移除持有的请求 iterator.remove(); ConfigData configData = ConfigDataPersistent.getConfigData(appCode); longPollingClient.sendResponse(configData); } } } } } ``` ### 事件驱动推送数据 #### `EventListener` ```java public interface EventListener { /** * 感兴趣的事件 * * @return */ List> interests(); /** * 触发事件 * * @param event */ void onEvent(Event event); } ``` > listener顶层接口,定时感兴趣的事件,以及触发事件 #### `AbstractEventListener` ```java @Slf4j public abstract class AbstractEventListener implements EventListener { /** * spring初始化bean的时候,自动添加listener * */ @PostConstruct public void init() { List> interests = interests(); if (CollectionUtils.isEmpty(interests)) { log.error("没有感兴趣的事件"); return; } interests.forEach(clazz -> EventDispatcher.addListener(clazz, this)); } } ``` > 定义抽象类,用于在bean初始化时,自动注册listener #### `SingleEventListener` ```java public abstract class SingleEventListener extends AbstractEventListener { /** * 感兴趣的时间 * * @return */ @Override public final List> interests() { return Lists.newArrayList(interest()); } public abstract Class interest(); } ``` > 只对单个事件感兴趣 #### `EventDispatcher` ```java public class EventDispatcher { private EventDispatcher() { } private static final Map, List> listenerHolder = new ConcurrentHashMap<>(); public static final void addListener(Class clazz, EventListener listener) { List listeners; // 非线程安全,需要使用synchronized synchronized (EventDispatcher.class) { if (listenerHolder.containsKey(clazz)) { listeners = listenerHolder.get(clazz); } else { listeners = new ArrayList<>(); listenerHolder.put(clazz, listeners); } } listeners.add(listener); } public static final List getListeners(Class clazz) { return listenerHolder.get(clazz); } public static final void fireEvent(Event event) { // 获取listener List listeners = listenerHolder.get(event.getClass()); if (!CollectionUtils.isEmpty(listeners)) { listeners.forEach(listener -> listener.onEvent(event)); } } } ``` -