NIO-Reactor模型梳理与demo实现

发布于:2025-02-24 ⋅ 阅读:(16) ⋅ 点赞:(0)

关于NIO,我们在上一篇 linux下网络编程socket&select&epoll的底层实现原理 就介绍了网络阻塞IO、以及基于事件驱动的非阻塞IO。对于NIO的API基本使用是java提供的接口,然后我们在业务上对NIO的使用,也是有不同的使用方法的。然后在我们的网络应用服务器的开发对NIO的使用,一般是基于Reactor模型,Reactor模型主要有3种:单Reactor单线程模型、单Reactor多线程模型、多Reactor多线程模型。下面我们就具体梳理下这3种模型,本篇主要是基于<>进行对应整理。

一、网络服务

​ 对于一般的网络服务、分布式应用,都有一些基本的结构流程。也就是说,我们可以将网络服务分为这5种具体的处理步骤。Reactor模型主要就是看怎么基于NIO的API,充分利用多CPU多线程来合理加快各个环节的处理效率。

1、读取请求数据

2、对请求数据进行解码

3、对数据进行处理

4、对回复数据进行编码

5、发送回复

二、阻塞IO经典的网络服务器设计

​ 然后在网络服务的处理中,最基本的类型就是这种,对于每个请求,新加一个线程进行处理。

在这里插入图片描述

​ 在这种模型中,主要是阻塞的,当一个连接没有断,就会一直占用到线程。这个我们上一篇也有提到这个

在这里插入图片描述

​ 在这种模型种,主要就是新接受到一个Socket就要新加一个线程进行处理,一直到这个socket释放,就即使这个socket没有进行操作也要一直阻塞占用这个线程

