Android TCP封装工具类

发布于:2025-03-11 ⋅ 阅读:(22) ⋅ 点赞:(0)

TCP通信的封装,我们可以从以下几个方面进行改进:

线程池优化:使用更高效的线程池配置,避免频繁创建和销毁线程。

连接重试机制:在网络不稳定时,自动重试连接。

心跳机制:保持长连接,避免因超时断开。

数据缓冲区优化:动态调整缓冲区大小,适应不同数据量。

异常处理增强:区分不同类型的异常,提供更详细的错误信息。

代码简洁性:减少冗余代码,提高可读性和可维护性。

TCP客户端封装(Java)

import android.util.Log;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TcpClient {

    private static final String TAG = "TcpClient";
    private static final int HEARTBEAT_INTERVAL = 10; // 心跳间隔(秒)
    private static final int CONNECT_TIMEOUT = 5000; // 连接超时时间(毫秒)
    private static final int RECONNECT_DELAY = 3000; // 重连延迟时间(毫秒)

    private Socket socket;
    private InputStream inputStream;
    private OutputStream outputStream;
    private ExecutorService executorService;
    private ScheduledExecutorService heartbeatExecutor;
    private boolean isConnected = false;
    private TcpListener listener;
    private String serverIp;
    private int serverPort;

    public TcpClient(TcpListener listener) {
        this.listener = listener;
        executorService = Executors.newCachedThreadPool();
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * 连接到服务器
     *
     * @param ip   服务器IP地址
     * @param port 服务器端口
     */
    public void connect(String ip, int port) {
        this.serverIp = ip;
        this.serverPort = port;
        executorService.execute(this::connectInternal);
    }

    private void connectInternal() {
        try {
            // 创建Socket并连接服务器
            socket = new Socket();
            socket.connect(new InetSocketAddress(serverIp, serverPort), CONNECT_TIMEOUT);
            inputStream = socket.getInputStream();
            outputStream = socket.getOutputStream();
            isConnected = true;

            // 通知连接成功
            if (listener != null) {
                listener.onConnected();
            }

            // 开始接收数据
            receiveData();

            // 启动心跳机制
            startHeartbeat();
        } catch (IOException e) {
            Log.e(TAG, "Connection failed: " + e.getMessage());
            if (listener != null) {
                listener.onError("Connection failed: " + e.getMessage());
            }
            scheduleReconnect();
        }
    }

    /**
     * 断开连接
     */
    public void disconnect() {
        executorService.execute(() -> {
            try {
                if (socket != null) {
                    socket.close();
                }
                if (inputStream != null) {
                    inputStream.close();
                }
                if (outputStream != null) {
                    outputStream.close();
                }
                isConnected = false;

                // 通知断开连接
                if (listener != null) {
                    listener.onDisconnected();
                }
            } catch (IOException e) {
                Log.e(TAG, "Disconnect error: " + e.getMessage());
            } finally {
                stopHeartbeat();
            }
        });
    }

    /**
     * 发送数据
     *
     * @param data 要发送的数据
     */
    public void sendData(byte[] data) {
        if (!isConnected || outputStream == null) {
            Log.e(TAG, "Not connected to server");
            return;
        }

        executorService.execute(() -> {
            try {
                outputStream.write(data);
                outputStream.flush();
                Log.d(TAG, "Data sent successfully");
            } catch (IOException e) {
                Log.e(TAG, "Failed to send data: " + e.getMessage());
                if (listener != null) {
                    listener.onError("Failed to send data: " + e.getMessage());
                }
                disconnect();
            }
        });
    }

    /**
     * 接收数据
     */
    private void receiveData() {
        executorService.execute(() -> {
            byte[] buffer = new byte[1024];
            int bytesRead;

            while (isConnected) {
                try {
                    bytesRead = inputStream.read(buffer);
                    if (bytesRead == -1) {
                        // 服务器关闭连接
                        disconnect();
                        break;
                    }

                    // 处理接收到的数据
                    byte[] receivedData = new byte[bytesRead];
                    System.arraycopy(buffer, 0, receivedData, 0, bytesRead);

                    // 通知数据接收
                    if (listener != null) {
                        listener.onDataReceived(receivedData);
                    }
                } catch (IOException e) {
                    Log.e(TAG, "Failed to receive data: " + e.getMessage());
                    if (listener != null) {
                        listener.onError("Failed to receive data: " + e.getMessage());
                    }
                    disconnect();
                    break;
                }
            }
        });
    }

    /**
     * 启动心跳机制
     */
    private void startHeartbeat() {
        heartbeatExecutor.scheduleAtFixedRate(() -> {
            if (isConnected) {
                sendData("HEARTBEAT".getBytes()); // 发送心跳包
            }
        }, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }

    /**
     * 停止心跳机制
     */
    private void stopHeartbeat() {
        heartbeatExecutor.shutdown();
    }

    /**
     * 安排重连
     */
    private void scheduleReconnect() {
        executorService.schedule(this::connectInternal, RECONNECT_DELAY, TimeUnit.MILLISECONDS);
    }

    /**
     * 是否已连接
     */
    public boolean isConnected() {
        return isConnected;
    }

    /**
     * 关闭线程池
     */
    public void shutdown() {
        executorService.shutdown();
        heartbeatExecutor.shutdown();
    }

    /**
     * TCP事件监听器
     */
    public interface TcpListener {
        void onConnected(); // 连接成功
        void onDisconnected(); // 断开连接
        void onDataReceived(byte[] data); // 接收到数据
        void onError(String error); // 发生错误
    }
}

2. 在Activity中使用

import android.os.Bundle;
import android.util.Log;
import androidx.appcompat.app.AppCompatActivity;

public class MainActivity extends AppCompatActivity implements TcpClient.TcpListener {

    private static final String SERVER_IP = "192.168.1.100"; // 服务器IP
    private static final int SERVER_PORT = 8080; // 服务器端口
    private TcpClient tcpClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        // 初始化TCP客户端
        tcpClient = new TcpClient(this);

        // 连接到服务器
        tcpClient.connect(SERVER_IP, SERVER_PORT);

        // 发送数据
        String message = "Hello, Server!";
        tcpClient.sendData(message.getBytes());
    }

    @Override
    public void onConnected() {
        Log.d("TcpClient", "Connected to server");
    }

    @Override
    public void onDisconnected() {
        Log.d("TcpClient", "Disconnected from server");
    }

    @Override
    public void onDataReceived(byte[] data) {
        String message = new String(data);
        Log.d("TcpClient", "Received data: " + message);
    }

    @Override
    public void onError(String error) {
        Log.e("TcpClient", "Error: " + error);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // 断开连接并释放资源
        if (tcpClient != null) {
            tcpClient.disconnect();
            tcpClient.shutdown();
        }
    }
}

进一步优化(Kotlin版本)

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.net.InetSocketAddress
import java.net.Socket

class MainActivity : AppCompatActivity(), TcpClient.TcpListener {

    private val serverIp = "192.168.1.100" // 服务器IP
    private val serverPort = 8080 // 服务器端口
    private lateinit var tcpClient: TcpClient

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 初始化TCP客户端
        tcpClient = TcpClient(this)

        // 连接到服务器
        tcpClient.connect(serverIp, serverPort)

        // 发送数据
        val message = "Hello, Server!"
        tcpClient.sendData(message.toByteArray())
    }

    override fun onConnected() {
        Log.d("TcpClient", "Connected to server")
    }

    override fun onDisconnected() {
        Log.d("TcpClient", "Disconnected from server")
    }

    override fun onDataReceived(data: ByteArray) {
        val message = String(data)
        Log.d("TcpClient", "Received data: $message")
    }

    override fun onError(error: String) {
        Log.e("TcpClient", "Error: $error")
    }

    override fun onDestroy() {
        super.onDestroy()
        // 断开连接并释放资源
        tcpClient.disconnect()
        tcpClient.shutdown()
    }
}

