Veloris.
返回索引
设计实战 2026-02-14

Python Socket通信:TCP/UDP编程,用Python写网络调试工具

2 分钟
653 words

Python Socket通信:TCP/UDP编程,用Python写网络调试工具

Socket是网络通信的基础,掌握TCP/UDP通信对于开发网络工具、与设备通信等场景非常有用。本篇将介绍Python的socket编程,包括TCP和UDP通信的实现。


1. Socket基础概念

import socket

# Socket = IP地址 + 端口号
# 用于标识网络中的一个通信端点

# 创建Socket
# socket.socket(family, type)
# family: AF_INET (IPv4), AF_INET6 (IPv6)
# type: SOCK_STREAM (TCP), SOCK_DGRAM (UDP)

# TCP Socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# UDP Socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 常用方法
# bind((host, port))  - 绑定地址
# listen(backlog)     - 开始监听(TCP)
# accept()            - 接受连接(TCP)
# connect((host, port)) - 连接服务器
# send(data)          - 发送数据
# recv(bufsize)       - 接收数据
# sendto(data, addr)  - 发送到指定地址(UDP)
# recvfrom(bufsize)   - 接收并获取发送方地址(UDP)
# close()             - 关闭连接

2. TCP通信

TCP是面向连接的可靠传输协议。

2.1 TCP服务器

import socket

def tcp_server(host='0.0.0.0', port=8888):
    """简单的TCP服务器"""
    # 创建Socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 设置地址重用(避免"Address already in use"错误)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 绑定地址
    server_socket.bind((host, port))
    
    # 开始监听
    server_socket.listen(5)  # 最大等待连接数
    print(f"TCP服务器启动,监听 {host}:{port}")
    
    try:
        while True:
            # 接受连接
            client_socket, client_addr = server_socket.accept()
            print(f"客户端连接:{client_addr}")
            
            try:
                while True:
                    # 接收数据
                    data = client_socket.recv(1024)
                    if not data:
                        break
                    
                    message = data.decode('utf-8')
                    print(f"收到:{message}")
                    
                    # 发送响应
                    response = f"服务器收到:{message}"
                    client_socket.send(response.encode('utf-8'))
                    
            except Exception as e:
                print(f"通信错误:{e}")
            finally:
                client_socket.close()
                print(f"客户端断开:{client_addr}")
                
    except KeyboardInterrupt:
        print("\n服务器关闭")
    finally:
        server_socket.close()

# tcp_server()

2.2 TCP客户端

import socket

def tcp_client(host='127.0.0.1', port=8888):
    """简单的TCP客户端"""
    # 创建Socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 连接服务器
        client_socket.connect((host, port))
        print(f"已连接到 {host}:{port}")
        
        while True:
            # 发送数据
            message = input("输入消息(quit退出):")
            if message.lower() == 'quit':
                break
            
            client_socket.send(message.encode('utf-8'))
            
            # 接收响应
            response = client_socket.recv(1024)
            print(f"服务器响应:{response.decode('utf-8')}")
            
    except Exception as e:
        print(f"连接错误:{e}")
    finally:
        client_socket.close()
        print("连接已关闭")

# tcp_client()

2.3 多客户端服务器

import socket
import threading