package org.example.reactor;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class ClassicServer{

    public static void main(String[] args) {
        ClassicServer classicServer = new ClassicServer();
        classicServer.start();
    }

    public static final int PORT  = 9999;

    public void start() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted()) {
                new Thread(new Handler(ss.accept())).start();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
 
    static class Handler implements Runnable {
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            while (true) {
                try {
                    InputStream inputStream = socket.getInputStream();
                    byte[] read = read(inputStream);
                    String decode = decode(read);
                    String compute = compute(decode);
                    byte[] encode = encode(compute);
                    send(socket, encode);
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }

        private byte[] read(InputStream inputStream) throws IOException {
            byte[] input = new byte[1024];
            int read = inputStream.read(input);
            if (read > 0){
                byte[] returnBytes = new byte[read];
                System.arraycopy(input,0,returnBytes,0,read);
                return returnBytes;
            }
            return new byte[0];
        }

        private String decode(byte[] bytes){
            return new String(bytes);
        }

        private String compute(String msg){
            System.out.println("print server msg ---- " + msg);
            return "Hello Client, I am Server";
        }

        private byte[] encode(String returnMsg){
            return returnMsg.getBytes(StandardCharsets.UTF_8);
        }

        private void send(Socket socket, byte[] returnBytes){
            try {
                socket.getOutputStream().write(returnBytes);
                socket.getOutputStream().flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

三、Reactor模型

​ 然后对应Reactor模型,其使用NIO,然后进行拆分的概念。例如可以将accept提出来,然后对于应用来说,主要耗时的是在与业务相关,也可以将compute拿出来放到线程池来处理。

​ 基于NIO,其是基于事件驱动,IO多路复用。这个我们上一篇也有提到这个:

在这里插入图片描述

​ 这种的话,因为是select轮询,不用一直accept、read阻塞,一个线程就能管理多个SocketChannel。

1、单线程单Reactor模型

1)、基本介绍

在这里插入图片描述

​ 可以看到,在单线程单Reactor模型中,将accept交给了Acceptor来处理,同时对于事件进行dispatch分发。

下面我们看下这种模型的具体实现:

2)、demo实现案例

对于Server端:

package org.example.reactor.single;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

class SingleThreadSingleReactor{
    private final Selector selector;
    private final ServerSocketChannel serverSocket;
    SingleThreadSingleReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws IOException {
        SingleThreadSingleReactor singleThreadSingleReactor = new SingleThreadSingleReactor(6666);
        singleThreadSingleReactor.dispatcher();
    }
    
    public void dispatcher() throws IOException {
        System.out.println("server start dispatcher-----------");
        Acceptor acceptor = new Acceptor();
        Handler handler = new Handler();
        while (true){
            int select = selector.select(200);
            if (select > 0){
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()){
                    SelectionKey selectionKey = keyIterator.next();
                    if (selectionKey.isAcceptable()){
                        System.out.println("client acceptor-----------");
                        acceptor.acceptor();
                    }else if (selectionKey.isReadable()){
                        System.out.println("server read msg ---------");
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        byte[] read = read(socketChannel);
                        if (read.length > 0){
                            byte[] handlerMsg = handler.handler(read);
                            if (handlerMsg != null){
                                send(socketChannel,handlerMsg);
                            }
                        }
                    }
                    keyIterator.remove();
                }
            }
        }
    }
    

    private byte[] read(SocketChannel socket) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = socket.read(byteBuffer);
        if (read > 0){
            byte[] returnBytes = new byte[read];
            System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
            return returnBytes;
        }
        return new byte[0];
    }

    private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
        socketChannel.write(ByteBuffer.wrap(bytes));
    }
    
    // Acceptor 连接处理类
    class Acceptor {
        public void acceptor() {
            try {
                SocketChannel c = serverSocket.accept();
                c.configureBlocking(false);
                c.register(selector,SelectionKey.OP_READ);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public class Handler {
        public byte[] handler(byte[] bytes) {
            //按业务含义去解析获取数据
            String decode = decode(bytes);
            //业务处理
            String compute = compute(decode);
            //业务处理后去进行返回编码
            byte[] encode = encode(compute);
            return encode;
        }

        private String decode(byte[] bytes){
            return new String(bytes);
        }

        private String compute(String msg){
            System.out.println("msg compute handler start------------");
            System.out.println("print client msg ---- " + msg);
            System.out.println("msg compute handler end------------");
            return "server compute msg return";
        }

        private byte[] encode(String returnMsg){
            return returnMsg.getBytes(StandardCharsets.UTF_8);
        }
    }
}

对于Client端:

package org.example.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class NioClientMain {

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            Thread.sleep(1000);
            new Thread(() -> {
                try {
                    multiThread();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }

    private static void multiThread() throws IOException, InterruptedException {
        Selector selector = Selector.open();
        SocketChannel clientSocketChannel = SocketChannel.open();
        clientSocketChannel.configureBlocking(false);
        clientSocketChannel.connect(new InetSocketAddress("127.0.0.1",6666));
        clientSocketChannel.register(selector, SelectionKey.OP_CONNECT);
        while (true){
            int select = selector.select(200);
            if (select > 0){
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()){
                    SelectionKey selectionKey = keyIterator.next();
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    if (selectionKey.isConnectable()){
                        socketChannel.finishConnect();
                        System.out.println("server is connect-------");
                        socketChannel.register(selector,SelectionKey.OP_WRITE);
                    }else if (selectionKey.isWritable()){
                        Thread.sleep(3000);
                        System.out.println("client send msg-------" + Thread.currentThread().getName() );
                        String sendMsg = Thread.currentThread().getName() + ": Hello Server";
                        ByteBuffer byteBuffer = ByteBuffer.wrap(sendMsg.getBytes(StandardCharsets.UTF_8));
                        socketChannel.write(byteBuffer);
                        socketChannel.register(selector,SelectionKey.OP_READ);
                    }else if (selectionKey.isReadable()){
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = socketChannel.read(byteBuffer);
                        if (read > 0){
                            byte[] bytes = new byte[read];
                            byteBuffer.flip();
                            byteBuffer.get(bytes);
                            System.out.println(Thread.currentThread().getName() + " current client ip " + socketChannel.getLocalAddress());
                            System.out.println(Thread.currentThread().getName() + " server msg: "+ new String(bytes));
                            socketChannel.register(selector,SelectionKey.OP_WRITE);
                        }
                    }
                    keyIterator.remove();
                }
            }
        }
    }

}

运行结果

server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-3
client send msg-------Thread-2
client send msg-------Thread-1
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:7623
Thread-0 server msg: server compute msg return
Thread-1 current client ip /127.0.0.1:7621
Thread-1 server msg: server compute msg return
Thread-3 current client ip /127.0.0.1:7622
Thread-3 server msg: server compute msg return
Thread-2 current client ip /127.0.0.1:7620
Thread-2 server msg: server compute msg return

2、多线程单Reactor模型

1)、基本介绍

​ 在上面的单线程单Reactor模型中,我们可以看到,其的业务是单线程的,前一个的处理会阻塞后面的处理。下面我们来看下多线程的版本。

​ 在这种模型中,其将业务处理(decode、compute、encode)拿出来交给workerThreads线程池来处理了。这样的话,前一个业务处理就不会影响到后一个的逻辑处理了。

在这里插入图片描述

2)、demo实现案例

server端:

package org.example.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MultiThreadSingleReactor {
    private final Selector selector;
    private final ServerSocketChannel serverSocket;
    private ExecutorService executorService;

    MultiThreadSingleReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        executorService = Executors.newFixedThreadPool(4);
    }

    public static void main(String[] args) throws IOException {
        MultiThreadSingleReactor singleThreadSingleReactor = new MultiThreadSingleReactor(6666);
        singleThreadSingleReactor.dispatcher();
    }
    
    public void dispatcher() throws IOException {
        System.out.println("server start dispatcher-----------");
        Acceptor acceptor = new Acceptor();
        Handler handler = new Handler();
        while (true){
            int select = selector.select(200);
            if (select > 0){
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()){
                    SelectionKey selectionKey = keyIterator.next();
                    if (selectionKey.isAcceptable()){
                        System.out.println("client acceptor-----------");
                        acceptor.acceptor();
                    }else if (selectionKey.isReadable()){
                        System.out.println("server read msg ---------");
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        //dispatcher 读数据
                        byte[] read = read(socketChannel);
                        if (read.length > 0){
                            //线程池进行 decode、computer、encode处理
                            executorService.submit(() -> {
                                try {
                                    handler.handler(socketChannel,read);
                                } catch (ClosedChannelException e) {
                                    throw new RuntimeException(e);
                                }
                            });
                        }
                    }else if (selectionKey.isWritable()){
                        byte[] bytes = (byte[]) selectionKey.attachment();
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        if (Objects.nonNull(bytes)){
                            //dispatcher 写数据
                            send(socketChannel,bytes);
                        }
                    }
                    keyIterator.remove();
                }
            }
        }
    }
    

    private byte[] read(SocketChannel socket) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = socket.read(byteBuffer);
        if (read > 0){
            byte[] returnBytes = new byte[read];
            System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
            return returnBytes;
        }
        return new byte[0];
    }

    private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
        socketChannel.write(ByteBuffer.wrap(bytes));
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
    
    // Acceptor 连接处理类
    class Acceptor {
        public void acceptor() {
            try {
                SocketChannel c = serverSocket.accept();
                c.configureBlocking(false);
                c.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public class Handler {
        public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {
            //按业务含义去解析获取数据
            String decode = decode(bytes);
            //业务处理
            String compute = compute(decode);
            //业务处理后去进行返回编码
            byte[] encode = encode(compute);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);
            //需要发送的数据
            selectionKey.attach(encode);
        }

        private String decode(byte[] bytes){
            return new String(bytes);
        }

        private String compute(String msg){
            System.out.println("msg compute handler start------------");
            System.out.println("print client msg ---- " + msg);
            System.out.println("msg compute handler end------------");
            return "server compute msg return";
        }

        private byte[] encode(String returnMsg){
            return returnMsg.getBytes(StandardCharsets.UTF_8);
        }
    }
}

​ Client的话,用上一个Client就可以了。

运行结果

server start dispatcher-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
client acceptor-----------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
server read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------
server read msg ---------


server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:9953
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:9956
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:9959
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:9962
Thread-3 server msg: server compute msg return

3、多线程多Reactor模型

1)、基本介绍

