1 Star 3 Fork 1

jannal / rabbitmq

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SocketChannelFrameHandlerState.java 7.46 KB
一键复制 编辑 原始数据 按行查看 历史
jannal 提交于 2018-10-20 19:54 . 删除无用的md
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.
package com.rabbitmq.client.impl.nio;
import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.Frame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLEngine;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
*
*/
public class SocketChannelFrameHandlerState {
private static final Logger LOGGER = LoggerFactory.getLogger(SocketChannelFrameHandlerState.class);
/** Time to linger before closing the socket forcefully. */
private static final int SOCKET_CLOSING_TIMEOUT = 1;
private final SocketChannel channel;
private final BlockingQueue<WriteRequest> writeQueue;
private volatile AMQConnection connection;
/** should be used only in the NIO read thread */
private long lastActivity;
private final SelectorHolder writeSelectorState;
private final SelectorHolder readSelectorState;
private final int writeEnqueuingTimeoutInMs;
final boolean ssl;
final SSLEngine sslEngine;
/** outbound app data (to be crypted if TLS is on) */
final ByteBuffer plainOut;
/** inbound app data (deciphered if TLS is on) */
final ByteBuffer plainIn;
/** outbound net data (ciphered if TLS is on) */
final ByteBuffer cipherOut;
/** inbound data (ciphered if TLS is on) */
final ByteBuffer cipherIn;
final DataOutputStream outputStream;
final DataInputStream inputStream;
public SocketChannelFrameHandlerState(SocketChannel channel, NioLoopContext nioLoopsState, NioParams nioParams, SSLEngine sslEngine) {
this.channel = channel;
//读Selector
this.readSelectorState = nioLoopsState.readSelectorState;
//写Selector
this.writeSelectorState = nioLoopsState.writeSelectorState;
//写操作存入阻塞队列,数组形式保存数据
this.writeQueue = new ArrayBlockingQueue<WriteRequest>(nioParams.getWriteQueueCapacity(), true);
this.writeEnqueuingTimeoutInMs = nioParams.getWriteEnqueuingTimeoutInMs();
this.sslEngine = sslEngine;
if(this.sslEngine == null) {
this.ssl = false;
this.plainOut = nioLoopsState.writeBuffer;
this.cipherOut = null;
this.plainIn = nioLoopsState.readBuffer;
this.cipherIn = null;
this.outputStream = new DataOutputStream(
new ByteBufferOutputStream(channel, plainOut)
);
this.inputStream = new DataInputStream(
new ByteBufferInputStream(channel, plainIn)
);
} else {
this.ssl = true;
this.plainOut = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
this.cipherOut = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
this.plainIn = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
this.cipherIn = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
this.outputStream = new DataOutputStream(
new SslEngineByteBufferOutputStream(sslEngine, plainOut, cipherOut, channel)
);
this.inputStream = new DataInputStream(
new SslEngineByteBufferInputStream(sslEngine, plainIn, cipherIn, channel)
);
}
}
public SocketChannel getChannel() {
return channel;
}
public Queue<WriteRequest> getWriteQueue() {
return writeQueue;
}
public void sendHeader() throws IOException {
sendWriteRequest(new HeaderWriteRequest());
}
public void write(Frame frame) throws IOException {
sendWriteRequest(new FrameWriteRequest(frame));
}
private void sendWriteRequest(WriteRequest writeRequest) throws IOException {
try {
//将写操作入队列,offer当队列满了,返回false不会阻塞
boolean offered = this.writeQueue.offer(writeRequest, writeEnqueuingTimeoutInMs, TimeUnit.MILLISECONDS);
if(offered) {
this.writeSelectorState.registerFrameHandlerState(this, SelectionKey.OP_WRITE);
//注册了写事件,让写事件优先级变高,此时通过wakeup()唤醒
this.readSelectorState.selector.wakeup();
} else {
throw new IOException("Frame enqueuing failed");
}
} catch (InterruptedException e) {
LOGGER.warn("Thread interrupted during enqueuing frame in write queue");
}
}
public void startReading() {
this.readSelectorState.registerFrameHandlerState(this, SelectionKey.OP_READ);
}
public AMQConnection getConnection() {
return connection;
}
public void setConnection(AMQConnection connection) {
this.connection = connection;
}
public void setLastActivity(long lastActivity) {
this.lastActivity = lastActivity;
}
public long getLastActivity() {
return lastActivity;
}
void prepareForWriteSequence() {
if(ssl) {
plainOut.clear();
cipherOut.clear();
}
}
void endWriteSequence() {
if(!ssl) {
plainOut.clear();
}
}
void prepareForReadSequence() throws IOException {
if(ssl) {
cipherIn.clear();
plainIn.clear();
cipherIn.flip();
plainIn.flip();
} else {
NioHelper.read(channel, plainIn);
plainIn.flip();
}
}
boolean continueReading() throws IOException {
if(ssl) {
if (!plainIn.hasRemaining() && !cipherIn.hasRemaining()) {
// need to try to read something
cipherIn.clear();
int bytesRead = NioHelper.read(channel, cipherIn);
if (bytesRead <= 0) {
return false;
} else {
cipherIn.flip();
return true;
}
} else {
return true;
}
} else {
if (!plainIn.hasRemaining()) {
plainIn.clear();
NioHelper.read(channel, plainIn);
plainIn.flip();
}
return plainIn.hasRemaining();
}
}
void close() throws IOException {
if(ssl) {
SslEngineHelper.close(channel, sslEngine);
}
if(channel.isOpen()) {
channel.socket().setSoLinger(true, SOCKET_CLOSING_TIMEOUT);
channel.close();
}
}
}
Java
1
https://gitee.com/jannal/rabbitmq.git
git@gitee.com:jannal/rabbitmq.git
jannal
rabbitmq
rabbitmq
master

搜索帮助

53164aa7 5694891 3bd8fe86 5694891