class TCPServer:
    """支持多客户端的TCP服务器"""
    
    def __init__(self, host='0.0.0.0', port=8888):
        self.host = host
        self.port = port
        self.server_socket = None
        self.clients = {}  # {addr: socket}
        self.running = False
    
    def start(self):
        """启动服务器"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True
        
        print(f"服务器启动:{self.host}:{self.port}")
        
        try:
            while self.running:
                client_socket, client_addr = self.server_socket.accept()
                self.clients[client_addr] = client_socket
                print(f"新连接:{client_addr},当前连接数:{len(self.clients)}")
                
                # 为每个客户端创建线程
                thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, client_addr)
                )
                thread.daemon = True
                thread.start()
                
        except Exception as e:
            print(f"服务器错误:{e}")
        finally:
            self.stop()
    
    def handle_client(self, client_socket, client_addr):
        """处理客户端连接"""
        try:
            while self.running:
                data = client_socket.recv(1024)
                if not data:
                    break
                
                message = data.decode('utf-8')
                print(f"[{client_addr}] {message}")
                
                # 处理消息
                response = self.process_message(message)
                client_socket.send(response.encode('utf-8'))
                
        except Exception as e:
            print(f"客户端错误 {client_addr}{e}")
        finally:
            if client_addr in self.clients:
                del self.clients[client_addr]
            client_socket.close()
            print(f"断开连接:{client_addr}")
    
    def process_message(self, message):
        """处理消息(可重写)"""
        return f"Echo: {message}"
    
    def broadcast(self, message):
        """广播消息给所有客户端"""
        for addr, sock in list(self.clients.items()):
            try:
                sock.send(message.encode('utf-8'))
            except:
                del self.clients[addr]
    
    def stop(self):
        """停止服务器"""
        self.running = False
        for sock in self.clients.values():
            sock.close()
        if self.server_socket:
            self.server_socket.close()
        print("服务器已停止")

# 使用
# server = TCPServer()
# server.start()

3. UDP通信

UDP是无连接的不可靠传输协议,但速度快。

3.1 UDP服务器

import socket

def udp_server(host='0.0.0.0', port=8888):
    """UDP服务器"""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind((host, port))
    
    print(f"UDP服务器启动,监听 {host}:{port}")
    
    try:
        while True:
            # 接收数据和发送方地址
            data, client_addr = server_socket.recvfrom(1024)
            message = data.decode('utf-8')
            print(f"收到来自 {client_addr} 的消息:{message}")
            
            # 发送响应
            response = f"服务器收到:{message}"
            server_socket.sendto(response.encode('utf-8'), client_addr)
            
    except KeyboardInterrupt:
        print("\n服务器关闭")
    finally:
        server_socket.close()

# udp_server()

3.2 UDP客户端

import socket

def udp_client(host='127.0.0.1', port=8888):
    """UDP客户端"""
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    try:
        while True:
            message = input("输入消息(quit退出):")
            if message.lower() == 'quit':
                break
            
            # 发送数据
            client_socket.sendto(message.encode('utf-8'), (host, port))
            
            # 接收响应
            data, server_addr = client_socket.recvfrom(1024)
            print(f"服务器响应:{data.decode('utf-8')}")
            
    except Exception as e:
        print(f"错误:{e}")
    finally:
        client_socket.close()

# udp_client()

4. TCP vs UDP对比

特性TCPUDP
连接面向连接无连接
可靠性可靠(确认、重传)不可靠
顺序保证顺序不保证顺序
速度较慢较快
开销较大较小
适用场景文件传输、HTTP视频流、DNS、游戏

5. 数据打包与解包

import struct

# struct模块用于处理二进制数据
# 格式字符:
# b/B: signed/unsigned char (1字节)
# h/H: signed/unsigned short (2字节)
# i/I: signed/unsigned int (4字节)
# q/Q: signed/unsigned long long (8字节)
# f: float (4字节)
# d: double (8字节)
# s: char[] (字符串)
# <: 小端序
# >: 大端序
# !: 网络序(大端)

# 打包数据
def pack_message(msg_type, msg_id, data):
    """打包消息"""
    # 消息格式:类型(1字节) + ID(4字节) + 长度(4字节) + 数据
    data_bytes = data.encode('utf-8')
    header = struct.pack('!BII', msg_type, msg_id, len(data_bytes))
    return header + data_bytes

# 解包数据
def unpack_message(packet):
    """解包消息"""
    header_size = struct.calcsize('!BII')
    header = packet[:header_size]
    msg_type, msg_id, data_len = struct.unpack('!BII', header)
    data = packet[header_size:header_size + data_len].decode('utf-8')
    return msg_type, msg_id, data

# 示例
packet = pack_message(1, 12345, "Hello, World!")
print(f"打包后:{packet.hex()}")

msg_type, msg_id, data = unpack_message(packet)
print(f"解包后:type={msg_type}, id={msg_id}, data={data}")

# 定长消息接收
def recv_exact(sock, size):
    """接收指定长度的数据"""
    data = b''
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("连接断开")
        data += chunk
    return data

# 带长度前缀的消息
def send_message(sock, message):
    """发送带长度前缀的消息"""
    data = message.encode('utf-8')
    length = struct.pack('!I', len(data))
    sock.sendall(length + data)

def recv_message(sock):
    """接收带长度前缀的消息"""
    length_data = recv_exact(sock, 4)
    length = struct.unpack('!I', length_data)[0]
    data = recv_exact(sock, length)
    return data.decode('utf-8')

6. 超时与异常处理

import socket

def robust_tcp_client(host, port, timeout=5):
    """带超时和异常处理的TCP客户端"""
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket.settimeout(timeout)
    
    try:
        # 连接
        client_socket.connect((host, port))
        print("连接成功")
        
        # 发送
        client_socket.send(b"Hello")
        
        # 接收
        data = client_socket.recv(1024)
        print(f"收到:{data}")
        
    except socket.timeout:
        print("连接超时")
    except socket.error as e:
        print(f"Socket错误:{e}")
    except ConnectionRefusedError:
        print("连接被拒绝")
    except ConnectionResetError:
        print("连接被重置")
    except Exception as e:
        print(f"未知错误:{e}")
    finally:
        client_socket.close()

# 非阻塞Socket
def non_blocking_example():
    """非阻塞Socket示例"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    
    try:
        sock.connect(('127.0.0.1', 8888))
    except BlockingIOError:
        pass  # 非阻塞连接会立即返回
    
    # 使用select检查状态
    import select
    readable, writable, _ = select.select([], [sock], [], 5)
    
    if writable:
        print("连接成功")
    else:
        print("连接超时")