​ 在上面的多线程单Reactor模型中虽然进行业务处理部分用多线程解决了阻塞问题。但在通过Selector类型事件的时候,还是用的一个Selector,就一个Selector管理全部SocketChannel这个在连接多的时候,这部分就会有瓶颈,同时的话,读、写也是在一个Reactor中。

​ 所以我们可以再优化,就进行多Reactor,一个主Reactor只进行accept,accept后,就再将这个SocketChannel交给子Reactor,同时子Reactor可以设置多个,这就将事件轮询、数据读取、业务处理、数据发送分散到多个Reactor了,更好的应用多线程充分利用CPU了。其具体的模型就是这样了:

在这里插入图片描述

2)、demo实现案例

服务端:

package org.example.reactor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class SingleThreadMainReactor {
    private final Selector selector;
    private final ServerSocketChannel serverSocket;

    private final MultiThreadSubReactor[] multiThreadSubReactors;

    public static final int mulSubNum = 4;

    private int nextSubIndex = 0;

    SingleThreadMainReactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.configureBlocking(false);
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        multiThreadSubReactors = new MultiThreadSubReactor[mulSubNum];
        for (int i = 0; i < mulSubNum; i++) {
            multiThreadSubReactors[i] = new MultiThreadSubReactor();
            new Thread(multiThreadSubReactors[i]).start();
        }
    }

    public static void main(String[] args) throws IOException {
        SingleThreadMainReactor singleThreadSingleReactor = new SingleThreadMainReactor(6666);
        singleThreadSingleReactor.accept();
    }
    
    public void accept() throws IOException {
        System.out.println("server start accept-----------");
        Acceptor acceptor = new Acceptor();
        while (true){
            int select = selector.select(200);
            if (select > 0){
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                while (keyIterator.hasNext()){
                    SelectionKey selectionKey = keyIterator.next();
                    if (selectionKey.isAcceptable()){
                        System.out.println("client acceptor-----------");
                        acceptor.acceptor();
                    }
                    keyIterator.remove();
                }
            }
        }
    }


    // Acceptor 连接处理类
    class Acceptor {
        public void acceptor() {
            try {
                SocketChannel c = serverSocket.accept();
                c.configureBlocking(false);
                nextSubIndex = nextSubIndex%mulSubNum;
                System.out.println("server subReactor register--" + nextSubIndex);
                multiThreadSubReactors[nextSubIndex].register(c);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

}
package org.example.reactor;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MultiThreadSubReactor implements Runnable{
    private final Selector selector;
    private ExecutorService executorService;

    MultiThreadSubReactor() throws IOException {
        selector = Selector.open();
        executorService = Executors.newFixedThreadPool(4);
    }

    @Override
    public void run() {
        System.out.println("MultiThreadSubReactor start-----------");
        Handler handler = new Handler();
        while (true){
            try {
                int select = selector.select(200);
                if (select > 0){
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                    while (keyIterator.hasNext()){
                        SelectionKey selectionKey = keyIterator.next();
                        if (selectionKey.isReadable()){
                            System.out.println("subReactor read msg ---------");
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            //dispatcher 读数据
                            byte[] read = read(socketChannel);
                            if (read.length > 0){
                                //线程池进行 decode、computer、encode处理
                                executorService.submit(() -> {
                                    try {
                                        handler.handler(socketChannel,read);
                                    } catch (ClosedChannelException e) {
                                        throw new RuntimeException(e);
                                    }
                                });
                            }
                        }else if (selectionKey.isWritable()){
                            byte[] bytes = (byte[]) selectionKey.attachment();
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            if (Objects.nonNull(bytes)){
                                //dispatcher 写数据
                                send(socketChannel,bytes);
                            }
                        }
                        keyIterator.remove();
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }


    private byte[] read(SocketChannel socket) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int read = socket.read(byteBuffer);
        if (read > 0){
            byte[] returnBytes = new byte[read];
            System.arraycopy(byteBuffer.array(),0,returnBytes,0,read);
            return returnBytes;
        }
        return new byte[0];
    }

    private void send(SocketChannel socketChannel,byte[] bytes) throws IOException {
        socketChannel.write(ByteBuffer.wrap(bytes));
        socketChannel.register(selector,SelectionKey.OP_READ);
    }

    public void register(SocketChannel socketChannel) throws ClosedChannelException {
        socketChannel.register(selector,SelectionKey.OP_READ);
    }


    public class Handler {
        public void handler(SocketChannel socketChannel, byte[] bytes) throws ClosedChannelException {
            //按业务含义去解析获取数据
            String decode = decode(bytes);
            //业务处理
            String compute = compute(decode);
            //业务处理后去进行返回编码
            byte[] encode = encode(compute);
            SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);
            //需要发送的数据
            selectionKey.attach(encode);
        }

        private String decode(byte[] bytes){
            return new String(bytes);
        }

        private String compute(String msg){
            System.out.println("msg compute handler start------------");
            System.out.println("print client msg ---- " + msg);
            System.out.println("msg compute handler end------------");
            return "server compute msg return";
        }

        private byte[] encode(String returnMsg){
            return returnMsg.getBytes(StandardCharsets.UTF_8);
        }
    }
}

Client同样使用原来的

运行结果:

MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
MultiThreadSubReactor start-----------
server start accept-----------
MultiThreadSubReactor start-----------
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
client acceptor-----------
server subReactor register--0
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-0: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-1: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-2: Hello Server
msg compute handler end------------
subReactor read msg ---------
msg compute handler start------------
print client msg ---- Thread-3: Hello Server
msg compute handler end------------


server is connect-------
server is connect-------
server is connect-------
server is connect-------
client send msg-------Thread-0
Thread-0 current client ip /127.0.0.1:1989
Thread-0 server msg: server compute msg return
client send msg-------Thread-1
Thread-1 current client ip /127.0.0.1:1992
Thread-1 server msg: server compute msg return
client send msg-------Thread-2
Thread-2 current client ip /127.0.0.1:1995
Thread-2 server msg: server compute msg return
client send msg-------Thread-3
Thread-3 current client ip /127.0.0.1:1998
Thread-3 server msg: server compute msg return