// [code-hosting page notice, not part of the program] Code pull complete; the page will refresh automatically.
package nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import util.ChannelUtil;
import util.Logger;
/**
 * Created by huangyinhuang on 2/1/2018.
 *
 * <p>Selector-driven server that hands every client over to a worker of a fixed-size thread pool.
 * The main thread uses one {@link Selector} to accept connections and to detect the first
 * readable event of each client; at that point the client is handed to a {@link HandlerThread},
 * which stays with that client until the connection ends.
 *
 * <p>The pool has one thread per available processor and a worker is occupied for the whole
 * lifetime of its client, so additional clients wait in the pool's queue until a worker is free.
 */
public class MultipleThreadSelectorServer {
    /** TCP port the server listens on. */
    private static final int PORT = 9000;

    /** Workers that serve clients; one thread per available processor. */
    private static ExecutorService threadPool;

    /**
     * Starts the server and runs the accept/dispatch loop forever.
     *
     * @param args ignored
     * @throws Exception if the server socket or selector cannot be set up, if selecting fails,
     *                   or if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        int cpu = Runtime.getRuntime().availableProcessors();
        threadPool = Executors.newFixedThreadPool(cpu);

        // please note: configure channel blocking state as false
        Logger.info("start the server.");
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        ServerSocket serverSocket = server.socket();
        serverSocket.bind(new InetSocketAddress(PORT));

        // register the channel
        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);

        // check new connection's events at intervals
        int count = 0;
        while (true) {
            Logger.info("Wait for connection, count = " + count++);
            // Throttles the loop (and its logging) to one pass per second.
            Thread.sleep(1000);

            // please note: selector.select() will block current thread
            if (selector.select() == 0) {
                continue;
            }

            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                // Remove first so that every path below leaves the selected-key set clean.
                keyIterator.remove();
                if (!key.isValid()) {
                    // The ready-state queries below throw CancelledKeyException on a dead key.
                    continue;
                }

                Logger.info("Receive selection key from channel: " + key.channel());
                if (key.isAcceptable()) {
                    acceptClient(key, selector);
                } else if (key.isConnectable()) {
                    // a connection was established with a remote server.
                    Logger.info("a connection was established with a remote server.");
                } else if (key.isReadable()) {
                    handOffToWorker(key);
                } else if (key.isWritable()) {
                    // a channel is ready for writing
                    Logger.info("a channel is ready for writing.");
                }
            }
        }
    }

    /**
     * Accepts the pending connection of a server channel and registers the new client with the
     * main selector for read events. A failure affects only that client; the server keeps running.
     *
     * @param key      acceptable key whose channel is the {@link ServerSocketChannel}
     * @param selector main selector that should watch the new client
     */
    private static void acceptClient(SelectionKey key, Selector selector) {
        // a connection was accepted by a ServerSocketChannel.
        Logger.info("a connection was accepted by a ServerSocketChannel.");
        ServerSocketChannel srvChannel = (ServerSocketChannel) key.channel();
        Logger.printServerInfo(srvChannel);

        SocketChannel socket = null;
        try {
            // connected with client
            socket = srvChannel.accept();
            if (socket == null) {
                // A non-blocking accept() returns null when no connection is pending any more.
                return;
            }
            socket.configureBlocking(false);
            socket.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            Logger.info("An exception is thrown when trying to accept client. Details: " + e.getMessage());
            if (socket != null) {
                ChannelUtil.close(socket);
            }
        }
    }

    /**
     * Hands a client that has become readable over to a pool worker.
     *
     * <p>The key is cancelled before the worker is submitted: from then on only the worker
     * touches the channel, the main selector never reports it again (so no bookkeeping of
     * already dispatched channels is needed), and the worker closing the channel later cannot
     * invalidate a key the main loop is still inspecting.
     *
     * @param key readable key whose channel is the client's {@link SocketChannel}
     */
    private static void handOffToWorker(SelectionKey key) {
        // a channel is ready for reading
        Logger.info("a channel is ready for reading.");
        SocketChannel channel = (SocketChannel) key.channel();
        Logger.printClientInfo(channel);

        // handle upcoming new channel by thread
        key.cancel();
        threadPool.submit(new HandlerThread(channel));
    }

    /**
     * Serves one client: logs every chunk received, answers each chunk with a fixed greeting and
     * closes the channel once the client disconnects or an I/O error occurs.
     *
     * <p>Works with blocking and non-blocking channels. For a non-blocking channel the worker
     * waits on a private selector whenever the channel has nothing to read (or cannot take more
     * output) instead of polling it in a tight loop.
     */
    public static class HandlerThread implements Runnable {
        private final SocketChannel channel;

        /**
         * Key of {@link #channel} on this worker's private selector; {@code null} outside
         * {@link #run()} and for blocking channels.
         */
        private SelectionKey waitKey;

        /**
         * @param channel connected client channel this handler takes ownership of
         */
        public HandlerThread(SocketChannel channel) {
            this.channel = channel;
        }

        /** Reads from the client until end-of-stream, an I/O error or interruption, then closes the channel. */
        @Override
        public void run() {
            Logger.info("try to read the message from client.");
            ByteBuffer buffer = ByteBuffer.allocate(64);

            // A blocking channel needs no selector (and cannot be registered with one);
            // try-with-resources skips a null resource.
            try (Selector waitSelector = channel.isBlocking() ? null : Selector.open()) {
                if (waitSelector != null) {
                    waitKey = channel.register(waitSelector, SelectionKey.OP_READ);
                }

                while (!Thread.currentThread().isInterrupted() && !IsReadFinished(buffer)) {
                    buffer.flip();
                    if (buffer.hasRemaining()) {
                        byte[] msgBytes = new byte[buffer.remaining()];
                        buffer.get(msgBytes);
                        Logger.printNewMessage(new String(msgBytes, StandardCharsets.UTF_8).trim());
                        // send a reply message
                        sendReplyMsg();
                    } else {
                        // Non-blocking read returned 0 bytes: sleep until data arrives.
                        awaitReady(SelectionKey.OP_READ);
                    }
                    buffer.clear();
                }
                Logger.info("Client has closed the connection, shutdown the channel.");
            } catch (IOException e) {
                Logger.info("An exception is thrown when trying to wait for client data. Details: " + e.getMessage());
            } finally {
                waitKey = null;
                ChannelUtil.close(channel);
            }
        }

        /**
         * Sends the greeting to the client, repeating the write until the whole message has been
         * handed to the socket. I/O errors are logged, not propagated.
         */
        void sendReplyMsg() {
            try {
                String msg = "Hello client, this is server implemented by multiple-thread selector.\r\n";
                ByteBuffer reply = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                while (reply.hasRemaining() && !Thread.currentThread().isInterrupted()) {
                    if (channel.write(reply) == 0) {
                        // Socket send buffer is full: wait until it drains.
                        awaitReady(SelectionKey.OP_WRITE);
                    }
                }
            } catch (IOException e) {
                Logger.info("An exception is thrown when trying to reply client. Details: " + e.getMessage());
            }
        }

        /**
         * Performs one read into {@code output}.
         *
         * @param output buffer that receives the bytes; must have space remaining
         * @return {@code true} when the client reached end-of-stream or the read failed,
         *         {@code false} otherwise (including a read of zero bytes)
         */
        Boolean IsReadFinished(ByteBuffer output) {
            try {
                return channel.read(output) == -1;
            } catch (IOException e) {
                Logger.info("An exception is thrown when trying to read bytes from client. Details: " + e.getMessage());
                return true;
            }
        }

        /**
         * Blocks until the channel is ready for {@code ops}. Returns immediately for a blocking
         * channel, whose reads and writes already block by themselves.
         *
         * @param ops {@link SelectionKey} operation bits to wait for
         * @throws IOException if selecting fails
         */
        private void awaitReady(int ops) throws IOException {
            if (channel.isBlocking()) {
                return;
            }
            if (waitKey == null || !waitKey.isValid()) {
                // No usable selector (called outside run(), or channel already closed):
                // give up the CPU briefly; the caller's next read/write reports a closed channel.
                Thread.yield();
                return;
            }
            waitKey.interestOps(ops);
            Selector waitSelector = waitKey.selector();
            waitSelector.select();
            waitSelector.selectedKeys().clear();
        }
    }
}
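// [code-hosting page notice, not part of the program] This section may contain content unsuitable for display and is not shown. You can review and modify it using the editing features.
// [code-hosting page notice, not part of the program] If you confirm the content involves no inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or anything violating applicable laws and regulations, you can click submit to appeal and it will be processed as soon as possible.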