7. 实用工具类

import socket
import struct
import json
import threading
from typing import Callable, Optional

class TCPClient:
    """TCP客户端工具类"""
    
    def __init__(self, host: str, port: int, timeout: float = 10):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.socket: Optional[socket.socket] = None
        self.connected = False
    
    def connect(self) -> bool:
        """连接服务器"""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.settimeout(self.timeout)
            self.socket.connect((self.host, self.port))
            self.connected = True
            return True
        except Exception as e:
            print(f"连接失败:{e}")
            return False
    
    def disconnect(self):
        """断开连接"""
        if self.socket:
            self.socket.close()
            self.socket = None
        self.connected = False
    
    def send(self, data: bytes) -> bool:
        """发送数据"""
        if not self.connected:
            return False
        try:
            self.socket.sendall(data)
            return True
        except Exception as e:
            print(f"发送失败:{e}")
            self.connected = False
            return False
    
    def recv(self, size: int = 1024) -> Optional[bytes]:
        """接收数据"""
        if not self.connected:
            return None
        try:
            return self.socket.recv(size)
        except socket.timeout:
            return None
        except Exception as e:
            print(f"接收失败:{e}")
            self.connected = False
            return None
    
    def send_json(self, data: dict) -> bool:
        """发送JSON数据"""
        json_str = json.dumps(data)
        return self.send(json_str.encode('utf-8'))
    
    def recv_json(self) -> Optional[dict]:
        """接收JSON数据"""
        data = self.recv()
        if data:
            return json.loads(data.decode('utf-8'))
        return None
    
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

# 使用
# with TCPClient('127.0.0.1', 8888) as client:
#     client.send(b'Hello')
#     response = client.recv()
#     print(response)

8. 实战案例

案例:简单的命令服务器

"""
实战案例:命令服务器
支持多客户端,可执行简单命令
"""
import socket
import threading
import json
from datetime import datetime

