python socket TCP/UDP/MULTICAST 收发示例

发布于:2024-09-19 ⋅ 阅读:(127) ⋅ 点赞:(0)

python socket TCP/UDP/MULTICAST 收发示例

一、接收端

import socket
import struct


def tcp_onece_receiver(port):
    """只接收一次 TCP 消息"""
    # 创建 TCP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 绑定到本地端口
    sock.bind(('', port))

    # 监听连接
    sock.listen(1)

    print('接收 TCP 消息中...')

    while True:
        conn, addr = sock.accept()
        data = conn.recv(1024)
        print(f'接收到来自 {addr} 的 TCP 消息: {data}')
        conn.close()


def tcp_receiver(port):
    # 创建 TCP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 绑定到本地端口
    sock.bind(('', port))

    # 监听连接
    sock.listen(1)

    print('接收 TCP 消息中...')

    conn, addr = sock.accept()
    while True:
        data = conn.recv(1024)

        print(f'接收到来自 {addr} 的 TCP 消息: {data}')

        if data == b'':
            break
    conn.close()


def udp_receiver(port):
    # 创建 UDP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # 绑定到本地端口
    sock.bind(('', port))

    print('接收 UDP 消息中...')

    while True:
        data, addr = sock.recvfrom(1024)
        print(f'接收到来自 {addr} 的 UDP 消息: {data}, {data}')


def multicast_receiver(group, port):
    # 创建 UDP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # 绑定到组播地址和端口
    sock.bind(('', port))

    # 加入组播组
    mreq = struct.pack('4sl', socket.inet_aton(group), socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

    print('接收组播消息中...')

    while True:
        data, addr = sock.recvfrom(1024)
        print(f'接收到来自 {addr} 的组播消息: {data}')


if __name__ == "__main__":
    tcp_receiver(26000)
    # udp_receiver(26000)
    # multicast_receiver('224.2.2.1', 26000)

二、发送端

import socket
import struct


def tcp_sender(message: bytes, ip: str, port: int):
    # 创建 TCP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    try:
        # 连接到指定地址
        sock.connect((ip, port))

        # 发送数据
        sock.sendall(message)

        print(f'已将 TCP 消息 {message} 发送到 {ip}:{port}')
    finally:
        sock.close()


def udp_sender(message: bytes, ip: str, port: int):
    # 创建 UDP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    try:
        # 发送数据到指定地址
        sock.sendto(message, (ip, port))

        print(f'已将 UDP 消息 {message} 发送到 {ip}:{port}')
        from time import sleep
        sleep(5)
    finally:
        sock.close()


def multicast_sender(message, group, port):
    # 创建 UDP 套接字
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)

    # 设置TTL,控制数据包在网络中可以经过的最大跳数
    ttl = struct.pack('b', 1)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

    try:
        # 发送数据到组播地址
        sock.sendto(message, (group, port))

        print(f'已将组播消息 {message} 发送到 {group}:{port}')
    finally:
        sock.close()


if __name__ == "__main__":
    tcp_sender(b'hello', '127.0.0.1', 26000)
    # udp_sender(b'hello', '127.0.0.1', 26000)
    # multicast_sender(b'hello', '224.1.1.1', 26000)


网站公告

今日签到

点亮在社区的每一天
去签到