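# IMApplication

**Repository Path**: guchuanhang/imapplication

## Basic Information

- **Project Name**: IMApplication
- **Description**: An implementation of IM communication
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-01-25
- **Last Updated**: 2025-01-25

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

Android file managers do not all display paths the same way, and apps do not all save files in the same place, so finding a file to upload can be a chore for ordinary users. One workaround is to let a PC client assist with the upload.

# Implementation approach

1. The PC client opens a websocket connection to the server.
2. The server sends the sessionId to the PC client.
3. The PC client generates a QR code.
4. The phone scans the QR code and reads the PC client's sessionId.
5. The phone opens its own websocket connection to the server.
6. The phone sends the fileId (explained later), the PC client's sessionId, a token, and other parameters to the server.
7. The server updates the fileId associated with the PC session.
8. The server sends the fileId, token, and so on to the PC client.
9. The PC client uses the token and fileId to request the file list and display it.
10. After the phone or the PC client modifies files, it sends an update signal to the server, and the server forwards the signal to the peer.

# 1.0 implementation

## Define the message types and message format between server and clients

1. Define the message types exchanged between server and clients

```java
public class MsgType {
    public static final int UPDATE = 0;       // notify clients that the data has changed
    public static final int REQ = 1;          // send/receive fileId and related fields
    public static final int SELF = 3;         // after connecting, the server tells the client its own sessionId
    public static final int ERR_SESSION = 4;  // the session does not exist or is already closed
    public static final int HEART_BEAT = 100; // heartbeat packet
}
```

2. Define the message format exchanged between server and clients

```java
import lombok.Data;

@Data
public class MsgData {
    private int type;         // one of the MsgType values
    private String sessionId; // SELF: the client's own sessionId; REQ: the PC client's sessionId
    private String fileId;    // after the connection is set up, fileId and related fields are sent to the PC client
}
```

## Server-side websocket implementation

Create a spring-boot project and add the web and websocket dependencies. The websocket dependency is pulled in with maven:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
```

Map the websocket handler to its access path:

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration   // maps the websocket handler to its access path
@EnableWebSocket // enables WebSocket support globally
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new WebSocketServer(), "/websocket").setAllowedOrigins("*");
    }
}
```

### Server-side management of client data

1. 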
Define the data structure the server uses to manage sessions

```java
@Data
public class SessionData {
    private int sessionType; // 1 = master (app), 0 = PC
    private String fileId;   // fileId this session is bound to
    private WebSocketSession session;
    private String sessionId; // session.getId() also returns it, but reading it after the session is closed raises an error

    // Constructor inferred from the `new SessionData(session)` call in WebSocketServer
    public SessionData(WebSocketSession session) {
        this.session = session;
        this.sessionId = session.getId();
    }
}
```

Session management logic on the server:

1. A newly created connection is added to the list, and the server sends SELF to tell the client its sessionId.
2. On disconnect, a PC session is simply removed from the list; for an app session, every other session with the same fileId is closed and removed from the list.
3. When a message arrives, it is handled according to its type:
   1. Heartbeat: return immediately.
   2. REQ: the app sends the fileId, the PC client's sessionId, and related fields; the fileId is set on both the app's and the PC client's SessionData in sessions, and the fileId is then sent to the PC client.
   3. UPDATE: an update signal is sent to every session with the same fileId.

Note: iterating over, removing from, and adding to sessions must be done inside synchronized, otherwise a ConcurrentModificationException is thrown.

```java
package com.example.im.ws;

import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * @ClassName WebSocketServer
 * @Description Handles websocket connections
 * @Author guchuanhang
 * @date 2025/1/25 14:01
 * @Version 1.0
 **/
@Slf4j
public class WebSocketServer extends TextWebSocketHandler {
    private final Object syncObject = new Object();
    private final List<SessionData> sessions = new ArrayList<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("New connection established: " + session.getId());
        SessionData sessionData = new SessionData(session);
        synchronized (syncObject) {
            sessions.add(sessionData);
        }
        // tell the client its own sessionId
        MsgData msgData = new MsgData();
        msgData.setType(MsgType.SELF);
        msgData.setSessionId(session.getId());
        session.sendMessage(new TextMessage(new Gson().toJson(msgData)));
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        log.info("handleTextMessage: " +
                session.getId());
        log.info("Received message: " + payload);
        final MsgData msgData = new Gson().fromJson(payload, MsgData.class);
        // request sent by the master
        switch (msgData.getType()) {
            case MsgType.HEART_BEAT: {
                // heart beat
                break;
            }
            case MsgType.REQ: {
                // set master
                {
                    SessionData sessionData = null;
                    synchronized (syncObject) {
                        final Optional<SessionData> any = sessions.stream().
                                filter(s -> s.getSessionId()
                                        .equals(session.getId())).findAny();
                        if (any.isPresent()) {
                            sessionData = any.get();
                        }
                    }
                    if (null != sessionData) {
                        // set master.
                        sessionData.setSessionType(ClientType.MASTER);
                        sessionData.setFileId(msgData.getFileId());
                    }
                }
                // set slave
                {
                    SessionData sessionData = null;
                    synchronized (syncObject) {
                        final Optional<SessionData> any = sessions.stream().
                                filter(s -> s.getSessionId().equals(msgData.getSessionId())).findAny();
                        if (any.isPresent()) {
                            sessionData = any.get();
                        }
                    }
                    if (null != sessionData) {
                        sessionData.setSessionType(ClientType.SALVER);
                        sessionData.setFileId(msgData.getFileId());
                        MsgData msgData1 = new MsgData();
                        msgData1.setType(MsgType.REQ);
                        msgData1.setFileId(msgData.getFileId());
                        sessionData.getSession().sendMessage(new TextMessage(new Gson().toJson(msgData1)));
                    } else {
                        // pc session error.
                        MsgData msgData1 = new MsgData();
                        msgData1.setType(MsgType.ERR_SESSION);
                        session.sendMessage(new TextMessage(new Gson().toJson(msgData1)));
                    }
                }
                break;
            }
            case MsgType.UPDATE: {
                // look up the sender's own session
                SessionData sessionData = null;
                synchronized (syncObject) {
                    final Optional<SessionData> any = sessions.stream().
                            filter(s -> s.getSessionId().equals(session.getId())).findAny();
                    if (any.isPresent()) {
                        sessionData = any.get();
                    }
                }
                if (null != sessionData) {
                    final String fileId = sessionData.getFileId();
                    List<SessionData> collect;
                    synchronized (syncObject) {
                        collect = sessions.stream().filter(s -> (null != s.getFileId() && s.getFileId().
                                equals(fileId)) || (null == s.getSession() || !s.getSession().isOpen())).collect(Collectors.toList());
                    }
                    if (collect.isEmpty()) {
                        return;
                    }
                    List<SessionData> errList = new ArrayList<>();
                    for (SessionData s : collect) {
                        if (null == s.getSession() || !s.getSession().isOpen()) {
                            errList.add(s);
                            continue;
                        }
                        // no need to notify the sender itself
                        if (s.getSessionId().equals(session.getId())) {
                            continue;
                        }
                        MsgData msgData1 = new MsgData();
                        msgData1.setType(MsgType.UPDATE);
                        try {
                            s.getSession().sendMessage(new TextMessage(new Gson().toJson(msgData1)));
                        } catch (Exception e) {
                            log.warn("Failed to forward UPDATE to " + s.getSessionId(), e);
                            errList.add(s);
                        }
                    }
                    synchronized (syncObject) {
                        sessions.removeAll(errList);
                    }
                }
                break;
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        log.info("Connection closed: " + session.getId() + ", Status: " + status);
        SessionData sessionData = null;
        synchronized (syncObject) {
            Optional<SessionData> any = sessions.stream().
                    filter(s -> s.getSessionId().equals(session.getId())).findAny();
            if (any.isPresent()) {
                sessionData = any.get();
            }
        }
        if (null == sessionData) {
            return;
        }
        final String fileId = sessionData.getFileId();
        // slave just ignore and delete.
        if (ClientType.SALVER == sessionData.getSessionType()) {
            // removal must also hold the lock, like every other access to sessions
            synchronized (syncObject) {
                sessions.remove(sessionData);
            }
            return;
        }
        if (ClientType.MASTER == sessionData.getSessionType()) {
            List<SessionData> collect;
            synchronized (syncObject) {
                collect = sessions.stream().filter(s -> (null != s.getFileId() && s.getFileId().equals(fileId))
                        || (null == s.getSession() || !s.getSession().isOpen())).collect(Collectors.toList());
            }
            if (collect.isEmpty()) {
                return;
            }
            for (SessionData s : collect) {
                final WebSocketSession session1 = s.getSession();
                if (null == session1 || !session1.isOpen()) {
                    continue;
                }
                session1.close();
            }
            synchronized (syncObject) {
                sessions.removeAll(collect);
            }
        }
    }
}
```

## PC client implementation

1. Create the websocket when the page is created and close it when the page is destroyed.
2. Following the message format agreed with the server, handle each message type in the websocket onmessage callback: generate the QR code, query the file list, and so on.
3. Add a heartbeat mechanism to make the websocket more robust.

fileId is a key; the latest data can be looked up by fileId.
Once the PC client receives a refresh signal it requests the latest data; after the PC client changes data it sends a data-updated signal.

```vue
```

## Establishing the websocket connection with OkHttp3

1. Use okhttp3 to open the websocket connection and handle each message type in onMessage.
2. Use a handler to manage the heartbeat packets.

After a scan, if the connection is already established, the REQ is sent over the existing connection; otherwise the connection is opened first.

```java
package com.example.im.ws;

import android.content.Intent;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.text.TextUtils;
import android.widget.EditText;
import android.widget.TextView;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;

import com.example.im.R;
import com.google.gson.Gson;
import com.google.zxing.integration.android.IntentIntegrator;
import com.google.zxing.integration.android.IntentResult;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;

public class HelloActivity extends AppCompatActivity {
    public static final int MSG_HEART = 0x123;
    public static final int MSG_INTERVAL = 3000;
    private WebSocket webSocket;
    public static final String URL = "ws://192.168.0.110:7890/websocket";
    private TextView msgView;
    private List<String> sessionIds = new ArrayList<>();
    // sends a heartbeat and reschedules itself every MSG_INTERVAL milliseconds
    Handler mHandler = new Handler(Looper.getMainLooper()) {
        @Override
        public void handleMessage(@NonNull Message msg) {
            super.handleMessage(msg);
            if (MSG_HEART == msg.what) {
                MsgData msgData = new MsgData();
                msgData.setType(MsgType.HEART_BEAT);
                webSocket.send(new Gson().toJson(msgData));
                msgView.append(getNowDate() + ":sent heart beat\n");
                mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
            }
        }
    };

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        msgView = findViewById(R.id.tv_msg);
        findViewById(R.id.btn_scan).setOnClickListener(v -> {
            scanQRCode();
        });
        findViewById(R.id.btn_update).setOnClickListener(v ->
        {
            if (null == webSocket) {
                return; // not connected yet, so there is nobody to notify
            }
            MsgData msgData = new MsgData();
            msgData.setType(MsgType.UPDATE);
            webSocket.send(new Gson().toJson(msgData));
        });
    }

    @Override
    protected void onDestroy() {
        mHandler.removeCallbacksAndMessages(null);
        if (null != webSocket) {
            webSocket.close(1000, null); // 1000 = normal closure
        }
        super.onDestroy();
    }

    private void scanQRCode() {
        IntentIntegrator integrator = new IntentIntegrator(this);
        integrator.setDesiredBarcodeFormats(IntentIntegrator.QR_CODE);
        integrator.setPrompt("Scan QR code");
        integrator.setCameraId(0); // use the rear camera
        integrator.setBeepEnabled(false);
        integrator.setBarcodeImageEnabled(true);
        integrator.initiateScan();
    }

    @Override
    protected void onActivityResult(int requestCode, int resultCode, Intent data) {
        IntentResult result = IntentIntegrator.parseActivityResult(requestCode, resultCode, data);
        if (result != null && !TextUtils.isEmpty(result.getContents())) {
            String sessionId = result.getContents();
            if (sessionIds.contains(sessionId)) {
                return;
            }
            sessionIds.add(sessionId);
            // start
            if (null == webSocket) {
                OkHttpClient client = new OkHttpClient();
                Request request = new Request.Builder().url(URL).build();
                webSocket = client.newWebSocket(request, new MyWebSocketListener());
            } else {
                // Reusing the open connection makes it possible to scan several PC clients
                // and communicate with several PC clients at the same time
                MsgData msgData = new MsgData();
                msgData.setSessionId(sessionId);
                msgData.setType(MsgType.REQ);
                msgData.setFileId("123");
                webSocket.send(new Gson().toJson(msgData));
            }
        }
        super.onActivityResult(requestCode, resultCode, data);
    }

    private String getNowDate() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault());
        return simpleDateFormat.format(new java.util.Date());
    }

    // OkHttp invokes these callbacks on a background thread, so UI updates go through runOnUiThread
    private class MyWebSocketListener extends WebSocketListener {
        @Override
        public void onOpen(WebSocket webSocket, okhttp3.Response response) {
            // connected
            runOnUiThread(() -> msgView.append(getNowDate() + ":connected\n"));
            MsgData msgData = new MsgData();
            msgData.setSessionId(sessionIds.get(sessionIds.size() - 1));
            msgData.setType(MsgType.REQ);
            msgData.setFileId("123");
            webSocket.send(new Gson().toJson(msgData));
            mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
        }

        @Override
        public void onMessage(WebSocket webSocket, String text) {
            runOnUiThread(() -> msgView.append(getNowDate() + ":received " + text + "\n"));
            // any incoming message restarts the heartbeat timer
            mHandler.removeMessages(MSG_HEART);
            mHandler.sendMessageDelayed(mHandler.obtainMessage(MSG_HEART), MSG_INTERVAL);
        }

        @Override
        public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
            // connection failed
            runOnUiThread(() -> msgView.append(getNowDate() + ":failed " + t.getMessage() + "\n"));
        }
    }
}
```

# Version 2.0

The implementation above is deliberately simple. The following sections adapt it to a more realistic system architecture.

## spring-boot behind nginx

nginx is commonly used for load balancing, firewalling, reverse proxying, and so on, so this setup is quite common.

```nginx
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
server {
    listen 7777;
    server_name localhost;
    location / {
        proxy_pass http://127.0.0.1:7890;
        proxy_http_version 1.1;  # the Upgrade handshake needs HTTP/1.1 towards the upstream
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}
```

Set the Upgrade and Connection request headers and point the client at the nginx address; nginx then proxies the connection to spring-boot.

## spring-boot behind gateway

This is the so-called spring-cloud microservice architecture. Add a route that uses the ws protocol to the gateway:

```yml
# IM
- id: im
  uri: ws://localhost:7890
  predicates:
    - Path=/im/**
  filters:
    - StripPrefix=1
```

Connect to the address exposed by the gateway, and the gateway routes the connection to spring-boot.
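
The route above matches `/im/**` and forwards with `StripPrefix=1`, which is why a client connects to `/im/websocket` on the gateway while the handler stays registered at `/websocket`. The following is a minimal sketch of that path transformation in plain Java. It is an illustration only, not the gateway's actual implementation, and `StripPrefixDemo` and `stripPrefix` are hypothetical names.

```java
class StripPrefixDemo {

    /** Drops the first `parts` non-empty path segments, roughly what StripPrefix=parts does. */
    static String stripPrefix(String path, int parts) {
        StringBuilder out = new StringBuilder();
        int skipped = 0;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // ignore the empty piece before the leading slash
            }
            if (skipped < parts) {
                skipped++; // this segment belongs to the prefix being stripped
                continue;
            }
            out.append('/').append(segment);
        }
        // if nothing is left, forward to the root path
        return out.length() == 0 ? "/" : out.toString();
    }

    public static void main(String[] args) {
        // path seen by the gateway -> path forwarded to spring-boot
        System.out.println(stripPrefix("/im/websocket", 1)); // prints /websocket
    }
}
```

With this route, the Android client's URL would change from `ws://<spring-boot host>:7890/websocket` to `ws://<gateway host>/im/websocket`; the gateway host and port are not given in the source.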
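## spring-boot behind nginx and gateway

Combine the two setups above: nginx is configured so that it can proxy to the gateway, and the gateway routes to spring-boot.

## Upgrading ws to wss

The approach usually found online is to configure certificates on both the gateway and spring-boot. Simple is efficient: since the gateway layer already takes care of firewalling and certificate verification, the application should not have to manage any of it. nginx should hide this difference, so nginx is configured to accept wss requests and pass them on as ws.

nginx protocol rewrite:

```nginx
map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}
server {
    listen 443 ssl http2;
    # the values of server_name, ssl_certificate and ssl_certificate_key are omitted in the source
    server_name
    #SSL-START SSL settings; do not delete or modify the commented 404 rule on the next line
    ssl on;
    ssl_certificate
    ssl_certificate_key
    add_header Strict-Transport-Security "max-age=31536000";
    error_page 497 https://$host$request_uri;

    location /im/ {
        proxy_http_version 1.1;  # the Upgrade handshake needs HTTP/1.1 towards the upstream
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_pass http://127.0.0.1:18080/im/;
        # rewrite is matched against the URI path, which never contains "wss://", so this rule does not fire;
        # terminating TLS here and proxying over http:// is what turns wss into ws for the upstream
        rewrite ^(.*)wss://(.*)$ $1ws://$2 permanent;
    }
}
```

With this in place, wss://<domain>/im/websocket can be accessed.

# Miscellaneous

Source code: https://gitee.com/guchuanhang/imapplication.git

## Packaging springboot

1. Comment out skip

```xml
<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <version>${spring-boot.version}</version>
    <configuration>
        <mainClass>com.example.im.ImApplication</mainClass>
        <!-- <skip>true</skip> -->
    </configuration>
    <executions>
        <execution>
            <id>repackage</id>
            <goals>
                <goal>repackage</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

2. springboot logging

spring-boot supports logback by default.

```java
@Slf4j
public class WebSocketServer extends TextWebSocketHandler {
    // ... inside a handler method:
    log.info("New connection established: " + session.getId());
```

3. bootstrap.yml

bootstrap.yml is the spring-cloud configuration file. application.yml and application.properties are the spring-boot configuration files.

4. wss test tool: wscat

```
npm install -g wscat   # install
wscat -c wss://www.baidu.com/im/websocket
```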
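
Before testing against a live endpoint with wscat, the routing rules from the 1.0 section can also be checked offline. The sketch below is a simplified in-memory model, not the server's code: `RoutingModel`, `connect`, `req`, and `updateTargets` are hypothetical names, sessions are reduced to sessionId strings, and no websocket is involved. It mirrors the rules described earlier: REQ binds the fileId to the app and to the scanned PC session, and UPDATE goes to every other session with the same fileId.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoutingModel {
    // sessionId -> fileId bound to that session (null until the session is paired)
    private final Map<String, String> fileIdBySession = new LinkedHashMap<>();

    /** A new connection is registered without a fileId. */
    synchronized void connect(String sessionId) {
        fileIdBySession.put(sessionId, null);
    }

    /**
     * REQ from the app: bind fileId to the app session and to the scanned PC session.
     * Returns false when the PC session is unknown (the server would answer ERR_SESSION).
     */
    synchronized boolean req(String appSessionId, String pcSessionId, String fileId) {
        if (fileIdBySession.containsKey(appSessionId)) {
            fileIdBySession.put(appSessionId, fileId);
        }
        if (!fileIdBySession.containsKey(pcSessionId)) {
            return false;
        }
        fileIdBySession.put(pcSessionId, fileId);
        return true;
    }

    /** UPDATE from any session: the sessions that should receive the update signal. */
    synchronized List<String> updateTargets(String senderSessionId) {
        List<String> targets = new ArrayList<>();
        String fileId = fileIdBySession.get(senderSessionId);
        if (fileId == null) {
            return targets; // sender is not paired, nobody to notify
        }
        for (Map.Entry<String, String> entry : fileIdBySession.entrySet()) {
            boolean sameFile = fileId.equals(entry.getValue());
            boolean isSender = entry.getKey().equals(senderSessionId);
            if (sameFile && !isSender) {
                targets.add(entry.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        RoutingModel model = new RoutingModel();
        model.connect("pc-1");
        model.connect("pc-2");
        model.connect("app-1");
        model.req("app-1", "pc-1", "123");
        System.out.println(model.updateTargets("app-1")); // prints [pc-1]
        System.out.println(model.updateTargets("pc-2"));  // prints []
    }
}
```

All methods are synchronized on the model, which plays the role of the `syncObject` lock around `sessions` in the server.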