NIO一个boss多个worker模型

# 一个boss多个worker模型 >s 一个boss选择器处理分发客户端请求,多个worker选择器执行客户端的注册和数据的读取 <img src="https://oscimg.oschina.net/oscnet/up-6c7b8f62a32069de93ddb90396f37a636f2.png" width=600 height=500> ## Service主线程 ```java public class BossAndWorkerSelector { private ServerSocketChannel server = null; private int port = 9090; private Selector selector; private Selector selector1; private Selector selector2; public void initServer() throws IOException { server = ServerSocketChannel.open(); server.bind(new InetSocketAddress(port)); selector = Selector.open(); selector1 = Selector.open(); selector2 = Selector.open(); server.register(selector, SelectionKey.OP_ACCEPT); } public static void main(String[] args) throws InterruptedException, IOException { BossAndWorkerSelector service = new BossAndWorkerSelector(); service.initServer(); NioThread nioThread1 = new NioThread(service.selector, 2); NioThread nioThread2 = new NioThread(service.selector1); NioThread nioThread3 = new NioThread(service.selector2); nioThread1.start(); Thread.sleep(1000L); nioThread2.start(); nioThread3.start(); System.err.println("service started .... "); } } ``` ## NIO线程 >i boss/worker公用线程(可以优化 拆成BossThread和WorkerThread) ```java /** * @Author gaolei * @Date 2021/4/3 5:41 下午 * @Version 1.0 * * 参照https://www.bilibili.com/video/BV1gv411h7PS?p=5&spm_id_from=pageDriver * 优化: NIO线程拆分 BossThread 线程 和 WorkerThread 分开 */ public class NioThread extends Thread { private Selector selector = null; // worker选择器的数量 static int selectors = 0; int id = 0; boolean isBoss = false; // 两个客户端channel队列 static BlockingDeque<SocketChannel>[] queue; // 索引 static AtomicInteger idx = new AtomicInteger(); /** * 初始化boss线程 */ NioThread(Selector sel, int n) { this.selector = sel; // worker 的数量 selectors = n; isBoss = true; //初始化LinkedBlockingDeque queue = new LinkedBlockingDeque[n]; for (int i = 0; i < n; i++) { queue[i] = new LinkedBlockingDeque<>(); } System.err.println("Boss init and started ... "); } /** * 初始化worker线程 * @param sel */ NioThread(Selector sel) { this.selector = sel; // 设置worker线程的id // worker线程的id 和 BlockingDeque<SocketChannel>[] queue 对应 id = idx.getAndIncrement() % selectors; System.err.println("worker : " + id + "started ... "); } @Override public void run() { try { while (true) { // boss 选择器处理 连接建立部分 while (selector.select(10L) > 0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // boss线程只会走isAcceptable if (key.isAcceptable()) { acceptHandler(key); } else if (key.isReadable()) { // worker线程只会走isReadable readHandle(key); } } } // worker注册客户端 ,注册之后,下一次循环就会走 key.isReadable()去读取数据 if (!isBoss && !queue[id].isEmpty()) { SocketChannel client = queue[id].take(); ByteBuffer buffer = ByteBuffer.allocate(1024); client.register(selector, SelectionKey.OP_READ, buffer); System.err.println("client " + client.getRemoteAddress() + "分配到worker " + id); } } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public void acceptHandler(SelectionKey key) throws IOException { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); int num = idx.getAndIncrement() % selectors; queue[num].add(client); } public void readHandle(SelectionKey key) throws IOException { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); buffer.clear(); int read = 0; while (true) { read = client.read(buffer); if (read > 0) { buffer.flip(); while (buffer.hasRemaining()) { client.write(buffer); } buffer.clear(); } else if (read == 0) { break; } else { // client 发生错误 或者断开 read == -1 // 导致空转 最终CPU达到100% client.close(); break; } } } } ``` ## 模型讲解 >i boss线程 - boss线程首先初始化,初始化的时候绑定selector,这个selector同时是绑定了ServerSocketChannel实例的 - boss初始化,将NioThiread中的static变量都初始化了 > static BlockingDeque<SocketChannel>[] queue; static AtomicInteger idx = new AtomicInteger(); - boss只会执行acceptHandler(key);方法,将客户端均匀的存放到队列中 >w woeker线程 - worker线程在循环中不断判断自己的queue中是否又客户端,如果有的话,就注册read到当前线程拥有的选择器上 - 如果上一步有客户端,而且注册成功之后,下一步worker线程就会执行readHandle方法去读取客户端数据