JAVA基础-使用BIO / NIO实现聊天室功能

发布于:2025-08-10 ⋅ 阅读:(19) ⋅ 点赞:(0)

Scoket&ServerSocket

客户端数据处理器

package org.example.code_case.javaIo.聊天室实现.Socket实现;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Set;

public class ClientHandler implements Runnable {
    private Socket clientSocket;
    private Set<ClientHandler> clients;
    private PrintWriter out;
    private BufferedReader in;
    private String clientName;

    public ClientHandler(Socket accept, Set<ClientHandler> clients) {
        this.clientSocket = accept;
        this.clients = clients;
    }

    @Override
    public void run() {
        try {
            out = new PrintWriter(clientSocket.getOutputStream(),true);
            in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));

            System.out.println("客户端连接成功");
            clientName = in.readLine();
            System.out.println(clientName + "上线了");

            //广播新用户加入
            broadcastMessage("[系统]" + clientName + "加入聊天室");

            //读取客户端消息
            String msg;
            while ((msg = in.readLine()) != null) {
                System.out.println("收到的消息:" + msg);
                broadcastMessage("[" + clientName + "]:" + msg);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }finally {
            try {
                clients.remove(this);
                if(clientName != null){
                    broadcastMessage("[系统]" + clientName + "离开聊天室");
                }
                if (out != null) out.close();
                if (in != null) in.close();
                if (clientSocket != null) clientSocket.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

    }

    private void broadcastMessage(String msg) {
        for (ClientHandler client : clients) {
            if (client != this) {
                client.out.println(msg);
            }
        }
    }
}

服务端实现

package org.example.code_case.javaIo.聊天室实现.Socket实现;

import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ChatServer {

    private static final int PATH = 8888;
    private static final Set<ClientHandler> CLIENTS = ConcurrentHashMap.newKeySet();
    private static final ExecutorService POOL = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        System.out.println("聊天室服务器启动......");

        try (ServerSocket serverSocket = new ServerSocket(PATH);){
            while (true) {
                Socket accept = serverSocket.accept();
                System.out.println("新客户端连接:"+accept.getInetAddress().getHostAddress());

                ClientHandler clientHandler = new ClientHandler(accept,CLIENTS);
                CLIENTS.add(clientHandler);
                POOL.execute(clientHandler);

            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }


    }

}

客户端实现

package org.example.code_case.javaIo.聊天室实现.Socket实现;

import java.io.*;
import java.net.Socket;
import java.util.Scanner;

public class ChatClient {

    public static void main(String[] args) {
        try (Socket socket = new Socket("127.0.0.1", 8888);
             InputStream in = socket.getInputStream();
             OutputStream out = socket.getOutputStream();
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
             Scanner scanner = new Scanner(System.in);
             PrintWriter printWriter = new PrintWriter(out, true);) {


            System.out.println("已连接到聊天室.....");

            System.out.println("请输入用户名:");
            String name = scanner.nextLine();
            System.out.println("用户名:" + name);
            printWriter.println(name);

            Thread thread = new Thread(() -> {
                try {
                    String msg;
                    while ((msg = bufferedReader.readLine()) != null) {
                        System.out.println(msg);
                    }
                } catch (IOException e) {
                    System.out.println("聊天室关闭");
                }
            });

            thread.start();

            //发送消息
            System.out.println("您可以开始输入消息了(输入'exit'退出):");
            while (true) {
                String msg = scanner.nextLine();
                if ("exit".equalsIgnoreCase(msg)) {
                    break;
                }

                printWriter.println(msg);

            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

SocketChannel&ServerSocketChannel

服务端实现

package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class ChatServer {
    private static final int PORT = 9999;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    private Selector selector;


    public void start() {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();) {
            //绑定端口
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            //设置非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //注册选择器
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("服务器启动,监听端口:"+PORT);
            while (true) {
                //等待就绪事件发生
                int select = selector.select();
                if (select == 0) {
                    continue;
                }

                //获取就绪事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();

                    try {
                        if (key.isAcceptable()) {
                            handlerAccept(key);
                        } else if (key.isReadable()) {
                            handlerRead(key);
                        } else if (key.isWritable()) {
                            handlerWrit(key);
                        }

                    }catch (IOException e) {
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                        System.out.println("客户端断开连接");
                    }

                }

            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void handlerWrit(SelectionKey key) {
    }

    private  void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = channel.accept();

        clientChannel.configureBlocking(false);
        clientChannel.register(key.selector(), SelectionKey.OP_READ);

        System.out.println("客户端连接成功,客户端IP地址:"+clientChannel.getRemoteAddress());

        //发送欢迎消息
        String message = "欢迎连接到聊天室!请求输入您的用户名:";
        writeBuffer.clear();
        writeBuffer.put(message.getBytes());
        writeBuffer.flip();
        clientChannel.write(writeBuffer);
    }

    private void handlerRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        readBuffer.clear();
        int count = channel.read(readBuffer);
        if (count == -1) {
            System.out.println("客户端断开连接:" + channel.getRemoteAddress());
            key.cancel();
            channel.close();
            return;
        }

        //处理读数据
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        String message = new String(bytes, "UTF-8");


        String clientName = (String) key.attachment();
        if(clientName == null){
            //第一次收到消息,作用用户名
            clientName = message.trim();
            key.attach(clientName);
            System.out.println("客户端"+clientName+"上线");

            //广播新用户加入
            broadcastMessage("[系统] " + clientName + " 加入了聊天室", channel);
        }else{
            //广播聊天消息
            broadcastMessage("[" + clientName + "] " + message, channel);
        }
    }

    private void broadcastMessage(String msg, SocketChannel exceptChannel) throws IOException {
        for (SelectionKey key : selector.keys()) {
           Channel channel = key.channel();
           if(channel instanceof SocketChannel clientChannel && channel != exceptChannel){
               writeBuffer.clear();
               writeBuffer.put(msg.getBytes());
               writeBuffer.flip();
               clientChannel.write(writeBuffer);
           }
        }
    }

    public static void main(String[] args) {
        new ChatServer().start();
    }
}

客户端实现

package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class ChatClient {

    private Selector selector;
    private SocketChannel socketChannel;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);


    public void start() throws IOException {
        selector = Selector.open();

        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);

        //连接服务器
        socketChannel.connect(new InetSocketAddress("localhost", 9999));

        //注册客户端通道,监听连接时间
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        System.out.println("尝试连接到服务器.....");

        Thread thread = new Thread(this::readUserInput);
        thread.setDaemon(true);
        thread.start();

        //轮询选择器
        while (true) {
            int select = selector.select();
            if (select == 0) continue;

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    // 处理事件
                    if (key.isConnectable()) {
                        // 处理连接事件
                        handleConnect(key);
                    } else if (key.isReadable()) {
                        // 处理读事件
                        handleRead(key);
                    }
                } catch (IOException e) {
                    // 处理异常
                    key.cancel();
                    if (key.channel() != null) {
                        key.channel().close();
                    }
                    System.err.println("处理服务器响应时出错: " + e.getMessage());
                    return;
                }
            }
        }
    }
    // 处理连接事件
    private void handleConnect(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();

        // 完成连接过程
        if (clientChannel.isConnectionPending()) {
            clientChannel.finishConnect();
        }

        System.out.println("已连接到服务器");

        // 注册客户端通道,监听读事件
        clientChannel.register(selector, SelectionKey.OP_READ);
    }

