文章目录
在基于 WebSocket 开发应用时,为了保证连接的良好状态并提供稳定的实时通信,通常需要实现以下功能:
1. 连接管理
- 建立连接:初始化 WebSocket 连接。
- 保持连接状态:监控连接状态,确保连接始终处于活跃状态。
- 关闭连接:在合适的时候(如用户退出、应用销毁等)正确关闭连接,释放资源。
2. 心跳检测
- 发送心跳消息:定期向服务器发送心跳消息(如
"ping"
),以检测连接是否仍然活跃。 - 接收心跳响应:服务器收到心跳消息后,返回响应(如
"pong"
)。客户端通过接收响应来确认连接状态。 - 超时处理:如果在一定时间内没有收到心跳响应,认为连接已断开,触发重连机制。
3. 重连机制
- 自动重连:在连接断开时(如网络问题、服务器故障等),自动尝试重新建立连接。
- 重连策略:可以设置重连的间隔时间(如指数退避策略),避免频繁重连导致的资源浪费。
- 重连次数限制:设置最大重连次数,避免无限重连。
4. 消息队列
- 消息缓存:在连接断开时,将未发送的消息缓存起来。
- 消息重发:在连接重新建立后,从队列中取出缓存的消息并重新发送。
5. 错误处理
- 捕获错误:捕获 WebSocket 连接过程中可能出现的错误(如网络异常、服务器拒绝连接等)。
- 日志记录:记录错误信息,便于调试和排查问题。
- 用户提示:向用户展示错误信息,提示用户当前状态(如“网络连接失败,请检查网络”)。
6. 资源管理
- 内存管理:确保在连接关闭时释放相关资源,避免内存泄漏。
- 线程管理:合理管理 WebSocket 的工作线程,避免线程资源耗尽。
7. 安全性
- 加密通信:使用
wss://
(WebSocket Secure)协议,通过 TLS/SSL 加密数据传输,确保通信安全。 - 身份验证:在建立连接时进行身份验证,确保只有合法用户可以连接到服务器。
- 数据校验:对发送和接收的数据进行校验,防止恶意数据注入。
8. 状态同步
- 同步状态:在连接重新建立后,同步客户端和服务器的状态,确保数据一致性。
- 消息确认:对于重要消息,实现消息确认机制,确保消息被正确接收。
示例代码
1. 添加依赖
在 build.gradle
文件中添加 OkHttp 的依赖:
dependencies {
implementation 'com.squareup.okhttp3:okhttp:4.10.0'
}
2. WebSocket 客户端实现
代码注释
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentLinkedQueue;
public class WebSocketClient {
private static final String TAG = "WebSocketClient"; // 日志标签
private WebSocket webSocket; // 当前 WebSocket 连接
private final OkHttpClient client = new OkHttpClient(); // OkHttp 客户端实例
private final Handler handler = new Handler(Looper.getMainLooper()); // 用于在主线程中更新 UI
private final Runnable heartbeatRunnable = new Runnable() { // 心跳检测任务
@Override
public void run() {
if (webSocket != null && webSocket.send("ping")) { // 发送心跳消息 "ping"
handler.postDelayed(this, 5000); // 每5秒发送一次心跳
}
}
};
private final Runnable reconnectRunnable = new Runnable() { // 重连任务
@Override
public void run() {
connect(url, callback); // 重新连接
}
};
private final ConcurrentLinkedQueue<String> messageQueue = new ConcurrentLinkedQueue<>(); // 消息队列
private final Object lock = new Object(); // 同步锁
private String url; // WebSocket 服务器的 URL
private WebSocketCallback callback; // WebSocket 回调接口
public interface WebSocketCallback { // WebSocket 回调接口
void onOpen(); // 连接成功
void onMessage(String text); // 接收到消息
void onClose(); // 连接关闭
void onFailure(Throwable t); // 发生错误
}
public WebSocketClient(String url, WebSocketCallback callback) {
this.url = url; // 初始化 URL
this.callback = callback; // 初始化回调接口
}
public void connect() { // 建立 WebSocket 连接
Request request = new Request.Builder()
.url(url)
.build();
webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, okhttp3.Response response) {
super.onOpen(webSocket, response);
Log.d(TAG, "WebSocket connected"); // 日志记录连接成功
callback.onOpen(); // 调用回调接口的 onOpen 方法
handler.post(heartbeatRunnable); // 启动心跳检测
synchronized (lock) {
while (!messageQueue.isEmpty()) { // 发送消息队列中的消息
webSocket.send(messageQueue.poll());
}
}
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
Log.d(TAG, "Received message: " + text); // 日志记录接收到的消息
callback.onMessage(text); // 调用回调接口的 onMessage 方法
if ("pong".equals(text)) { // 如果收到服务器的 "pong" 响应
// 收到服务器响应,重置心跳检测
handler.removeCallbacks(heartbeatRunnable);
handler.post(heartbeatRunnable);
}
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
Log.d(TAG, "WebSocket closed: " + reason); // 日志记录连接关闭
callback.onClose(); // 调用回调接口的 onClose 方法
handler.removeCallbacks(heartbeatRunnable); // 停止心跳检测
handler.postDelayed(reconnectRunnable, 5000); // 5秒后重连
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
super.onFailure(webSocket, t, response);
Log.e(TAG, "WebSocket error: " + t.getMessage()); // 日志记录错误信息
callback.onFailure(t); // 调用回调接口的 onFailure 方法
handler.removeCallbacks(heartbeatRunnable); // 停止心跳检测
handler.postDelayed(reconnectRunnable, 5000); // 5秒后重连
}
});
}
public void sendMessage(String message) { // 发送消息
synchronized (lock) {
if (webSocket != null) {
webSocket.send(message); // 如果连接已建立,直接发送消息
} else {
messageQueue.add(message); // 如果连接未建立,将消息加入队列
}
}
}
public void disconnect() { // 关闭 WebSocket 连接
if (webSocket != null) {
webSocket.close(1000, "Goodbye"); // 关闭连接
}
handler.removeCallbacks(heartbeatRunnable); // 停止心跳检测
handler.removeCallbacks(reconnectRunnable); // 停止重连
}
}
功能标注
连接管理:
connect()
方法:建立 WebSocket 连接。disconnect()
方法:关闭 WebSocket 连接。
心跳检测:
heartbeatRunnable
:定期发送心跳消息"ping"
。- 在
onMessage()
方法中,收到"pong"
响应时重置心跳检测。
重连机制:
reconnectRunnable
:在连接关闭或发生错误时,5秒后自动尝试重新连接。
消息队列:
messageQueue
:在连接未建立时,将消息加入队列。- 在
onOpen()
方法中,从队列中取出消息并发送。
错误处理:
- 在
onFailure()
方法中,记录错误信息并调用回调接口的onFailure()
方法。
- 在
资源管理:
- 在
disconnect()
方法中,停止心跳检测和重连任务,释放资源。
- 在
安全通信:
- 使用
wss://
协议(WebSocket Secure)确保通信安全。
- 使用
状态同步:
- 在
onOpen()
方法中,发送消息队列中的消息,确保消息不丢失。 - 在
onMessage()
方法中,调用回调接口的onMessage()
方法,同步接收到的消息。
- 在
3. 安卓端使用
MainActivity.java
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import java.util.concurrent.TimeUnit;
public class MainActivity extends AppCompatActivity {
private WebSocketClient webSocketClient;
private EditText messageEditText;
private Button sendButton;
private TextView messageTextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
messageEditText = findViewById(R.id.message);
sendButton = findViewById(R.id.send);
messageTextView = findViewById(R.id.messageTextView);
webSocketClient = new WebSocketClient("wss://yourserver.com/chat", new WebSocketClient.WebSocketCallback() {
@Override
public void onOpen() {
Log.d("WebSocket", "Connected");
runOnUiThread(() -> messageTextView.setText("Connected"));
}
@Override
public void onMessage(String text) {
Log.d("WebSocket", "Received: " + text);
runOnUiThread(() -> messageTextView.setText("Received: " + text));
}
@Override
public void onClose() {
Log.d("WebSocket", "Disconnected");
runOnUiThread(() -> messageTextView.setText("Disconnected"));
}
@Override
public void onFailure(Throwable t) {
Log.e("WebSocket", "Error: " + t.getMessage());
runOnUiThread(() -> messageTextView.setText("Error: " + t.getMessage()));
}
});
webSocketClient.connect();
sendButton.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
String message = messageEditText.getText().toString();
if (!message.isEmpty()) {
webSocketClient.sendMessage(message);
messageEditText.setText("");
}
}
});
}
@Override
protected void onDestroy() {
super.onDestroy();
webSocketClient.disconnect();
}
}
布局文件(activity_main.xml
)
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
android:padding="16dp">
<EditText
android:id="@+id/message"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:hint="Enter message" />
<Button
android:id="@+id/send"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="Send" />
<TextView
android:id="@+id/messageTextView"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="Status" />
</LinearLayout>
4. 后端(Flask + Flask-SocketIO)
安装依赖
pip install flask flask-socketio eventlet
Flask 应用代码(app.py
)
from flask import Flask, render_template
from flask_socketio import SocketIO, send
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
@app.route('/')
def index():
return render_template('index.html')
@socketio.on('message')
def handle_message(data):
print('Received message: ', data)
send(data, broadcast=True)
if __name__ == '__main__':
socketio.run(app, debug=True, host='0.0.0.0', port=5000)
5. 运行
- 启动 Flask 应用:
python app.py
- 启动 Android 应用:
- 输入消息并点击发送,消息将通过 WebSocket 发送到服务器。
- 服务器将消息广播给所有连接的客户端,客户端接收到消息后显示在界面上。