1 Star 2 Fork 1

peipeihh/blog

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
MultipleThreadSelectorServer.java 5.48 KB
一键复制 编辑 原始数据 按行查看 历史
huangyinhuang 提交于 2018-02-06 13:59 +08:00 . 添加java nio的演示项目
package nio;
import util.ChannelUtil;
import util.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Created by huangyinhuang on 2/1/2018.
 *
 * Demo server that accepts clients through a single {@link Selector} on the main thread
 * and then hands every connection to a worker from a fixed-size thread pool.
 *
 * <p>The main selector is only responsible for accepting connections and detecting the
 * first readable event of each client. At that point the client's key is cancelled and a
 * {@link HandlerThread} takes over the channel for the rest of the connection's lifetime,
 * so one pool thread is occupied per active client.
 */
public class MultipleThreadSelectorServer {

    /** Pool running one {@link HandlerThread} per dispatched client; created in {@link #main}. */
    private static ExecutorService threadPool;

    /**
     * Starts the server on port 9000 and runs the accept/dispatch loop until the main
     * thread is interrupted or an I/O error propagates.
     *
     * @param args unused
     * @throws Exception if the server socket or selector cannot be set up, or an I/O error
     *                   occurs while accepting or registering a client
     */
    public static void main(String[] args) throws Exception {
        int cpu = Runtime.getRuntime().availableProcessors();
        threadPool = Executors.newFixedThreadPool(cpu);

        Logger.info("start the server.");
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            // please note: the channel must be non-blocking before it can be registered
            server.configureBlocking(false);
            ServerSocket serverSocket = server.socket();
            serverSocket.bind(new InetSocketAddress(9000));

            // register the channel
            server.register(selector, SelectionKey.OP_ACCEPT);

            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                Logger.info("Wait for connection, count = " + count++);

                // please note: selector.select() blocks the current thread until a channel
                // is ready, so no additional sleep is needed between iterations
                if (selector.select() == 0) {
                    continue;
                }

                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // the selector never removes keys from the selected set by itself
                    keyIterator.remove();

                    Logger.info("Receive selection key from channel: " + key.channel());
                    if (!key.isValid()) {
                        // the ready-op tests below throw CancelledKeyException on a cancelled key
                        continue;
                    }

                    if (key.isAcceptable()) {
                        // a connection was accepted by a ServerSocketChannel.
                        Logger.info("a connection was accepted by a ServerSocketChannel.");
                        ServerSocketChannel srvChannel = (ServerSocketChannel) key.channel();
                        Logger.printServerInfo(srvChannel);

                        // in non-blocking mode accept() returns null when no connection is pending
                        SocketChannel socket = srvChannel.accept();
                        if (socket != null) {
                            socket.configureBlocking(false);
                            socket.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isConnectable()) {
                        // a connection was established with a remote server.
                        Logger.info("a connection was established with a remote server.");
                    } else if (key.isReadable()) {
                        // a channel is ready for reading
                        Logger.info("a channel is ready for reading.");
                        SocketChannel channel = (SocketChannel) key.channel();
                        Logger.printClientInfo(channel);

                        // Hand the connection over to a worker. Cancelling the key stops this
                        // selector from reporting the channel again, so each client is
                        // dispatched exactly once and no bookkeeping set is required.
                        key.cancel();
                        threadPool.submit(new HandlerThread(channel));
                    } else if (key.isWritable()) {
                        // a channel is ready for writing
                        Logger.info("a channel is ready for writing.");
                    }
                }
            }
        } finally {
            // interrupt the workers so the JVM can exit once the accept loop has ended
            threadPool.shutdownNow();
        }
    }

    /**
     * Worker that serves a single client: it prints every chunk received from the client,
     * answers each chunk with a fixed greeting, and closes the channel when the client
     * disconnects or an I/O error occurs.
     */
    public static class HandlerThread implements Runnable {

        private SocketChannel channel;

        /**
         * @param channel the client connection this worker owns until it is closed
         */
        public HandlerThread(SocketChannel channel) {
            this.channel = channel;
        }

        /**
         * Reads from the client until end-of-stream or a read error, then closes the channel.
         *
         * <p>When the channel is non-blocking, the worker waits on a private selector while no
         * data is available instead of polling {@code read} in a tight loop.
         */
        @Override
        public void run() {
            Logger.info("try to read the message from client.");
            ByteBuffer buffer = ByteBuffer.allocate(64);
            Selector readable = null;
            try {
                if (!channel.isBlocking()) {
                    // a non-blocking read returns 0 when idle; this selector lets us sleep instead
                    readable = Selector.open();
                    channel.register(readable, SelectionKey.OP_READ);
                }

                while (!IsReadFinished(buffer)) {
                    if (buffer.position() == 0) {
                        // nothing was read: park until the channel reports more data
                        if (readable != null) {
                            readable.select();
                            readable.selectedKeys().clear();
                        }
                        continue;
                    }

                    buffer.flip();
                    byte[] msgBytes = new byte[buffer.remaining()];
                    buffer.get(msgBytes);
                    Logger.printNewMessage(new String(msgBytes).trim());

                    // send a reply message
                    sendReplyMsg();
                    buffer.clear();
                }
                Logger.info("Client has closed the connection, shutdown the channel.");
            } catch (IOException e) {
                Logger.info("An exception is thrown when waiting for client data. Details: " + e.getMessage());
            } finally {
                ChannelUtil.close(channel);
                if (readable != null) {
                    try {
                        readable.close();
                    } catch (IOException ignored) {
                        // best effort: the connection is already being torn down
                    }
                }
            }
        }

        /**
         * Sends the fixed greeting to the client. A failure is logged and otherwise ignored;
         * a broken connection is detected by the next read.
         *
         * <p>Only a single write is attempted; on a non-blocking channel this may transmit
         * just part of the message if the socket's send buffer is full.
         */
        void sendReplyMsg() {
            try {
                String msg = "Hello client, this is server implemented by multiple-thread selector.\r\n";
                channel.write(ByteBuffer.wrap(msg.getBytes()));
            } catch (IOException e) {
                Logger.info("An exception is thrown when trying to reply client. Details: " + e.getMessage());
            }
        }

        /**
         * Performs one read from the client into {@code output}.
         *
         * @param output buffer that receives the bytes read, if any
         * @return {@code true} when the client reached end-of-stream or the read failed,
         *         {@code false} when the connection is still usable
         */
        Boolean IsReadFinished(ByteBuffer output) {
            Boolean finished = false;
            try {
                finished = (channel.read(output) == -1);
            } catch (IOException e) {
                Logger.info("An exception is thrown when trying to read bytes from client. Details: " + e.getMessage());
                finished = true;
            }
            return finished;
        }
    }
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pphh/blog.git
git@gitee.com:pphh/blog.git
pphh
blog
blog
master

搜索帮助