class CommandServer:
    """命令服务器"""
    
    def __init__(self, host='0.0.0.0', port=9999):
        self.host = host
        self.port = port
        self.server_socket = None
        self.running = False
        self.clients = {}
        
        # 注册命令
        self.commands = {
            'time': self.cmd_time,
            'echo': self.cmd_echo,
            'list': self.cmd_list,
            'help': self.cmd_help,
        }
    
    def start(self):
        """启动服务器"""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.running = True
        
        print(f"命令服务器启动:{self.host}:{self.port}")
        print("可用命令:", list(self.commands.keys()))
        
        while self.running:
            try:
                client_socket, addr = self.server_socket.accept()
                self.clients[addr] = client_socket
                
                thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, addr)
                )
                thread.daemon = True
                thread.start()
                
            except Exception as e:
                if self.running:
                    print(f"错误:{e}")
    
    def handle_client(self, sock, addr):
        """处理客户端"""
        print(f"客户端连接:{addr}")
        sock.send(b"Welcome! Type 'help' for commands.\n")
        
        try:
            while self.running:
                data = sock.recv(1024)
                if not data:
                    break
                
                command_line = data.decode('utf-8').strip()
                if not command_line:
                    continue
                
                # 解析命令
                parts = command_line.split(' ', 1)
                cmd = parts[0].lower()
                args = parts[1] if len(parts) > 1 else ''
                
                # 执行命令
                if cmd in self.commands:
                    result = self.commands[cmd](args, addr)
                else:
                    result = f"Unknown command: {cmd}"
                
                sock.send(f"{result}\n".encode('utf-8'))
                
        except Exception as e:
            print(f"客户端错误 {addr}{e}")
        finally:
            if addr in self.clients:
                del self.clients[addr]
            sock.close()
            print(f"客户端断开:{addr}")
    
    # 命令实现
    def cmd_time(self, args, addr):
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    
    def cmd_echo(self, args, addr):
        return args if args else "Usage: echo <message>"
    
    def cmd_list(self, args, addr):
        return f"Connected clients: {len(self.clients)}"
    
    def cmd_help(self, args, addr):
        return "Available commands: " + ", ".join(self.commands.keys())
    
    def stop(self):
        """停止服务器"""
        self.running = False
        for sock in self.clients.values():
            sock.close()
        if self.server_socket:
            self.server_socket.close()

# 启动服务器
# server = CommandServer()
# server.start()

# 客户端测试
def test_client():
    """测试客户端"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 9999))
    
    # 接收欢迎消息
    print(sock.recv(1024).decode())
    
    # 发送命令
    commands = ['help', 'time', 'echo Hello World', 'list']
    for cmd in commands:
        sock.send(f"{cmd}\n".encode())
        response = sock.recv(1024).decode()
        print(f"> {cmd}\n{response}")
    
    sock.close()

# test_client()

9. 常见问题

❌ 问题1:Address already in use

# 解决:设置地址重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

❌ 问题2:粘包问题

# TCP是流协议,可能多个消息粘在一起
# 解决:使用定长消息或长度前缀

# 方法1:定长消息
data = sock.recv(1024)
while len(data) < expected_length:
    data += sock.recv(1024)

# 方法2:长度前缀(推荐)
# 先发送4字节长度,再发送数据

❌ 问题3:连接超时

# 设置超时
sock.settimeout(5.0)

# 或使用select
import select
readable, _, _ = select.select([sock], [], [], 5.0)
if readable:
    data = sock.recv(1024)

10. 总结

🔑 核心要点

知识点要点
TCP面向连接,可靠,适合文件传输
UDP无连接,快速,适合实时数据
服务器bind → listen → accept → recv/send
客户端connect → send/recv
数据打包struct模块处理二进制数据
多客户端使用threading处理并发

✅ 学习检查清单

  • 能创建TCP服务器和客户端
  • 能创建UDP服务器和客户端
  • 理解TCP和UDP的区别
  • 能使用struct打包解包数据
  • 能处理多客户端连接

📖 下一步学习

掌握了Socket通信后,让我们学习定时任务与自动化脚本:


常见问题 FAQ

💬 TCP和UDP怎么选?

需要可靠传输(文件传输、命令控制)用TCP,对实时性要求高且容忍丢包(视频流、传感器数据)用UDP。嵌入式设备通信大多用TCP。

💬 Socket通信和HTTP有什么关系?

HTTP是基于TCP Socket的应用层协议。Socket是底层传输,HTTP是上层约定。如果你只需要请求网页API,用requests库;如果需要自定义协议(如和FPGA板卡通信),用Socket。


📘 系列导航

End of file.