BIO模型聊天室项目大体设计
BIO编程模型
- Acceptor是服务器端负责监听指定端口的ServerSocket
- 每有一个客户端Client连接到服务器端,Acceptor就创建一个新的线程Handler来处理客户端发送的消息
- 每一个客户端都有一个唯一的Handler来对应处理其事务
- 为保证线程安全,这里简单地使用synchronized关键字对共享数据的访问进行同步
时序图
服务器端Server代码实现
ChatServer实现
package server;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
/**
 * Blocking-I/O chat server that starts one handler thread per accepted client.
 *
 * <p>The server keeps one writer per connected client and relays every message
 * to all clients except the sender. The client registry is only touched from
 * {@code synchronized} methods, so handler threads may call them concurrently.
 */
public class ChatServer {

    /** Port the server socket listens on. */
    private static final int DEFAULT_PORT = 8888;

    /** Message a client sends to leave the chat room. */
    private static final String QUIT = "quit";

    private ServerSocket serverSocket;

    /**
     * Writers of the connected clients, keyed by the client socket itself.
     * The remote port alone is not unique: two clients on different hosts can
     * use the same port number, which would make one entry overwrite the other.
     */
    private final Map<Socket, Writer> connectedClients;

    public ChatServer() {
        connectedClients = new HashMap<>();
    }

    /**
     * Registers a newly connected client so that it receives forwarded messages.
     *
     * @param socket the client connection; {@code null} is ignored
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public synchronized void addClient(Socket socket) throws IOException {
        if (socket != null) {
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            connectedClients.put(socket, writer);
            System.out.println("客户端[" + socket.getPort() + "]已连接到服务器");
        }
    }

    /**
     * Unregisters a client and closes its writer (which also closes the socket).
     *
     * @param socket the client connection; {@code null} is ignored
     * @throws IOException if closing the client's writer fails
     */
    public synchronized void removeClient(Socket socket) throws IOException {
        if (socket != null) {
            // Unregister first so a failing close() cannot leave a stale entry behind.
            Writer writer = connectedClients.remove(socket);
            if (writer != null) {
                writer.close();
            }
            System.out.println("客户端[" + socket.getPort() + "]已断开连接");
        }
    }

    /**
     * Sends a message to every connected client except the sender.
     *
     * <p>A failure to write to one recipient is reported and skipped: it must not
     * abort delivery to the remaining recipients, nor be raised in the sender's
     * handler thread. Writes happen while holding the server lock, so a slow
     * recipient delays the other handlers.
     *
     * @param socket the sender's connection, excluded from delivery
     * @param fwdMsg the text to deliver
     * @throws IOException never thrown by this implementation; kept for caller compatibility
     */
    public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException {
        for (Map.Entry<Socket, Writer> client : connectedClients.entrySet()) {
            if (client.getKey() == socket) {
                continue;
            }
            try {
                Writer writer = client.getValue();
                writer.write(fwdMsg);
                writer.flush();
            } catch (IOException e) {
                // The recipient's own handler is responsible for removing it once its read fails.
                System.err.println("向客户端[" + client.getKey().getPort() + "]转发消息失败: " + e.getMessage());
            }
        }
    }

