Python Socket通信:TCP/UDP编程,用Python写网络调试工具
Socket是网络通信的基础,掌握TCP/UDP通信对于开发网络工具、与设备通信等场景非常有用。本篇将介绍Python的socket编程,包括TCP和UDP通信的实现。
1. Socket基础概念
import socket
# Socket = IP地址 + 端口号
# 用于标识网络中的一个通信端点
# 创建Socket
# socket.socket(family, type)
# family: AF_INET (IPv4), AF_INET6 (IPv6)
# type: SOCK_STREAM (TCP), SOCK_DGRAM (UDP)
# TCP Socket
tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# UDP Socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 常用方法
# bind((host, port)) - 绑定地址
# listen(backlog) - 开始监听(TCP)
# accept() - 接受连接(TCP)
# connect((host, port)) - 连接服务器
# send(data) - 发送数据
# recv(bufsize) - 接收数据
# sendto(data, addr) - 发送到指定地址(UDP)
# recvfrom(bufsize) - 接收并获取发送方地址(UDP)
# close() - 关闭连接
2. TCP通信
TCP是面向连接的可靠传输协议。
2.1 TCP服务器
import socket
def tcp_server(host='0.0.0.0', port=8888):
"""简单的TCP服务器"""
# 创建Socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置地址重用(避免"Address already in use"错误)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定地址
server_socket.bind((host, port))
# 开始监听
server_socket.listen(5) # 最大等待连接数
print(f"TCP服务器启动,监听 {host}:{port}")
try:
while True:
# 接受连接
client_socket, client_addr = server_socket.accept()
print(f"客户端连接:{client_addr}")
try:
while True:
# 接收数据
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"收到:{message}")
# 发送响应
response = f"服务器收到:{message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"通信错误:{e}")
finally:
client_socket.close()
print(f"客户端断开:{client_addr}")
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server_socket.close()
# tcp_server()
2.2 TCP客户端
import socket
def tcp_client(host='127.0.0.1', port=8888):
"""简单的TCP客户端"""
# 创建Socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接服务器
client_socket.connect((host, port))
print(f"已连接到 {host}:{port}")
while True:
# 发送数据
message = input("输入消息(quit退出):")
if message.lower() == 'quit':
break
client_socket.send(message.encode('utf-8'))
# 接收响应
response = client_socket.recv(1024)
print(f"服务器响应:{response.decode('utf-8')}")
except Exception as e:
print(f"连接错误:{e}")
finally:
client_socket.close()
print("连接已关闭")
# tcp_client()
2.3 多客户端服务器
import socket
import threading
class TCPServer:
"""支持多客户端的TCP服务器"""
def __init__(self, host='0.0.0.0', port=8888):
self.host = host
self.port = port
self.server_socket = None
self.clients = {} # {addr: socket}
self.running = False
def start(self):
"""启动服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"服务器启动:{self.host}:{self.port}")
try:
while self.running:
client_socket, client_addr = self.server_socket.accept()
self.clients[client_addr] = client_socket
print(f"新连接:{client_addr},当前连接数:{len(self.clients)}")
# 为每个客户端创建线程
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, client_addr)
)
thread.daemon = True
thread.start()
except Exception as e:
print(f"服务器错误:{e}")
finally:
self.stop()
def handle_client(self, client_socket, client_addr):
"""处理客户端连接"""
try:
while self.running:
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"[{client_addr}] {message}")
# 处理消息
response = self.process_message(message)
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"客户端错误 {client_addr}:{e}")
finally:
if client_addr in self.clients:
del self.clients[client_addr]
client_socket.close()
print(f"断开连接:{client_addr}")
def process_message(self, message):
"""处理消息(可重写)"""
return f"Echo: {message}"
def broadcast(self, message):
"""广播消息给所有客户端"""
for addr, sock in list(self.clients.items()):
try:
sock.send(message.encode('utf-8'))
except:
del self.clients[addr]
def stop(self):
"""停止服务器"""
self.running = False
for sock in self.clients.values():
sock.close()
if self.server_socket:
self.server_socket.close()
print("服务器已停止")
# 使用
# server = TCPServer()
# server.start()
3. UDP通信
UDP是无连接的不可靠传输协议,但速度快。
3.1 UDP服务器
import socket
def udp_server(host='0.0.0.0', port=8888):
"""UDP服务器"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind((host, port))
print(f"UDP服务器启动,监听 {host}:{port}")
try:
while True:
# 接收数据和发送方地址
data, client_addr = server_socket.recvfrom(1024)
message = data.decode('utf-8')
print(f"收到来自 {client_addr} 的消息:{message}")
# 发送响应
response = f"服务器收到:{message}"
server_socket.sendto(response.encode('utf-8'), client_addr)
except KeyboardInterrupt:
print("\n服务器关闭")
finally:
server_socket.close()
# udp_server()
3.2 UDP客户端
import socket
def udp_client(host='127.0.0.1', port=8888):
"""UDP客户端"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
while True:
message = input("输入消息(quit退出):")
if message.lower() == 'quit':
break
# 发送数据
client_socket.sendto(message.encode('utf-8'), (host, port))
# 接收响应
data, server_addr = client_socket.recvfrom(1024)
print(f"服务器响应:{data.decode('utf-8')}")
except Exception as e:
print(f"错误:{e}")
finally:
client_socket.close()
# udp_client()
4. TCP vs UDP对比
| 特性 | TCP | UDP |
|---|---|---|
| 连接 | 面向连接 | 无连接 |
| 可靠性 | 可靠(确认、重传) | 不可靠 |
| 顺序 | 保证顺序 | 不保证顺序 |
| 速度 | 较慢 | 较快 |
| 开销 | 较大 | 较小 |
| 适用场景 | 文件传输、HTTP | 视频流、DNS、游戏 |
5. 数据打包与解包
import struct
# struct模块用于处理二进制数据
# 格式字符:
# b/B: signed/unsigned char (1字节)
# h/H: signed/unsigned short (2字节)
# i/I: signed/unsigned int (4字节)
# q/Q: signed/unsigned long long (8字节)
# f: float (4字节)
# d: double (8字节)
# s: char[] (字符串)
# <: 小端序
# >: 大端序
# !: 网络序(大端)
# 打包数据
def pack_message(msg_type, msg_id, data):
"""打包消息"""
# 消息格式:类型(1字节) + ID(4字节) + 长度(4字节) + 数据
data_bytes = data.encode('utf-8')
header = struct.pack('!BII', msg_type, msg_id, len(data_bytes))
return header + data_bytes
# 解包数据
def unpack_message(packet):
"""解包消息"""
header_size = struct.calcsize('!BII')
header = packet[:header_size]
msg_type, msg_id, data_len = struct.unpack('!BII', header)
data = packet[header_size:header_size + data_len].decode('utf-8')
return msg_type, msg_id, data
# 示例
packet = pack_message(1, 12345, "Hello, World!")
print(f"打包后:{packet.hex()}")
msg_type, msg_id, data = unpack_message(packet)
print(f"解包后:type={msg_type}, id={msg_id}, data={data}")
# 定长消息接收
def recv_exact(sock, size):
"""接收指定长度的数据"""
data = b''
while len(data) < size:
chunk = sock.recv(size - len(data))
if not chunk:
raise ConnectionError("连接断开")
data += chunk
return data
# 带长度前缀的消息
def send_message(sock, message):
"""发送带长度前缀的消息"""
data = message.encode('utf-8')
length = struct.pack('!I', len(data))
sock.sendall(length + data)
def recv_message(sock):
"""接收带长度前缀的消息"""
length_data = recv_exact(sock, 4)
length = struct.unpack('!I', length_data)[0]
data = recv_exact(sock, length)
return data.decode('utf-8')
6. 超时与异常处理
import socket
def robust_tcp_client(host, port, timeout=5):
"""带超时和异常处理的TCP客户端"""
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.settimeout(timeout)
try:
# 连接
client_socket.connect((host, port))
print("连接成功")
# 发送
client_socket.send(b"Hello")
# 接收
data = client_socket.recv(1024)
print(f"收到:{data}")
except socket.timeout:
print("连接超时")
except socket.error as e:
print(f"Socket错误:{e}")
except ConnectionRefusedError:
print("连接被拒绝")
except ConnectionResetError:
print("连接被重置")
except Exception as e:
print(f"未知错误:{e}")
finally:
client_socket.close()
# 非阻塞Socket
def non_blocking_example():
"""非阻塞Socket示例"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('127.0.0.1', 8888))
except BlockingIOError:
pass # 非阻塞连接会立即返回
# 使用select检查状态
import select
readable, writable, _ = select.select([], [sock], [], 5)
if writable:
print("连接成功")
else:
print("连接超时")
7. 实用工具类
import socket
import struct
import json
import threading
from typing import Callable, Optional
class TCPClient:
"""TCP客户端工具类"""
def __init__(self, host: str, port: int, timeout: float = 10):
self.host = host
self.port = port
self.timeout = timeout
self.socket: Optional[socket.socket] = None
self.connected = False
def connect(self) -> bool:
"""连接服务器"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout)
self.socket.connect((self.host, self.port))
self.connected = True
return True
except Exception as e:
print(f"连接失败:{e}")
return False
def disconnect(self):
"""断开连接"""
if self.socket:
self.socket.close()
self.socket = None
self.connected = False
def send(self, data: bytes) -> bool:
"""发送数据"""
if not self.connected:
return False
try:
self.socket.sendall(data)
return True
except Exception as e:
print(f"发送失败:{e}")
self.connected = False
return False
def recv(self, size: int = 1024) -> Optional[bytes]:
"""接收数据"""
if not self.connected:
return None
try:
return self.socket.recv(size)
except socket.timeout:
return None
except Exception as e:
print(f"接收失败:{e}")
self.connected = False
return None
def send_json(self, data: dict) -> bool:
"""发送JSON数据"""
json_str = json.dumps(data)
return self.send(json_str.encode('utf-8'))
def recv_json(self) -> Optional[dict]:
"""接收JSON数据"""
data = self.recv()
if data:
return json.loads(data.decode('utf-8'))
return None
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
# 使用
# with TCPClient('127.0.0.1', 8888) as client:
# client.send(b'Hello')
# response = client.recv()
# print(response)
8. 实战案例
案例:简单的命令服务器
"""
实战案例:命令服务器
支持多客户端,可执行简单命令
"""
import socket
import threading
import json
from datetime import datetime
class CommandServer:
"""命令服务器"""
def __init__(self, host='0.0.0.0', port=9999):
self.host = host
self.port = port
self.server_socket = None
self.running = False
self.clients = {}
# 注册命令
self.commands = {
'time': self.cmd_time,
'echo': self.cmd_echo,
'list': self.cmd_list,
'help': self.cmd_help,
}
def start(self):
"""启动服务器"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.running = True
print(f"命令服务器启动:{self.host}:{self.port}")
print("可用命令:", list(self.commands.keys()))
while self.running:
try:
client_socket, addr = self.server_socket.accept()
self.clients[addr] = client_socket
thread = threading.Thread(
target=self.handle_client,
args=(client_socket, addr)
)
thread.daemon = True
thread.start()
except Exception as e:
if self.running:
print(f"错误:{e}")
def handle_client(self, sock, addr):
"""处理客户端"""
print(f"客户端连接:{addr}")
sock.send(b"Welcome! Type 'help' for commands.\n")
try:
while self.running:
data = sock.recv(1024)
if not data:
break
command_line = data.decode('utf-8').strip()
if not command_line:
continue
# 解析命令
parts = command_line.split(' ', 1)
cmd = parts[0].lower()
args = parts[1] if len(parts) > 1 else ''
# 执行命令
if cmd in self.commands:
result = self.commands[cmd](args, addr)
else:
result = f"Unknown command: {cmd}"
sock.send(f"{result}\n".encode('utf-8'))
except Exception as e:
print(f"客户端错误 {addr}:{e}")
finally:
if addr in self.clients:
del self.clients[addr]
sock.close()
print(f"客户端断开:{addr}")
# 命令实现
def cmd_time(self, args, addr):
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def cmd_echo(self, args, addr):
return args if args else "Usage: echo <message>"
def cmd_list(self, args, addr):
return f"Connected clients: {len(self.clients)}"
def cmd_help(self, args, addr):
return "Available commands: " + ", ".join(self.commands.keys())
def stop(self):
"""停止服务器"""
self.running = False
for sock in self.clients.values():
sock.close()
if self.server_socket:
self.server_socket.close()
# 启动服务器
# server = CommandServer()
# server.start()
# 客户端测试
def test_client():
"""测试客户端"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 9999))
# 接收欢迎消息
print(sock.recv(1024).decode())
# 发送命令
commands = ['help', 'time', 'echo Hello World', 'list']
for cmd in commands:
sock.send(f"{cmd}\n".encode())
response = sock.recv(1024).decode()
print(f"> {cmd}\n{response}")
sock.close()
# test_client()
9. 常见问题
❌ 问题1:Address already in use
# 解决:设置地址重用
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
❌ 问题2:粘包问题
# TCP是流协议,可能多个消息粘在一起
# 解决:使用定长消息或长度前缀
# 方法1:定长消息
data = sock.recv(1024)
while len(data) < expected_length:
data += sock.recv(1024)
# 方法2:长度前缀(推荐)
# 先发送4字节长度,再发送数据
❌ 问题3:连接超时
# 设置超时
sock.settimeout(5.0)
# 或使用select
import select
readable, _, _ = select.select([sock], [], [], 5.0)
if readable:
data = sock.recv(1024)
10. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| TCP | 面向连接,可靠,适合文件传输 |
| UDP | 无连接,快速,适合实时数据 |
| 服务器 | bind → listen → accept → recv/send |
| 客户端 | connect → send/recv |
| 数据打包 | struct模块处理二进制数据 |
| 多客户端 | 使用threading处理并发 |
✅ 学习检查清单
- 能创建TCP服务器和客户端
- 能创建UDP服务器和客户端
- 理解TCP和UDP的区别
- 能使用struct打包解包数据
- 能处理多客户端连接
📖 下一步学习
掌握了Socket通信后,让我们学习定时任务与自动化脚本:
常见问题 FAQ
💬 TCP和UDP怎么选?
需要可靠传输(文件传输、命令控制)用TCP,对实时性要求高且容忍丢包(视频流、传感器数据)用UDP。嵌入式设备通信大多用TCP。
💬 Socket通信和HTTP有什么关系?
HTTP是基于TCP Socket的应用层协议。Socket是底层传输,HTTP是上层约定。如果你只需要请求网页API,用requests库;如果需要自定义协议(如和FPGA板卡通信),用Socket。
📘 系列导航
- 上一篇:19 - Python邮件自动化
- 当前:20 - Python Socket通信
- 下一篇:21 - Python定时任务与自动化脚本