class TcpClient(private val listener: TcpListener) {

    private var socket: Socket? = null
    private var inputStream: InputStream? = null
    private var outputStream: OutputStream? = null
    private var isConnected = false
    private val scope = CoroutineScope(Dispatchers.IO)
    private var heartbeatJob: Job? = null

    fun connect(ip: String, port: Int) {
        scope.launch {
            try {
                socket = Socket().apply {
                    connect(InetSocketAddress(ip, port), 5000) // 5秒超时
                }
                inputStream = socket?.getInputStream()
                outputStream = socket?.getOutputStream()
                isConnected = true

                withContext(Dispatchers.Main) {
                    listener.onConnected()
                }

                receiveData()
                startHeartbeat()
            } catch (e: IOException) {
                withContext(Dispatchers.Main) {
                    listener.onError("Connection failed: ${e.message}")
                }
                scheduleReconnect()
            }
        }
    }

    fun disconnect() {
        scope.launch {
            try {
                socket?.close()
                inputStream?.close()
                outputStream?.close()
                isConnected = false

                withContext(Dispatchers.Main) {
                    listener.onDisconnected()
                }
            } catch (e: IOException) {
                withContext(Dispatchers.Main) {
                    listener.onError("Disconnect error: ${e.message}")
                }
            } finally {
                stopHeartbeat()
            }
        }
    }

    fun sendData(data: ByteArray) {
        if (!isConnected || outputStream == null) {
            Log.e("TcpClient", "Not connected to server")
            return
        }

        scope.launch {
            try {
                outputStream?.write(data)
                outputStream?.flush()
                Log.d("TcpClient", "Data sent successfully")
            } catch (e: IOException) {
                withContext(Dispatchers.Main) {
                    listener.onError("Failed to send data: ${e.message}")
                }
                disconnect()
            }
        }
    }

    private fun receiveData() {
        scope.launch {
            val buffer = ByteArray(1024)
            var bytesRead: Int

            while (isConnected) {
                try {
                    bytesRead = inputStream?.read(buffer) ?: -1
                    if (bytesRead == -1) {
                        disconnect()
                        break
                    }

                    val receivedData = buffer.copyOf(bytesRead)
                    withContext(Dispatchers.Main) {
                        listener.onDataReceived(receivedData)
                    }
                } catch (e: IOException) {
                    withContext(Dispatchers.Main) {
                        listener.onError("Failed to receive data: ${e.message}")
                    }
                    disconnect()
                    break
                }
            }
        }
    }

    private fun startHeartbeat() {
        heartbeatJob = scope.launch {
            while (isConnected) {
                sendData("HEARTBEAT".toByteArray())
                delay(10000) // 10秒间隔
            }
        }
    }

    private fun stopHeartbeat() {
        heartbeatJob?.cancel()
    }

    private fun scheduleReconnect() {
        scope.launch {
            delay(3000) // 3秒后重连
            connect(socket?.inetAddress?.hostAddress ?: "", socket?.port ?: 0)
        }
    }

    fun shutdown() {
        scope.cancel()
    }

    interface TcpListener {
        fun onConnected()
        fun onDisconnected()
        fun onDataReceived(data: ByteArray)
        fun onError(error: String)
    }
}