    /**
     * Tells whether a message is the quit command.
     *
     * @param msg the received message, may be {@code null}
     * @return {@code true} if {@code msg} equals the quit command
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Closes the listening socket if it was opened; close failures are printed. */
    public synchronized void close() {
        if (serverSocket != null) {
            try {
                serverSocket.close();
                System.out.println("关闭serverSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the listening port and accepts clients until accepting fails,
     * starting a new {@code ChatHandler} thread for each connection.
     */
    public void start() {
        try {
            // Bind the listening port.
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");
            while (true) {
                // Block until a client connects.
                Socket socket = serverSocket.accept();
                // Serve the client on its own thread.
                new Thread(new ChatHandler(this, socket)).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}
ChatHandler实现
package server;
import java.io.*;
import java.net.Socket;
/**
 * Serves a single client connection: registers it with the server, relays each
 * line the client sends to the other clients, and unregisters it when done.
 */
public class ChatHandler implements Runnable {

    private final ChatServer server;
    private final Socket socket;

    public ChatHandler(ChatServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    /** Runs the relay loop and always unregisters the client afterwards. */
    @Override
    public void run() {
        try {
            relayMessages();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            unregister();
        }
    }

    /**
     * Registers the client, then forwards every line it sends until the stream
     * ends or the client sends the quit command (which is itself forwarded).
     */
    private void relayMessages() throws IOException {
        // Record the newly connected user.
        server.addClient(socket);

        InputStreamReader input = new InputStreamReader(socket.getInputStream());
        BufferedReader lines = new BufferedReader(input);

        for (String line = lines.readLine(); line != null; line = lines.readLine()) {
            String outgoing = "客户端[" + socket.getPort() + "]: " + line + "\n";
            System.out.print(outgoing);

            // Relay to the other users in the chat room.
            server.forwardMessage(socket, outgoing);

            // Stop once the user asked to leave.
            if (server.readyToQuit(line)) {
                return;
            }
        }
    }

    /** Removes the client from the server, printing any failure. */
    private void unregister() {
        try {
            server.removeClient(socket);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
客户端Client代码实现
ChatClient实现
package client;
import java.io.*;
import java.net.Socket;
/**
 * Console chat client: the calling thread prints messages received from the
 * server while a separate thread forwards console input to the server.
 */
public class ChatClient {

    /** Host name and port of the chat server. */
    private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVER_PORT = 8888;

    /** Message the user types to leave the chat room. */
    private static final String QUIT = "quit";

    /** Connection to the server and its character streams. */
    private Socket socket;
    private BufferedReader reader;
    private BufferedWriter writer;

    /**
     * Sends one line to the server unless the socket's output was shut down.
     *
     * @param msg the text to send; a newline is appended
     * @throws IOException if writing to the server fails
     */
    public void send(String msg) throws IOException {
        if (!socket.isOutputShutdown()) {
            writer.write(msg + "\n");
            writer.flush();
        }
    }

    /**
     * Reads one line from the server.
     *
     * @return the next line, or {@code null} if the stream ended or the socket's
     *         input was shut down
     * @throws IOException if reading from the server fails
     */
    public String receive() throws IOException {
        String msg = null;
        if (!socket.isInputShutdown()) {
            msg = reader.readLine();
        }
        return msg;
    }

    /**
     * Tells whether the given input is the quit command.
     *
     * @param msg the user's input, may be {@code null}
     * @return {@code true} if {@code msg} equals the quit command
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /**
     * Releases the connection. Does nothing if no connection was ever made;
     * close failures are printed.
     */
    public void close() {
        if (socket == null) {
            return;
        }
        System.out.println("关闭socket");
        try {
            if (writer != null) {
                writer.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the socket itself too: the writer may never have been created
            // if stream setup failed after the connection was established.
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Connects to the server, starts the console-input thread, and prints
     * incoming messages until the server closes the connection.
     */
    public void start() {
        try {
            // Connect to the server.
            socket = new Socket(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);

            // Wrap the socket streams.
            reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );

            // Forward console input on a daemon thread: it blocks on System.in and
            // must not keep the JVM alive once the connection is gone.
            Thread inputThread = new Thread(new UserInputHandler(this));
            inputThread.setDaemon(true);
            inputThread.start();

            // Print messages forwarded by the server.
            String msg = null;
            while ((msg = receive()) != null) {
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatClient chatClient = new ChatClient();
        chatClient.start();
    }
}
UserInputHandler实现
package client;
import client.ChatClient;
import java.io.*;
/**
 * Reads lines from standard input and sends each one to the chat server
 * through the owning {@code ChatClient}.
 */
public class UserInputHandler implements Runnable {

    private final ChatClient chatClient;

    public UserInputHandler(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    /**
     * Forwards console input until the user enters the quit command, standard
     * input reaches end-of-stream, or an I/O error occurs (which is printed).
     */
    @Override
    public void run() {
        try {
            BufferedReader consoleReader =
                    new BufferedReader(new InputStreamReader(System.in));
            while (true) {
                // Block until the user enters a line.
                String input = consoleReader.readLine();
                if (input == null) {
                    // End of standard input: stop instead of sending "null" forever.
                    break;
                }

                // Send the line to the server.
                chatClient.send(input);

                // Stop once the user asked to leave.
                if (chatClient.readyToQuit(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
用线程池管理线程数
用线程池限制最大的线程数,防止线程过多导致服务器崩溃
package server;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Blocking-I/O chat server that runs client handlers on a fixed-size thread pool.
 *
 * <p>The server keeps one writer per connected client and relays every message
 * to all clients except the sender. The client registry is only touched from
 * {@code synchronized} methods, so handler threads may call them concurrently.
 * Because the pool has a fixed size, a client accepted while all pool threads
 * are busy has its handler queued until a thread becomes free.
 */
public class ChatServer {

    /** Port the server socket listens on. */
    private static final int DEFAULT_PORT = 8888;

    /** Message a client sends to leave the chat room. */
    private static final String QUIT = "quit";

    /** Thread pool that runs the client handlers. */
    private final ExecutorService executorService;

    private ServerSocket serverSocket;

    /**
     * Writers of the connected clients, keyed by the client socket itself.
     * The remote port alone is not unique: two clients on different hosts can
     * use the same port number, which would make one entry overwrite the other.
     */
    private final Map<Socket, Writer> connectedClients;

    public ChatServer() {
        // At most 10 handler threads run at the same time.
        executorService = Executors.newFixedThreadPool(10);
        connectedClients = new HashMap<>();
    }

    /**
     * Registers a newly connected client so that it receives forwarded messages.
     *
     * @param socket the client connection; {@code null} is ignored
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public synchronized void addClient(Socket socket) throws IOException {
        if (socket != null) {
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            connectedClients.put(socket, writer);
            System.out.println("客户端[" + socket.getPort() + "]已连接到服务器");
        }
    }

    /**
     * Unregisters a client and closes its writer (which also closes the socket).
     *
     * @param socket the client connection; {@code null} is ignored
     * @throws IOException if closing the client's writer fails
     */
    public synchronized void removeClient(Socket socket) throws IOException {
        if (socket != null) {
            // Unregister first so a failing close() cannot leave a stale entry behind.
            Writer writer = connectedClients.remove(socket);
            if (writer != null) {
                writer.close();
            }
            System.out.println("客户端[" + socket.getPort() + "]已断开连接");
        }
    }

    /**
     * Sends a message to every connected client except the sender.
     *
     * <p>A failure to write to one recipient is reported and skipped: it must not
     * abort delivery to the remaining recipients, nor be raised in the sender's
     * handler thread. Writes happen while holding the server lock, so a slow
     * recipient delays the other handlers.
     *
     * @param socket the sender's connection, excluded from delivery
     * @param fwdMsg the text to deliver
     * @throws IOException never thrown by this implementation; kept for caller compatibility
     */
    public synchronized void forwardMessage(Socket socket, String fwdMsg) throws IOException {
        for (Map.Entry<Socket, Writer> client : connectedClients.entrySet()) {
            if (client.getKey() == socket) {
                continue;
            }
            try {
                Writer writer = client.getValue();
                writer.write(fwdMsg);
                writer.flush();
            } catch (IOException e) {
                // The recipient's own handler is responsible for removing it once its read fails.
                System.err.println("向客户端[" + client.getKey().getPort() + "]转发消息失败: " + e.getMessage());
            }
        }
    }

    /**
     * Tells whether a message is the quit command.
     *
     * @param msg the received message, may be {@code null}
     * @return {@code true} if {@code msg} equals the quit command
     */
    public boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /**
     * Closes the listening socket if it was opened and stops the pool from taking
     * new handlers. Handlers that are already running or queued are not
     * interrupted; the pool threads end once those handlers finish.
     */
    public synchronized void close() {
        if (serverSocket != null) {
            try {
                serverSocket.close();
                System.out.println("关闭serverSocket");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // Without this the idle pool threads would keep the JVM alive indefinitely.
        executorService.shutdown();
    }

    /**
     * Binds the listening port and accepts clients until accepting fails,
     * submitting a {@code ChatHandler} to the thread pool for each connection.
     */
    public void start() {
        try {
            // Bind the listening port.
            serverSocket = new ServerSocket(DEFAULT_PORT);
            System.out.println("启动服务器,监听端口:" + DEFAULT_PORT + "...");
            while (true) {
                // Block until a client connects.
                Socket socket = serverSocket.accept();
                // Hand the connection to the thread pool.
                executorService.execute(new ChatHandler(this, socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close();
        }
    }

    public static void main(String[] args) {
        ChatServer server = new ChatServer();
        server.start();
    }
}
总结
BIO模型中的阻塞在哪里?
- ServerSocket.accept()是阻塞的
- InputStream.read(), OutputStream.write()也是阻塞的
- 无法在同一个线程里处理多个Stream的I/O,导致了线程的资源浪费