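// NOTE: stray web-page banner captured with this file (not part of the source): "Code pull complete; the page will refresh automatically."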
package net.dwade.livechat.websocket.client;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.CookieStore;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.cookie.Cookie;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.UndertowXhrTransport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;
import org.springframework.web.socket.sockjs.frame.Jackson2SockJsMessageCodec;
import org.springframework.web.socket.sockjs.frame.SockJsMessageCodec;
import org.xnio.OptionMap;
import org.xnio.Options;
import com.alibaba.fastjson.JSONObject;
import net.dwade.livechat.websocket.beans.SubscribeType;
import net.dwade.livechat.websocket.beans.WebSocketRequestMessage;
/**
 * Visitor-side websocket chat client.
 * @author huangxf
 * @date 2016-12-26
 */
public class ChatWebsocketClient implements ChatClient {
/** Value sent as the User-Agent HTTP request header. */
private String userAgent;
/** URL of the index request. */
private String indexUrl;
/** URL of the pull (queue polling) request. */
private String pullUrl;
/** URL used to open the websocket connection. */
private String chatUrl;
/** Cookie domain; when there is no domain name the host IP is used instead. */
private String domain;
/** Session id of the HTTP session; sent as the SESSION cookie. */
private String httpSessionId;
/** Pull status for which clientPull() keeps polling (visitor still queued). */
public static final String STATUS_WATING = "0";
/** Pull status for which clientPull() stores the chat session id (visitor accepted). */
public static final String STATUS_ACCEPTED = "1";
/** retCode value that index() treats as a successful response. */
public static final String SUCCESS = "000000";
private static final Logger logger = LoggerFactory.getLogger( ChatWebsocketClient.class );
/**
 * Codec shared by all client instances.
 * Original author's note: if one codec is created per instance, threads end up BLOCKED
 * because constructing it triggers class loading.
 * @see Jackson2SockJsMessageCodec#Jackson2SockJsMessageCodec()
 * @see Jackson2ObjectMapperBuilder#build()
 */
private static final SockJsMessageCodec MESSAGE_CODEC = new Jackson2SockJsMessageCodec();
/** Scheduler shared by all instances; handed to the STOMP client in standardWebsocket(). */
private static final ThreadPoolTaskScheduler taskScheduler;
/** SockJS client created in the constructor and used by standardWebsocket(). */
private final SockJsClient sockJsClient;
/** Cookie store used for the HTTP requests; populated in postConstruct(). */
private CookieStore cookieStore = null;
/** HTTP client context carrying the cookie store; created in postConstruct(). */
private HttpClientContext context = null;
/** Number of queue polls that came back with the waiting status. */
private AtomicInteger pullCount = new AtomicInteger();
/** Number of messages sent; used to build the per-message request id. */
private AtomicInteger sendCount = new AtomicInteger();
/** Maximum number of waiting-status polls before clientPull() gives up. */
public static final int MAX_PULL = 30;
/**
 * Chat session id, taken from the pull response once the visitor is accepted.
 */
private String chatSessionId;
/** STOMP client created when connecting. */
protected WebSocketStompClient stompClient;
/** STOMP session obtained when connecting; used by sendMessage() and disconnect(). */
protected StompSession stompSession;
// Build the class-wide task scheduler once; it is later handed to the STOMP client
// so that websocket heartbeats can be scheduled.
static {
    ThreadPoolTaskScheduler heartbeatScheduler = new ThreadPoolTaskScheduler();
    heartbeatScheduler.setThreadNamePrefix( "WebsocketHeartBeat-" );
    heartbeatScheduler.setPoolSize( 100 );
    heartbeatScheduler.initialize();
    taskScheduler = heartbeatScheduler;
}
/**
 * Creates a client, prepares its SockJS client (single standard websocket
 * transport, shared message codec) and then runs {@link #postConstruct()}.
 *
 * @param userAgent     value for the User-Agent request header
 * @param indexUrl      URL of the index request
 * @param pullUrl       URL of the pull request
 * @param chatUrl       URL of the websocket endpoint
 * @param domain        domain set on the SESSION cookie
 * @param httpSessionId value of the SESSION cookie
 */
public ChatWebsocketClient(String userAgent, String indexUrl, String pullUrl,
        String chatUrl, String domain, String httpSessionId) {
    // Transport and SockJS client first; they do not depend on the other fields
    SockJsClient client = new SockJsClient(
            Collections.<Transport>singletonList( new WebSocketTransport( new StandardWebSocketClient() ) ) );
    client.setMessageCodec( MESSAGE_CODEC );
    this.sockJsClient = client;

    this.userAgent = userAgent;
    this.indexUrl = indexUrl;
    this.pullUrl = pullUrl;
    this.chatUrl = chatUrl;
    this.domain = domain;
    this.httpSessionId = httpSessionId;

    // Must run after domain and httpSessionId are set: it builds the SESSION cookie from them
    this.postConstruct();
}
/**
 * Prepares the HTTP state used by the index and pull requests: a cookie store
 * holding the SESSION cookie and a client context bound to that store.
 */
protected void postConstruct() {
    BasicClientCookie sessionCookie = new BasicClientCookie( "SESSION", this.httpSessionId );
    // The domain has to be set, otherwise the cookie is not sent to the server
    sessionCookie.setDomain( this.domain );

    CookieStore store = new BasicCookieStore();
    store.addCookie( sessionCookie );

    HttpClientContext clientContext = HttpClientContext.create();
    clientContext.setCookieStore( store );

    this.cookieStore = store;
    this.context = clientContext;
}
/**
 * Sends the index GET request and verifies the answer.
 * <p>
 * The response must have a 2xx HTTP status and a JSON body whose
 * {@code retCode} equals {@link #SUCCESS}.
 *
 * @throws RuntimeException if the request fails, the HTTP status is not 2xx,
 *         or the returned {@code retCode} is not the success code
 */
public void index() {
    CloseableHttpClient client = null;
    CloseableHttpResponse response = null;
    try {
        client = HttpClients.custom().setDefaultCookieStore( cookieStore ).build();
        HttpGet get = new HttpGet( indexUrl );
        get.setHeader( "User-Agent", userAgent );
        // Pass the context so that cookies are kept across requests
        response = client.execute( get, context );
        int status = response.getStatusLine().getStatusCode();
        // Reject anything outside 2xx. The previous test used "&&", which can never be
        // true for a single value, so HTTP errors were silently accepted.
        if ( status < 200 || status >= 300 ) {
            throw new RuntimeException( "index请求错误, http status:" + status );
        }
        HttpEntity entity = response.getEntity();
        String json = EntityUtils.toString( entity, "UTF-8" );
        logger.info( "index response:{}", json );
        String retCode = JSONObject.parseObject( json ).getString( "retCode" );
        if ( !SUCCESS.equals( retCode ) ) {
            throw new RuntimeException( "index请求错误, json result:" + json );
        }
    } catch( Exception e ) {
        throw new RuntimeException( "index response error", e );
    } finally {
        IOUtils.closeQuietly( response );
        IOUtils.closeQuietly( client );
    }
}
/**
 * Polls the queue until the visitor is accepted.
 * <p>
 * Each poll posts to the pull URL. On {@link #STATUS_ACCEPTED} the chat session id
 * is stored and the method returns. On {@link #STATUS_WATING} the method waits three
 * seconds and polls again, up to {@link #MAX_PULL} waiting answers. Any other status
 * is an error.
 * <p>
 * Polling is a loop rather than recursion, so the HTTP client and response of each
 * attempt are closed before waiting, and a failure is wrapped exactly once.
 *
 * @throws RuntimeException if a request fails, the status is unknown, the polling
 *         limit is exceeded, or the waiting thread is interrupted
 */
public void clientPull() {
    try {
        while ( true ) {
            String json = requestPull();
            logger.info( "Client pull:{}", json );
            JSONObject result = JSONObject.parseObject( json );
            String status = result.getString( "status" );
            // Accepted: remember the chat session and stop polling
            if ( STATUS_ACCEPTED.equals( status ) ) {
                chatSessionId = result.getString( "chatSessionId" );
                return;
            }
            if ( !STATUS_WATING.equals( status ) ) {
                throw new RuntimeException( "client pull error:" + json );
            }
            // Still queued: count the attempt and try again after a pause
            if ( pullCount.incrementAndGet() > MAX_PULL ) {
                throw new RuntimeException( "轮询排队超过上限" );
            }
            Thread.sleep( 3000 );
        }
    } catch( InterruptedException e ) {
        // Keep the interrupt visible to the caller's thread
        Thread.currentThread().interrupt();
        throw new RuntimeException( "client pull response error", e );
    } catch( Exception e ) {
        throw new RuntimeException( "client pull response error", e );
    }
}

/** Executes one pull POST and returns the raw response body, releasing the HTTP resources. */
private String requestPull() throws IOException {
    CloseableHttpClient client = null;
    CloseableHttpResponse response = null;
    try {
        client = HttpClients.custom().setDefaultCookieStore( cookieStore ).build();
        HttpPost method = new HttpPost( pullUrl );
        method.setHeader( "User-Agent", userAgent );
        // Pass the context so that cookies are kept across requests
        response = client.execute( method, context );
        HttpEntity entity = response.getEntity();
        return EntityUtils.toString( entity, "UTF-8" );
    } finally {
        IOUtils.closeQuietly( response );
        IOUtils.closeQuietly( client );
    }
}
/**
 * Opens the websocket connection through {@link #standardWebsocket()}.
 * <strong>Note: the subscription phase is asynchronous; callers have to deal with
 * that according to their own needs.</strong>
 *
 * @throws RuntimeException wrapping whatever made the connection attempt fail
 */
public void connect() {
    try {
        this.standardWebsocket();
    }
    catch ( Exception cause ) {
        throw new RuntimeException( "连接失败", cause );
    }
}
/**
 * Releases the websocket resources.
 * <p>
 * The STOMP session is disconnected first, and only while it is still connected,
 * so an already closed session does not make this method fail. The STOMP client
 * is stopped afterwards, even when disconnecting the session throws.
 */
public void disconnect() {
    try {
        if ( stompSession != null && stompSession.isConnected() ) {
            stompSession.disconnect();
        }
    } finally {
        if ( stompClient != null ) {
            stompClient.stop();
        }
    }
}
/**
 * Builds a chat message from the given text and sends it to
 * {@code /app/sendMessage} as UTF-8 encoded JSON.
 * <p>
 * The request id is the STOMP session id followed by a per-client send counter.
 * {@link #beforeSendMessage(WebSocketRequestMessage)} runs before serialization.
 *
 * @param text message text
 * @return the message object that was sent
 * @throws IllegalStateException if no STOMP session has been established yet
 */
public WebSocketRequestMessage sendMessage( String text ) {
    final StompSession session = this.stompSession;
    if ( session == null ) {
        // Fail with a clear message instead of a bare NullPointerException
        throw new IllegalStateException( "Websocket session is not established, call connect() first" );
    }
    final WebSocketRequestMessage msg = new WebSocketRequestMessage();
    msg.setWebsocketRequestId( session.getSessionId() + "-" + sendCount.incrementAndGet() );
    msg.setContentType( "0" );
    msg.setMessageType( "0" );
    msg.setPostTime( new Date() );
    msg.setMessage( text );
    this.beforeSendMessage( msg );
    // A Charset constant cannot fail, so there is no encoding exception to swallow
    byte[] data = JSONObject.toJSONString( msg ).getBytes( StandardCharsets.UTF_8 );
    session.send( "/app/sendMessage", data );
    return msg;
}
/**
 * Extension hook for subclasses, called by {@code sendMessage} right before the
 * message is serialized. The default implementation does nothing.
 *
 * @param message the message about to be sent
 */
protected void beforeSendMessage( WebSocketRequestMessage message ) {
    // intentionally empty
}
/**
 * Callback for subscribed destinations; the frame handlers registered in
 * {@code subscribe} call it with the received payload as text.
 * The default implementation only writes a debug log entry.
 *
 * @param session     STOMP session the frame arrived on
 * @param type        subscription the frame belongs to
 * @param textMessage frame payload converted to a string
 */
protected void subscribeCallback( StompSession session, SubscribeType type, String textMessage ) {
    logger.debug( "系统消息已订阅成功" );
}
/**
 * Hook called by {@code standardWebsocket} once the STOMP session has been obtained
 * and the subscriptions have been requested. The default implementation only
 * writes a debug log entry.
 *
 * @param session the established STOMP session
 */
protected void afterConnected( StompSession session ) {
    logger.debug( "Websocket连接成功" );
}
/**
 * Connects over the SockJS client built in the constructor, waits for the STOMP
 * session, subscribes to the chat destinations and finally calls
 * {@link #afterConnected(StompSession)}.
 * <p>
 * The session is stored in {@code stompSession} before subscribing, matching
 * {@code xhrStreamWebsocket}, so that a callback fired by an early frame can
 * already use {@code sendMessage}.
 *
 * @throws Exception if the connection attempt fails or the waiting thread is interrupted
 */
protected void standardWebsocket() throws Exception {
    // The point is to send the session cookie, format: Cookie: SESSION=bbc43bd3-b38c-40d0-bf53-ad9967a11254
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set( HttpHeaders.COOKIE, "SESSION=" + this.getHttpSessionId() );
    WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders( httpHeaders );
    // NOTE(review): this logs the session cookie value at info level - confirm that is acceptable
    logger.info( "WebSocketHttpHeaders:{}", wsHeaders );
    this.stompClient = new WebSocketStompClient( sockJsClient );
    stompClient.setTaskScheduler( taskScheduler );
    ListenableFuture<StompSession> future = stompClient.connect( chatUrl, wsHeaders, new SimpleStompSessionHandler() );
    // Block until the connection is established
    StompSession session;
    try {
        session = future.get();
    } catch ( InterruptedException e ) {
        // Keep the interrupt visible to the caller's thread
        Thread.currentThread().interrupt();
        throw e;
    }
    this.stompSession = session;
    subscribe( session );
    afterConnected( session );
}
/**
 * Connects through an {@link UndertowXhrTransport}, i.e. two-way communication that
 * imitates a browser using xhr streaming. The transport needs an {@link OptionMap}.
 * https://fossies.org/linux/spring-framework/docs/javadoc-api/org/springframework/web/socket/sockjs/client/UndertowXhrTransport.html
 * <p>
 * Uses the class logger and the shared {@code MESSAGE_CODEC}, like
 * {@code standardWebsocket}.
 * NOTE(review): unlike {@code standardWebsocket}, this method sets no task scheduler
 * and does not call {@code afterConnected} - confirm whether that is intended.
 *
 * @throws Exception if the connection attempt fails
 * @see UndertowXhrTransport#UndertowXhrTransport(OptionMap)
 */
protected void xhrStreamWebsocket() throws Exception {
    // The point is to send the session cookie, format: Cookie: SESSION=bbc43bd3-b38c-40d0-bf53-ad9967a11254
    HttpHeaders httpHeaders = new HttpHeaders();
    httpHeaders.set( HttpHeaders.COOKIE, "SESSION=" + this.getHttpSessionId() );
    WebSocketHttpHeaders wsHeaders = new WebSocketHttpHeaders( httpHeaders );
    logger.info( "WebSocketHttpHeaders:{}", wsHeaders );
    OptionMap optionMap = OptionMap.builder()
        .set(Options.WORKER_IO_THREADS, 1)
        .set(Options.TCP_NODELAY, true)
        .set(Options.KEEP_ALIVE, true)
        .set(Options.WORKER_NAME, "SockJSClient")
        .getMap();
    Transport xhrTransport = new UndertowXhrTransport( optionMap );
    List<Transport> transports = Collections.singletonList( xhrTransport );
    // Local name differs from the sockJsClient field on purpose: no shadowing
    SockJsClient xhrSockJsClient = new SockJsClient( transports );
    // Reuse the shared codec; creating one per connection is what the class warns against
    xhrSockJsClient.setMessageCodec( MESSAGE_CODEC );
    this.stompClient = new WebSocketStompClient( xhrSockJsClient );
    ListenableFuture<StompSession> future = stompClient.connect( chatUrl, wsHeaders, new SimpleStompSessionHandler() );
    StompSession session = future.get();
    this.stompSession = session;
    subscribe( session );
}
/**
 * Subscribes the session to the chat message, chat response, system close and
 * system message destinations. Every received frame is decoded as UTF-8 text and
 * passed to {@link #subscribeCallback(StompSession, SubscribeType, String)};
 * system messages are additionally logged.
 *
 * @param session the STOMP session to subscribe on
 */
protected void subscribe( final StompSession session ) {
    subscribeTo( session, SubscribeType.CHAT_MESSAGE, false );
    subscribeTo( session, SubscribeType.CHAT_RESPONSE, false );
    subscribeTo( session, SubscribeType.SYSTEM_CLOSE, false );
    // System messages are also written to the log
    subscribeTo( session, SubscribeType.SYSTEM_MESSAGE, true );
}

/** Registers one byte[] frame handler for the destination of the given subscription type. */
private void subscribeTo( final StompSession session, final SubscribeType type, final boolean logPayload ) {
    session.subscribe( type.destination, new StompFrameHandler() {
        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            // Decode explicitly as UTF-8 (the charset used when sending) instead of the platform default
            String result = new String( ( byte[] ) payload, StandardCharsets.UTF_8 );
            if ( logPayload ) {
                // The "{}" placeholder is required, otherwise the payload argument is dropped
                logger.info( "Subscribe system message:{}", result );
            }
            subscribeCallback( session, type, result );
        }
        @Override
        public Type getPayloadType(StompHeaders headers) {
            return byte[].class;
        }
    });
}
/**
 * Resolves the HTTP session id: the configured value when it is not blank,
 * otherwise the value of the first SESSION cookie in the cookie store.
 *
 * @return the session id, or {@code null} when neither source provides one
 */
private String getHttpSessionId() {
    if ( StringUtils.isBlank( this.httpSessionId ) ) {
        // Nothing configured: fall back to the cookie store
        for ( Cookie candidate : cookieStore.getCookies() ) {
            if ( "SESSION".equals( candidate.getName() ) ) {
                return candidate.getValue();
            }
        }
        return null;
    }
    return this.httpSessionId;
}
/** @return the User-Agent header value used for the HTTP requests */
public String getUserAgent() {
    return this.userAgent;
}

/** @return the URL of the index request */
public String getIndexUrl() {
    return this.indexUrl;
}

/** @return the URL of the pull request */
public String getPullUrl() {
    return this.pullUrl;
}

/** @return the URL of the websocket endpoint */
public String getChatUrl() {
    return this.chatUrl;
}

/** @return the domain set on the SESSION cookie */
public String getDomain() {
    return this.domain;
}

/** @return the chat session id stored by clientPull(), or {@code null} if none has been stored */
public String getChatSessionId() {
    return this.chatSessionId;
}
/**
 * Returns a text form listing the configured URLs, domain and session ids.
 * NOTE(review): the output contains the HTTP session id - take care when logging it.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder( "ChatWebsocketClient [" );
    text.append( "userAgent=" ).append( userAgent );
    text.append( ", indexUrl=" ).append( indexUrl );
    text.append( ", pullUrl=" ).append( pullUrl );
    text.append( ", chatUrl=" ).append( chatUrl );
    text.append( ", domain=" ).append( domain );
    text.append( ", httpSessionId=" ).append( httpSessionId );
    text.append( ", chatSessionId=" ).append( chatSessionId );
    text.append( "]" );
    return text.toString();
}
}
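// NOTE: stray web-page notice captured with this file (not part of the source): "This content may contain material unsuitable for display, so the page does not show it. You can review and modify it with the relevant editing features."
// "If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, and nothing that violates national laws and regulations, you can click submit to appeal and we will handle it as soon as possible."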