Scoket&ServerSocket
客户端数据处理器
package org.example.code_case.javaIo.聊天室实现.Socket实现;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Set;
public class ClientHandler implements Runnable {
private Socket clientSocket;
private Set<ClientHandler> clients;
private PrintWriter out;
private BufferedReader in;
private String clientName;
public ClientHandler(Socket accept, Set<ClientHandler> clients) {
this.clientSocket = accept;
this.clients = clients;
}
@Override
public void run() {
try {
out = new PrintWriter(clientSocket.getOutputStream(),true);
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
System.out.println("客户端连接成功");
clientName = in.readLine();
System.out.println(clientName + "上线了");
//广播新用户加入
broadcastMessage("[系统]" + clientName + "加入聊天室");
//读取客户端消息
String msg;
while ((msg = in.readLine()) != null) {
System.out.println("收到的消息:" + msg);
broadcastMessage("[" + clientName + "]:" + msg);
}
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
try {
clients.remove(this);
if(clientName != null){
broadcastMessage("[系统]" + clientName + "离开聊天室");
}
if (out != null) out.close();
if (in != null) in.close();
if (clientSocket != null) clientSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private void broadcastMessage(String msg) {
for (ClientHandler client : clients) {
if (client != this) {
client.out.println(msg);
}
}
}
}
服务端实现
package org.example.code_case.javaIo.聊天室实现.Socket实现;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ChatServer {
private static final int PATH = 8888;
private static final Set<ClientHandler> CLIENTS = ConcurrentHashMap.newKeySet();
private static final ExecutorService POOL = Executors.newCachedThreadPool();
public static void main(String[] args) {
System.out.println("聊天室服务器启动......");
try (ServerSocket serverSocket = new ServerSocket(PATH);){
while (true) {
Socket accept = serverSocket.accept();
System.out.println("新客户端连接:"+accept.getInetAddress().getHostAddress());
ClientHandler clientHandler = new ClientHandler(accept,CLIENTS);
CLIENTS.add(clientHandler);
POOL.execute(clientHandler);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客户端实现
package org.example.code_case.javaIo.聊天室实现.Socket实现;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;
public class ChatClient {
public static void main(String[] args) {
try (Socket socket = new Socket("127.0.0.1", 8888);
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
Scanner scanner = new Scanner(System.in);
PrintWriter printWriter = new PrintWriter(out, true);) {
System.out.println("已连接到聊天室.....");
System.out.println("请输入用户名:");
String name = scanner.nextLine();
System.out.println("用户名:" + name);
printWriter.println(name);
Thread thread = new Thread(() -> {
try {
String msg;
while ((msg = bufferedReader.readLine()) != null) {
System.out.println(msg);
}
} catch (IOException e) {
System.out.println("聊天室关闭");
}
});
thread.start();
//发送消息
System.out.println("您可以开始输入消息了(输入'exit'退出):");
while (true) {
String msg = scanner.nextLine();
if ("exit".equalsIgnoreCase(msg)) {
break;
}
printWriter.println(msg);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
SocketChannel&ServerSocketChannel
服务端实现
package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class ChatServer {
private static final int PORT = 9999;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
private Selector selector;
public void start() {
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();) {
//绑定端口
serverSocketChannel.bind(new InetSocketAddress(PORT));
//设置非阻塞模式
serverSocketChannel.configureBlocking(false);
//注册选择器
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动,监听端口:"+PORT);
while (true) {
//等待就绪事件发生
int select = selector.select();
if (select == 0) {
continue;
}
//获取就绪事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
handlerAccept(key);
} else if (key.isReadable()) {
handlerRead(key);
} else if (key.isWritable()) {
handlerWrit(key);
}
}catch (IOException e) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
System.out.println("客户端断开连接");
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void handlerWrit(SelectionKey key) {
}
private void handlerAccept(SelectionKey key) throws IOException {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel clientChannel = channel.accept();
clientChannel.configureBlocking(false);
clientChannel.register(key.selector(), SelectionKey.OP_READ);
System.out.println("客户端连接成功,客户端IP地址:"+clientChannel.getRemoteAddress());
//发送欢迎消息
String message = "欢迎连接到聊天室!请求输入您的用户名:";
writeBuffer.clear();
writeBuffer.put(message.getBytes());
writeBuffer.flip();
clientChannel.write(writeBuffer);
}
private void handlerRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
readBuffer.clear();
int count = channel.read(readBuffer);
if (count == -1) {
System.out.println("客户端断开连接:" + channel.getRemoteAddress());
key.cancel();
channel.close();
return;
}
//处理读数据
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
String clientName = (String) key.attachment();
if(clientName == null){
//第一次收到消息,作用用户名
clientName = message.trim();
key.attach(clientName);
System.out.println("客户端"+clientName+"上线");
//广播新用户加入
broadcastMessage("[系统] " + clientName + " 加入了聊天室", channel);
}else{
//广播聊天消息
broadcastMessage("[" + clientName + "] " + message, channel);
}
}
private void broadcastMessage(String msg, SocketChannel exceptChannel) throws IOException {
for (SelectionKey key : selector.keys()) {
Channel channel = key.channel();
if(channel instanceof SocketChannel clientChannel && channel != exceptChannel){
writeBuffer.clear();
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
clientChannel.write(writeBuffer);
}
}
}
public static void main(String[] args) {
new ChatServer().start();
}
}
客户端实现
package org.example.code_case.javaIo.聊天室实现.SockerChannel实现;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class ChatClient {
private Selector selector;
private SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
public void start() throws IOException {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
//连接服务器
socketChannel.connect(new InetSocketAddress("localhost", 9999));
//注册客户端通道,监听连接时间
socketChannel.register(selector, SelectionKey.OP_CONNECT);
System.out.println("尝试连接到服务器.....");
Thread thread = new Thread(this::readUserInput);
thread.setDaemon(true);
thread.start();
//轮询选择器
while (true) {
int select = selector.select();
if (select == 0) continue;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
// 处理事件
if (key.isConnectable()) {
// 处理连接事件
handleConnect(key);
} else if (key.isReadable()) {
// 处理读事件
handleRead(key);
}
} catch (IOException e) {
// 处理异常
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
System.err.println("处理服务器响应时出错: " + e.getMessage());
return;
}
}
}
}
// 处理连接事件
private void handleConnect(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
// 完成连接过程
if (clientChannel.isConnectionPending()) {
clientChannel.finishConnect();
}
System.out.println("已连接到服务器");
// 注册客户端通道,监听读事件
clientChannel.register(selector, SelectionKey.OP_READ);
}
// 处理读事件
private void handleRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
readBuffer.clear();
int bytesRead = clientChannel.read(readBuffer);
if (bytesRead == -1) {
// 服务器关闭连接
System.out.println("与服务器断开连接");
key.cancel();
clientChannel.close();
return;
}
// 处理读取的数据
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println(message);
}
// 读取用户输入并发送到服务器
private void readUserInput() {
Scanner scanner = new Scanner(System.in);
while (true) {
String userInput = scanner.nextLine();
if ("exit".equalsIgnoreCase(userInput)) {
try {
System.out.println("退出聊天室");
socketChannel.close();
System.exit(0);
} catch (IOException e) {
System.err.println("关闭连接时出错: " + e.getMessage());
}
break;
}
try {
// 发送消息到服务器
writeBuffer.clear();
writeBuffer.put(userInput.getBytes());
writeBuffer.flip();
socketChannel.write(writeBuffer);
} catch (IOException e) {
System.err.println("发送消息时出错: " + e.getMessage());
break;
}
}
}
public static void main(String[] args) throws IOException {
new ChatClient().start();
}
}
尝试连接到服务器.....
已连接到服务器
欢迎连接到聊天室!请求输入您的用户名:
[系统] 用户1 加入了聊天室
用户2
你是谁
[用户1] 我是用户2
1
[用户1] 2
3
[用户1] 4
尝试连接到服务器.....
已连接到服务器
欢迎连接到聊天室!请求输入您的用户名:
用户1
[系统] 用户2 加入了聊天室
[用户2] 你是谁
我是用户2
[用户2] 1
2
[用户2] 3
4
总结
- 使用BIO同步阻塞模型实现的聊天室:服务端需要为每个客户端单独开辟线程处理数据,否则就会就会造成服务端阻塞
- 优点:代码实现简单、可读性强易理解
- 缺点:线程资源浪费、内存占用高、并发能力有限、运行效率低、拓展性差
- 使用AIO同步非阻塞模型实现的聊天室:服务端通过Selector来监控客户端连接,事件就绪时,通知程序执行相应的逻辑,避免阻塞。然后通过IO多路复用,实现单个线程可以管理多个连接
- 优点:高并发的处理能力、事件驱动模型、资源利用率高
- 缺点:代码实现复杂、可读性差不易理解