TCP通信的封装,我们可以从以下几个方面进行改进:
线程池优化:使用更高效的线程池配置,避免频繁创建和销毁线程。
连接重试机制:在网络不稳定时,自动重试连接。
心跳机制:保持长连接,避免因超时断开。
数据缓冲区优化:动态调整缓冲区大小,适应不同数据量。
异常处理增强:区分不同类型的异常,提供更详细的错误信息。
代码简洁性:减少冗余代码,提高可读性和可维护性。
TCP客户端封装(Java)
import android.util.Log;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TcpClient {
private static final String TAG = "TcpClient";
private static final int HEARTBEAT_INTERVAL = 10; // 心跳间隔(秒)
private static final int CONNECT_TIMEOUT = 5000; // 连接超时时间(毫秒)
private static final int RECONNECT_DELAY = 3000; // 重连延迟时间(毫秒)
private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
private ExecutorService executorService;
private ScheduledExecutorService heartbeatExecutor;
private boolean isConnected = false;
private TcpListener listener;
private String serverIp;
private int serverPort;
public TcpClient(TcpListener listener) {
this.listener = listener;
executorService = Executors.newCachedThreadPool();
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
}
/**
* 连接到服务器
*
* @param ip 服务器IP地址
* @param port 服务器端口
*/
public void connect(String ip, int port) {
this.serverIp = ip;
this.serverPort = port;
executorService.execute(this::connectInternal);
}
private void connectInternal() {
try {
// 创建Socket并连接服务器
socket = new Socket();
socket.connect(new InetSocketAddress(serverIp, serverPort), CONNECT_TIMEOUT);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
isConnected = true;
// 通知连接成功
if (listener != null) {
listener.onConnected();
}
// 开始接收数据
receiveData();
// 启动心跳机制
startHeartbeat();
} catch (IOException e) {
Log.e(TAG, "Connection failed: " + e.getMessage());
if (listener != null) {
listener.onError("Connection failed: " + e.getMessage());
}
scheduleReconnect();
}
}
/**
* 断开连接
*/
public void disconnect() {
executorService.execute(() -> {
try {
if (socket != null) {
socket.close();
}
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
isConnected = false;
// 通知断开连接
if (listener != null) {
listener.onDisconnected();
}
} catch (IOException e) {
Log.e(TAG, "Disconnect error: " + e.getMessage());
} finally {
stopHeartbeat();
}
});
}
/**
* 发送数据
*
* @param data 要发送的数据
*/
public void sendData(byte[] data) {
if (!isConnected || outputStream == null) {
Log.e(TAG, "Not connected to server");
return;
}
executorService.execute(() -> {
try {
outputStream.write(data);
outputStream.flush();
Log.d(TAG, "Data sent successfully");
} catch (IOException e) {
Log.e(TAG, "Failed to send data: " + e.getMessage());
if (listener != null) {
listener.onError("Failed to send data: " + e.getMessage());
}
disconnect();
}
});
}
/**
* 接收数据
*/
private void receiveData() {
executorService.execute(() -> {
byte[] buffer = new byte[1024];
int bytesRead;
while (isConnected) {
try {
bytesRead = inputStream.read(buffer);
if (bytesRead == -1) {
// 服务器关闭连接
disconnect();
break;
}
// 处理接收到的数据
byte[] receivedData = new byte[bytesRead];
System.arraycopy(buffer, 0, receivedData, 0, bytesRead);
// 通知数据接收
if (listener != null) {
listener.onDataReceived(receivedData);
}
} catch (IOException e) {
Log.e(TAG, "Failed to receive data: " + e.getMessage());
if (listener != null) {
listener.onError("Failed to receive data: " + e.getMessage());
}
disconnect();
break;
}
}
});
}
/**
* 启动心跳机制
*/
private void startHeartbeat() {
heartbeatExecutor.scheduleAtFixedRate(() -> {
if (isConnected) {
sendData("HEARTBEAT".getBytes()); // 发送心跳包
}
}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
/**
* 停止心跳机制
*/
private void stopHeartbeat() {
heartbeatExecutor.shutdown();
}
/**
* 安排重连
*/
private void scheduleReconnect() {
executorService.schedule(this::connectInternal, RECONNECT_DELAY, TimeUnit.MILLISECONDS);
}
/**
* 是否已连接
*/
public boolean isConnected() {
return isConnected;
}
/**
* 关闭线程池
*/
public void shutdown() {
executorService.shutdown();
heartbeatExecutor.shutdown();
}
/**
* TCP事件监听器
*/
public interface TcpListener {
void onConnected(); // 连接成功
void onDisconnected(); // 断开连接
void onDataReceived(byte[] data); // 接收到数据
void onError(String error); // 发生错误
}
}
2. 在Activity中使用
import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;
public class MainActivity extends AppCompatActivity implements TcpClient.TcpListener {
private static final String SERVER_IP = "192.168.1.100"; // 服务器IP
private static final int SERVER_PORT = 8080; // 服务器端口
private TcpClient tcpClient;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 初始化TCP客户端
tcpClient = new TcpClient(this);
// 连接到服务器
tcpClient.connect(SERVER_IP, SERVER_PORT);
// 发送数据
String message = "Hello, Server!";
tcpClient.sendData(message.getBytes());
}
@Override
public void onConnected() {
Log.d("TcpClient", "Connected to server");
}
@Override
public void onDisconnected() {
Log.d("TcpClient", "Disconnected from server");
}
@Override
public void onDataReceived(byte[] data) {
String message = new String(data);
Log.d("TcpClient", "Received data: " + message);
}
@Override
public void onError(String error) {
Log.e("TcpClient", "Error: " + error);
}
@Override
protected void onDestroy() {
super.onDestroy();
// 断开连接并释放资源
if (tcpClient != null) {
tcpClient.disconnect();
tcpClient.shutdown();
}
}
}
进一步优化(Kotlin版本)
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.net.Socket
class MainActivity : AppCompatActivity(), TcpClient.TcpListener {
private val serverIp = "192.168.1.100" // 服务器IP
private val serverPort = 8080 // 服务器端口
private lateinit var tcpClient: TcpClient
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 初始化TCP客户端
tcpClient = TcpClient(this)
// 连接到服务器
tcpClient.connect(serverIp, serverPort)
// 发送数据
val message = "Hello, Server!"
tcpClient.sendData(message.toByteArray())
}
override fun onConnected() {
Log.d("TcpClient", "Connected to server")
}
override fun onDisconnected() {
Log.d("TcpClient", "Disconnected from server")
}
override fun onDataReceived(data: ByteArray) {
val message = String(data)
Log.d("TcpClient", "Received data: $message")
}
override fun onError(error: String) {
Log.e("TcpClient", "Error: $error")
}
override fun onDestroy() {
super.onDestroy()
// 断开连接并释放资源
tcpClient.disconnect()
tcpClient.shutdown()
}
}
class TcpClient(private val listener: TcpListener) {
private var socket: Socket? = null
private var inputStream: InputStream? = null
private var outputStream: OutputStream? = null
private var isConnected = false
private val scope = CoroutineScope(Dispatchers.IO)
private var heartbeatJob: Job? = null
fun connect(ip: String, port: Int) {
scope.launch {
try {
socket = Socket().apply {
connect(InetSocketAddress(ip, port), 5000) // 5秒超时
}
inputStream = socket?.getInputStream()
outputStream = socket?.getOutputStream()
isConnected = true
withContext(Dispatchers.Main) {
listener.onConnected()
}
receiveData()
startHeartbeat()
} catch (e: IOException) {
withContext(Dispatchers.Main) {
listener.onError("Connection failed: ${e.message}")
}
scheduleReconnect()
}
}
}
fun disconnect() {
scope.launch {
try {
socket?.close()
inputStream?.close()
outputStream?.close()
isConnected = false
withContext(Dispatchers.Main) {
listener.onDisconnected()
}
} catch (e: IOException) {
withContext(Dispatchers.Main) {
listener.onError("Disconnect error: ${e.message}")
}
} finally {
stopHeartbeat()
}
}
}
fun sendData(data: ByteArray) {
if (!isConnected || outputStream == null) {
Log.e("TcpClient", "Not connected to server")
return
}
scope.launch {
try {
outputStream?.write(data)
outputStream?.flush()
Log.d("TcpClient", "Data sent successfully")
} catch (e: IOException) {
withContext(Dispatchers.Main) {
listener.onError("Failed to send data: ${e.message}")
}
disconnect()
}
}
}
private fun receiveData() {
scope.launch {
val buffer = ByteArray(1024)
var bytesRead: Int
while (isConnected) {
try {
bytesRead = inputStream?.read(buffer) ?: -1
if (bytesRead == -1) {
disconnect()
break
}
val receivedData = buffer.copyOf(bytesRead)
withContext(Dispatchers.Main) {
listener.onDataReceived(receivedData)
}
} catch (e: IOException) {
withContext(Dispatchers.Main) {
listener.onError("Failed to receive data: ${e.message}")
}
disconnect()
break
}
}
}
}
private fun startHeartbeat() {
heartbeatJob = scope.launch {
while (isConnected) {
sendData("HEARTBEAT".toByteArray())
delay(10000) // 10秒间隔
}
}
}
private fun stopHeartbeat() {
heartbeatJob?.cancel()
}
private fun scheduleReconnect() {
scope.launch {
delay(3000) // 3秒后重连
connect(socket?.inetAddress?.hostAddress ?: "", socket?.port ?: 0)
}
}
fun shutdown() {
scope.cancel()
}
interface TcpListener {
fun onConnected()
fun onDisconnected()
fun onDataReceived(data: ByteArray)
fun onError(error: String)
}
}