NIO一个boss多个worker模型
# 一个boss多个worker模型
>s 一个boss选择器处理分发客户端请求,多个worker选择器执行客户端的注册和数据的读取
<img src="https://oscimg.oschina.net/oscnet/up-6c7b8f62a32069de93ddb90396f37a636f2.png" width=600 height=500>
## Service主线程
```java
public class BossAndWorkerSelector {
private ServerSocketChannel server = null;
private int port = 9090;
private Selector selector;
private Selector selector1;
private Selector selector2;
public void initServer() throws IOException {
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(port));
selector = Selector.open();
selector1 = Selector.open();
selector2 = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
}
public static void main(String[] args) throws InterruptedException, IOException {
BossAndWorkerSelector service = new BossAndWorkerSelector();
service.initServer();
NioThread nioThread1 = new NioThread(service.selector, 2);
NioThread nioThread2 = new NioThread(service.selector1);
NioThread nioThread3 = new NioThread(service.selector2);
nioThread1.start();
Thread.sleep(1000L);
nioThread2.start();
nioThread3.start();
System.err.println("service started .... ");
}
}
```
## NIO线程
>i boss/worker公用线程(可以优化 拆成BossThread和WorkerThread)
```java
/**
* @Author gaolei
* @Date 2021/4/3 5:41 下午
* @Version 1.0
*
* 参照https://www.bilibili.com/video/BV1gv411h7PS?p=5&spm_id_from=pageDriver
* 优化: NIO线程拆分 BossThread 线程 和 WorkerThread 分开
*/
public class NioThread extends Thread {
private Selector selector = null;
// worker选择器的数量
static int selectors = 0;
int id = 0;
boolean isBoss = false;
// 两个客户端channel队列
static BlockingDeque<SocketChannel>[] queue;
// 索引
static AtomicInteger idx = new AtomicInteger();
/**
* 初始化boss线程
*/
NioThread(Selector sel, int n) {
this.selector = sel;
// worker 的数量
selectors = n;
isBoss = true;
//初始化LinkedBlockingDeque
queue = new LinkedBlockingDeque[n];
for (int i = 0; i < n; i++) {
queue[i] = new LinkedBlockingDeque<>();
}
System.err.println("Boss init and started ... ");
}
/**
* 初始化worker线程
* @param sel
*/
NioThread(Selector sel) {
this.selector = sel;
// 设置worker线程的id
// worker线程的id 和 BlockingDeque<SocketChannel>[] queue 对应
id = idx.getAndIncrement() % selectors;
System.err.println("worker : " + id + "started ... ");
}
@Override
public void run() {
try {
while (true) {
// boss 选择器处理 连接建立部分
while (selector.select(10L) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// boss线程只会走isAcceptable
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
// worker线程只会走isReadable
readHandle(key);
}
}
}
// worker注册客户端 ,注册之后,下一次循环就会走 key.isReadable()去读取数据
if (!isBoss && !queue[id].isEmpty()) {
SocketChannel client = queue[id].take();
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.register(selector, SelectionKey.OP_READ, buffer);
System.err.println("client " + client.getRemoteAddress() + "分配到worker " + id);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
public void acceptHandler(SelectionKey key) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
int num = idx.getAndIncrement() % selectors;
queue[num].add(client);
}
public void readHandle(SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
// client 发生错误 或者断开 read == -1
// 导致空转 最终CPU达到100%
client.close();
break;
}
}
}
}
```
## 模型讲解
>i boss线程
- boss线程首先初始化,初始化的时候绑定selector,这个selector同时是绑定了ServerSocketChannel实例的
- boss初始化,将NioThiread中的static变量都初始化了
> static BlockingDeque<SocketChannel>[] queue;
static AtomicInteger idx = new AtomicInteger();
- boss只会执行acceptHandler(key);方法,将客户端均匀的存放到队列中
>w woeker线程
- worker线程在循环中不断判断自己的queue中是否又客户端,如果有的话,就注册read到当前线程拥有的选择器上
- 如果上一步有客户端,而且注册成功之后,下一步worker线程就会执行readHandle方法去读取客户端数据