    // 处理读事件
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        readBuffer.clear();

        int bytesRead = clientChannel.read(readBuffer);
        if (bytesRead == -1) {
            // 服务器关闭连接
            System.out.println("与服务器断开连接");
            key.cancel();
            clientChannel.close();
            return;
        }

        // 处理读取的数据
        readBuffer.flip();
        byte[] bytes = new byte[readBuffer.remaining()];
        readBuffer.get(bytes);
        String message = new String(bytes, "UTF-8");

        System.out.println(message);
    }

    // 读取用户输入并发送到服务器
    private void readUserInput() {
        Scanner scanner = new Scanner(System.in);

        while (true) {
            String userInput = scanner.nextLine();

            if ("exit".equalsIgnoreCase(userInput)) {
                try {
                    System.out.println("退出聊天室");
                    socketChannel.close();
                    System.exit(0);
                } catch (IOException e) {
                    System.err.println("关闭连接时出错: " + e.getMessage());
                }
                break;
            }

            try {
                // 发送消息到服务器
                writeBuffer.clear();
                writeBuffer.put(userInput.getBytes());
                writeBuffer.flip();
                socketChannel.write(writeBuffer);
            } catch (IOException e) {
                System.err.println("发送消息时出错: " + e.getMessage());
                break;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new ChatClient().start();
    }
}

尝试连接到服务器.....

已连接到服务器

欢迎连接到聊天室!请求输入您的用户名:

[系统] 用户1 加入了聊天室

用户2

你是谁

[用户1] 我是用户2

1

[用户1] 2

3

[用户1] 4

尝试连接到服务器.....

已连接到服务器

欢迎连接到聊天室!请求输入您的用户名:

用户1

[系统] 用户2 加入了聊天室

[用户2] 你是谁

我是用户2

[用户2] 1

2

[用户2] 3

4

总结

  • 使用BIO同步阻塞模型实现的聊天室:服务端需要为每个客户端单独开辟线程处理数据,否则就会就会造成服务端阻塞
    • 优点:代码实现简单、可读性强易理解
    • 缺点:线程资源浪费、内存占用高、并发能力有限、运行效率低、拓展性差
  • 使用AIO同步非阻塞模型实现的聊天室:服务端通过Selector来监控客户端连接,事件就绪时,通知程序执行相应的逻辑,避免阻塞。然后通过IO多路复用,实现单个线程可以管理多个连接
    • 优点:高并发的处理能力、事件驱动模型、资源利用率高
    • 缺点:代码实现复杂、可读性差不易理解

网站公告

今日签到

点亮在社区的每一天
去签到