Python二进制协议解析:struct模块拆解数据帧,CRC校验一步到位
在FPGA开发中,经常需要处理二进制数据协议,如UART/SPI/I2C通信协议、自定义数据帧格式等。本篇将介绍如何使用Python解析和构建二进制协议,以及常用的校验算法。
1. struct模块详解
import struct
# struct format characters
# Byte-order prefix:
#   @: native byte order, native sizes and alignment (default)
#   <: little-endian, standard sizes
#   >: big-endian, standard sizes
#   !: network order (big-endian)
# Type codes (sizes below are the standard sizes used with <, > and !):
#   x: pad byte
#   b/B: signed/unsigned char (1 byte)
#   h/H: signed/unsigned short (2 bytes)
#   i/I: signed/unsigned int (4 bytes)
#   l/L: signed/unsigned long (4 bytes; the native size used with @ may differ)
#   q/Q: signed/unsigned long long (8 bytes)
#   f: float (4 bytes)
#   d: double (8 bytes)
#   s: char[] (bytes)
#   p: Pascal string
#   P: pointer

# Pack values into bytes
data = struct.pack('>BHI', 0x01, 0x1234, 0x12345678)
print(f"打包结果:{data.hex()}")  # 01123412345678

# Unpack bytes back into a tuple
values = struct.unpack('>BHI', data)
print(f"解包结果:{values}")  # (1, 4660, 305419896)

# Size in bytes described by a format string
size = struct.calcsize('>BHI')
print(f"数据大小:{size} 字节")  # 7

# A repeat count packs several values of the same type
data = struct.pack('>4H', 1, 2, 3, 4)  # four unsigned shorts
print(data.hex())  # 0001000200030004

# Pack into an existing buffer at a byte offset
buffer = bytearray(10)
struct.pack_into('>HI', buffer, 2, 0x1234, 0x56789ABC)
print(buffer.hex())  # 0000123456789abc0000

# Unpack from a byte offset
values = struct.unpack_from('>HI', buffer, 2)
print(values)  # (4660, 1450744508)

# Iteratively unpack equally sized records
data = bytes.fromhex('0001000200030004')
for value in struct.iter_unpack('>H', data):
    print(value)  # (1,) (2,) (3,) (4,)
2. 字节操作
# bytes objects are immutable; bytearray objects are mutable.

# Construction
b4 = bytearray(10)  # ten zero bytes
b3 = bytes.fromhex('010203')
b2 = bytes([1, 2, 3])
b1 = b'\x01\x02\x03'

# Conversion to and from hex text
hex_str = b1.hex()  # '010203'
hex_str = b1.hex(' ')  # '01 02 03'
b5 = bytes.fromhex('01 02 03')

# Indexing yields an int, slicing yields bytes
print(b1[0])  # 1 (an integer)
print(b1[0:2])  # b'\x01\x02'

# In-place modification (bytearray only)
ba = bytearray(b'\x01\x02\x03')
ba[0] = 0xFF
ba.append(0x04)
ba.extend([0x05, 0x06])
ba.insert(0, 0x00)

# Searching
count = b1.count(b'\x01')  # 1
idx = b1.find(b'\x02')  # 1

# Concatenation and joining
b7 = b'\x00'.join([b1, b2, b3])
b6 = b1 + b2

# Integer <-> bytes
num = 0x12345678
b9 = num.to_bytes(4, 'little')  # bytes 78 56 34 12
b8 = num.to_bytes(4, 'big')  # bytes 12 34 56 78
num2 = int.from_bytes(b8, 'big')  # 305419896

# Signed integers need signed=True in both directions
signed_num = (-100).to_bytes(2, 'big', signed=True)
value = int.from_bytes(signed_num, 'big', signed=True)  # -100
3. 位操作
# Bitwise operators are used constantly when parsing protocols.
# Basic bitwise operations
a = 0b11010110
b = 0b10101010
print(f"AND: {a & b:08b}")  # 10000010
print(f"OR: {a | b:08b}")  # 11111110
print(f"XOR: {a ^ b:08b}")  # 01111100
# ~a is negative in Python, so mask to 8 bits to get the register-style result.
print(f"NOT: {~a & 0xFF:08b}")  # 00101001
# Python ints never overflow; mask to 8 bits to drop the bits shifted out.
print(f"左移: {(a << 2) & 0xFF:08b}")  # 01011000 (high bits discarded)
print(f"右移: {a >> 2:08b}")  # 00110101
# 提取特定位
def get_bit(value, bit_pos):
    """Return bit ``bit_pos`` of ``value`` (0 or 1), where bit 0 is the LSB."""
    shifted = value >> bit_pos
    return shifted & 1
def get_bits(value, start, length):
    """Return the ``length``-bit field of ``value`` whose lowest bit is ``start``."""
    field_mask = (1 << length) - 1
    shifted = value >> start
    return shifted & field_mask
# 设置特定位
def set_bit(value, bit_pos):
    """Return ``value`` with bit ``bit_pos`` forced to 1."""
    bit_mask = 1 << bit_pos
    return value | bit_mask
def clear_bit(value, bit_pos):
    """Return ``value`` with bit ``bit_pos`` forced to 0."""
    bit_mask = 1 << bit_pos
    return value & ~bit_mask
def toggle_bit(value, bit_pos):
    """Return ``value`` with bit ``bit_pos`` inverted."""
    bit_mask = 1 << bit_pos
    return value ^ bit_mask
def set_bits(value, start, length, new_value):
    """Return ``value`` with the ``length``-bit field at ``start`` replaced.

    Bits of ``new_value`` that do not fit in ``length`` bits are discarded.
    """
    field_mask = ((1 << length) - 1) << start
    preserved = value & ~field_mask
    inserted = (new_value << start) & field_mask
    return preserved | inserted
# Example: decoding a status register with this layout
#   bit 0    : enable
#   bits 1-2 : mode
#   bit 3    : error flag
#   bits 4-7 : counter
status = 0b11010110
counter = get_bits(status, 4, 4)  # 13 (0b1101)
error = get_bit(status, 3)  # 0
mode = get_bits(status, 1, 2)  # 3 (0b11)
enabled = get_bit(status, 0)  # 0
print(f"使能: {enabled}, 模式: {mode}, 错误: {error}, 计数: {counter}")
# 位域类
class BitField:
    """Integer wrapper with helpers for reading and writing bit fields."""

    def __init__(self, value=0):
        self.value = value

    def get(self, start, length=1):
        """Return the ``length``-bit field whose lowest bit is ``start``."""
        field_mask = (1 << length) - 1
        shifted = self.value >> start
        return shifted & field_mask

    def set(self, start, length, new_value):
        """Overwrite the field at ``start`` and return ``self`` for chaining.

        Bits of ``new_value`` beyond ``length`` bits are discarded.
        """
        field_mask = ((1 << length) - 1) << start
        preserved = self.value & ~field_mask
        inserted = (new_value << start) & field_mask
        self.value = preserved | inserted
        return self

    def __repr__(self):
        return f"BitField(0x{self.value:X})"
# Usage: set() returns the BitField itself, so the calls can be chained.
bf = BitField(0)
(
    bf.set(0, 1, 1)  # bit 0 = 1
    .set(1, 2, 3)  # bits 1-2 = 3
    .set(4, 4, 10)  # bits 4-7 = 10
)
print(f"值: 0x{bf.value:02X}, 二进制: {bf.value:08b}")
4. 常用校验算法
# 校验和(Checksum)
def checksum_8(data: bytes) -> int:
    """Return the low 8 bits of the sum of all bytes in ``data``."""
    total = sum(data)
    return total & 0xFF
def checksum_16(data: bytes) -> int:
    """Return the low 16 bits of the sum of all bytes in ``data``."""
    total = sum(data)
    return total & 0xFFFF
# 异或校验(XOR)
def xor_checksum(data: bytes) -> int:
    """Return the XOR of all bytes in ``data`` (0 for empty input)."""
    acc = 0
    for value in data:
        acc = acc ^ value
    return acc
# CRC-8
def crc8(data: bytes, poly: int = 0x07, init: int = 0x00) -> int:
    """Compute a bitwise CRC-8 over ``data``, most significant bit first.

    ``poly`` is the generator polynomial and ``init`` the starting register
    value; the result is returned as-is with no final XOR.
    """
    reg = init
    for value in data:
        reg ^= value
        for _ in range(8):
            reg <<= 1
            # Bit 8 now holds the bit that was shifted out of the 8-bit register.
            if reg & 0x100:
                reg ^= poly
            reg &= 0xFF
    return reg
# CRC-16 (Modbus)
def crc16_modbus(data: bytes) -> int:
    """Compute the Modbus CRC-16 of ``data``.

    The register starts at 0xFFFF and is processed least significant bit
    first using the reflected polynomial 0xA001.
    """
    reg = 0xFFFF
    for value in data:
        reg ^= value
        for _ in range(8):
            carry = reg & 0x0001
            reg >>= 1
            if carry:
                reg ^= 0xA001
    return reg
# CRC-32
import binascii
def crc32(data: bytes) -> int:
    """Return the CRC-32 of ``data`` as an unsigned 32-bit integer."""
    raw = binascii.crc32(data)
    return raw & 0xFFFFFFFF
# 使用查表法的CRC(更快)
class CRC16:
    """Table-driven CRC-16 that processes the most significant bit first.

    The 256-entry lookup table is built once per instance from ``poly``;
    ``init`` is the starting register value used by ``calculate``.
    """

    def __init__(self, poly: int = 0x8005, init: int = 0xFFFF):
        self.poly = poly
        self.init = init
        self.table = self._generate_table()

    def _generate_table(self):
        """Return the lookup table: one 16-bit remainder per possible byte."""
        generator = self.poly
        entries = []
        for index in range(256):
            reg = index << 8
            for _ in range(8):
                reg <<= 1
                # Bit 16 holds the bit shifted out of the 16-bit register.
                if reg & 0x10000:
                    reg ^= generator
                reg &= 0xFFFF
            entries.append(reg)
        return entries

    def calculate(self, data: bytes) -> int:
        """Return the CRC-16 of ``data``, consuming one table lookup per byte."""
        lookup = self.table
        reg = self.init
        for value in data:
            slot = ((reg >> 8) ^ value) & 0xFF
            reg = ((reg << 8) & 0xFFFF) ^ lookup[slot]
        return reg
# Run every checksum over the same sample bytes and print the results.
data = b'\x01\x02\x03\x04\x05'
sum8_value = checksum_8(data)
print(f"Checksum-8: 0x{sum8_value:02X}")
xor_value = xor_checksum(data)
print(f"XOR: 0x{xor_value:02X}")
crc8_value = crc8(data)
print(f"CRC-8: 0x{crc8_value:02X}")
modbus_value = crc16_modbus(data)
print(f"CRC-16 Modbus: 0x{modbus_value:04X}")
crc32_value = crc32(data)
print(f"CRC-32: 0x{crc32_value:08X}")
5. 协议设计模式
from dataclasses import dataclass
from typing import Optional, List
import struct
# 典型协议帧格式
# | 帧头 | 长度 | 命令 | 数据 | 校验 |
# | 2B | 1B | 1B | NB | 1B |
@dataclass
class ProtocolFrame:
    """One frame of the simple protocol: header, length, command, data, checksum.

    Wire layout: 2-byte big-endian header 0xAA55, 1-byte payload length,
    1-byte command, the payload, then one checksum byte produced by
    ``checksum_8`` over every preceding byte.
    """

    # Not annotated, so this is a plain class attribute, not a dataclass field.
    HEADER = 0xAA55
    command: int
    data: bytes

    def encode(self) -> bytes:
        """Serialize the frame, appending the checksum as the last byte."""
        head = struct.pack('>HBB', self.HEADER, len(self.data), self.command)
        body = head + self.data
        return body + bytes([checksum_8(body)])

    @classmethod
    def decode(cls, data: bytes) -> Optional['ProtocolFrame']:
        """Parse one frame from the start of ``data``.

        Returns None when ``data`` is shorter than the minimal 5-byte frame,
        the header does not match, fewer bytes are present than the length
        field requires, or the checksum does not match.
        """
        if len(data) < 5:
            return None
        header, length, command = struct.unpack('>HBB', data[:4])
        if header != cls.HEADER:
            return None
        payload_end = 4 + length
        # The checksum byte sits at payload_end, so it must be a valid index.
        if len(data) <= payload_end:
            return None
        if data[payload_end] != checksum_8(data[:payload_end]):
            return None
        return cls(command=command, data=data[4:payload_end])
# Usage: encode a frame, then decode the resulting bytes again.
frame = ProtocolFrame(command=0x01, data=b'\x00\x01\x02')
encoded = frame.encode()
print(f"编码:{encoded.hex()}")
decoded = ProtocolFrame.decode(encoded)
decoded_cmd = decoded.command
decoded_hex = decoded.data.hex()
print(f"解码:cmd={decoded_cmd}, data={decoded_hex}")
# 更复杂的协议定义
class Protocol:
    """Command and error constants plus helpers built on ProtocolFrame."""

    # Command codes
    CMD_READ = 0x01
    CMD_WRITE = 0x02
    CMD_ACK = 0x80
    CMD_NAK = 0x81

    # Error codes
    ERR_OK = 0x00
    ERR_CHECKSUM = 0x01
    ERR_TIMEOUT = 0x02
    ERR_INVALID = 0x03

    @staticmethod
    def build_read_cmd(addr: int, length: int) -> bytes:
        """Return an encoded read frame carrying a 16-bit address and length."""
        payload = struct.pack('>HH', addr, length)
        read_frame = ProtocolFrame(Protocol.CMD_READ, payload)
        return read_frame.encode()

    @staticmethod
    def build_write_cmd(addr: int, data: bytes) -> bytes:
        """Return an encoded write frame: 16-bit address followed by ``data``."""
        address_bytes = struct.pack('>H', addr)
        write_frame = ProtocolFrame(Protocol.CMD_WRITE, address_bytes + data)
        return write_frame.encode()

    @staticmethod
    def parse_response(data: bytes) -> dict:
        """Decode ``data`` and classify the frame as ACK, NAK or unknown.

        Returns ``{'error': 'Invalid frame'}`` when decoding fails. A NAK with
        an empty payload is reported with code 0xFF.
        """
        frame = ProtocolFrame.decode(data)
        if not frame:
            return {'error': 'Invalid frame'}

        if frame.command == Protocol.CMD_ACK:
            return {'status': 'ok', 'data': frame.data}

        if frame.command == Protocol.CMD_NAK:
            if frame.data:
                error_code = frame.data[0]
            else:
                error_code = 0xFF
            return {'status': 'error', 'code': error_code}

        return {'status': 'unknown', 'command': frame.command}
6. 协议解析器
from enum import Enum, auto
from typing import Callable, Optional
import struct
class ParserState(Enum):
    """States of the frame parser, named after the field it is waiting for."""

    WAIT_HEADER1 = auto()
    WAIT_HEADER2 = auto()
    WAIT_LENGTH = auto()
    WAIT_COMMAND = auto()
    WAIT_DATA = auto()
    WAIT_CHECKSUM = auto()


class ProtocolParser:
    """Byte-at-a-time frame parser implemented as a state machine.

    Frame layout: 0xAA 0x55 | length (1 byte) | command (1 byte) | payload
    | checksum. The checksum is the low 8 bits of the sum of every preceding
    frame byte. Input may be fed in arbitrary fragments.

    ``frame_count`` counts frames whose checksum matched; ``error_count``
    counts frames rejected for a checksum mismatch.
    """

    HEADER1 = 0xAA
    HEADER2 = 0x55

    def __init__(self, on_frame: Optional[Callable[[bytes, int, bytes], None]] = None):
        """Create a parser.

        Args:
            on_frame: Optional callback invoked as ``on_frame(raw, cmd, data)``
                for each valid frame. ``raw`` holds the frame bytes without
                the trailing checksum byte.
        """
        self.state = ParserState.WAIT_HEADER1
        self.buffer = bytearray()
        self.length = 0
        self.command = 0
        self.on_frame = on_frame
        self.frame_count = 0
        self.error_count = 0

    def feed(self, data: bytes):
        """Feed received bytes into the state machine, one at a time."""
        for byte in data:
            self._process_byte(byte)

    def _process_byte(self, byte: int):
        """Advance the state machine by one received byte."""
        if self.state == ParserState.WAIT_HEADER1:
            if byte == self.HEADER1:
                self.buffer = bytearray([byte])
                self.state = ParserState.WAIT_HEADER2
        elif self.state == ParserState.WAIT_HEADER2:
            if byte == self.HEADER2:
                self.buffer.append(byte)
                self.state = ParserState.WAIT_LENGTH
            elif byte == self.HEADER1:
                # A repeated first header byte may itself start the frame
                # (e.g. AA AA 55 ...), so keep waiting for the second byte
                # instead of discarding it.
                self.buffer = bytearray([byte])
            else:
                self.state = ParserState.WAIT_HEADER1
        elif self.state == ParserState.WAIT_LENGTH:
            self.length = byte
            self.buffer.append(byte)
            self.state = ParserState.WAIT_COMMAND
        elif self.state == ParserState.WAIT_COMMAND:
            self.command = byte
            self.buffer.append(byte)
            # A zero-length frame has no payload bytes to collect.
            if self.length > 0:
                self.state = ParserState.WAIT_DATA
            else:
                self.state = ParserState.WAIT_CHECKSUM
        elif self.state == ParserState.WAIT_DATA:
            self.buffer.append(byte)
            # 4 = two header bytes + length byte + command byte.
            if len(self.buffer) >= 4 + self.length:
                self.state = ParserState.WAIT_CHECKSUM
        elif self.state == ParserState.WAIT_CHECKSUM:
            # Return to the idle state before calling out, so an exception
            # raised by the callback cannot leave the parser stuck here.
            self.state = ParserState.WAIT_HEADER1
            expected = sum(self.buffer) & 0xFF
            if byte == expected:
                self.frame_count += 1
                if self.on_frame:
                    # Hand out immutable copies: self.buffer is reused for
                    # the next frame.
                    raw = bytes(self.buffer)
                    payload = raw[4:4 + self.length]
                    self.on_frame(raw, self.command, payload)
            else:
                self.error_count += 1

    def reset(self):
        """Drop any partially received frame; the counters are kept."""
        self.state = ParserState.WAIT_HEADER1
        self.buffer = bytearray()
        self.length = 0
        self.command = 0
# Usage example
def on_frame_received(raw: bytes, cmd: int, data: bytes):
    """Print the command and payload of each validated frame."""
    print(f"收到帧:cmd=0x{cmd:02X}, data={data.hex()}")


parser = ProtocolParser(on_frame=on_frame_received)
# Simulate one complete frame arriving in two fragments. The last byte is the
# checksum: (0xAA + 0x55 + 0x03 + 0x01 + 0x01 + 0x02 + 0x03) & 0xFF == 0x09.
test_data = bytes.fromhex('AA55030101020309')
parser.feed(test_data[:3])  # fragment 1
parser.feed(test_data[3:])  # fragment 2
print(f"帧计数:{parser.frame_count}, 错误计数:{parser.error_count}")
7. 常见协议示例
# Modbus RTU协议
class ModbusRTU:
    """Helpers for building and parsing Modbus RTU frames."""

    # Function codes
    READ_COILS = 0x01
    READ_DISCRETE_INPUTS = 0x02
    READ_HOLDING_REGISTERS = 0x03
    READ_INPUT_REGISTERS = 0x04
    WRITE_SINGLE_COIL = 0x05
    WRITE_SINGLE_REGISTER = 0x06
    WRITE_MULTIPLE_REGISTERS = 0x10

    @staticmethod
    def build_read_registers(slave_id: int, start_addr: int, count: int) -> bytes:
        """Build a Read Holding Registers (0x03) request.

        The slave id, function code, start address and register count are
        packed big-endian; the CRC from ``crc16_modbus`` is appended low
        byte first.
        """
        data = struct.pack('>BBHH', slave_id, ModbusRTU.READ_HOLDING_REGISTERS,
                           start_addr, count)
        crc = crc16_modbus(data)
        return data + struct.pack('<H', crc)  # CRC is little-endian

    @staticmethod
    def parse_read_response(data: bytes) -> dict:
        """Parse a register-read response.

        Returns:
            ``{'slave_id': ..., 'registers': [...]}`` on success;
            ``{'error': 'Exception', 'code': ...}`` when the high bit of the
            function code is set;
            ``{'error': 'Too short'}`` when the frame has fewer than 5 bytes
            or fewer bytes than its byte-count field declares.

        The trailing CRC is not verified here.
        """
        if len(data) < 5:
            return {'error': 'Too short'}
        slave_id = data[0]
        func_code = data[1]
        if func_code & 0x80:  # exception response
            return {'error': 'Exception', 'code': data[2]}
        byte_count = data[2]
        # Reject truncated frames up front instead of letting struct raise.
        if len(data) < 3 + byte_count:
            return {'error': 'Too short'}
        # Registers are 16-bit big-endian words; an odd trailing byte is ignored.
        register_count = byte_count // 2
        registers = list(struct.unpack_from(f'>{register_count}H', data, 3))
        return {'slave_id': slave_id, 'registers': registers}
# I2C协议帧
class I2CFrame:
    """Builders for the byte sequences of simple I2C register accesses."""

    @staticmethod
    def build_write(device_addr: int, reg_addr: int, data: bytes) -> bytes:
        """Return address byte (R/W bit 0), register address, then ``data``."""
        write_address = device_addr << 1
        prefix = bytes([write_address, reg_addr])
        return prefix + data

    @staticmethod
    def build_read(device_addr: int, reg_addr: int, length: int) -> tuple:
        """Return ``(write_frame, read_frame, length)`` for a register read.

        ``write_frame`` selects the register; ``read_frame`` is the address
        byte with the R/W bit set to 1.
        """
        shifted = device_addr << 1
        select_register = bytes([shifted, reg_addr])
        read_address = bytes([shifted | 1])
        return select_register, read_address, length
# SPI协议帧
class SPIFrame:
    """Builders for SPI transfers where the address MSB selects read/write."""

    @staticmethod
    def build_write(reg_addr: int, data: bytes) -> bytes:
        """Return a write transfer: address with the MSB cleared, then ``data``."""
        address_byte = reg_addr & 0x7F
        return bytes([address_byte]) + data

    @staticmethod
    def build_read(reg_addr: int, length: int) -> bytes:
        """Return a read transfer: address with the MSB set, then ``length`` zero bytes."""
        address_byte = reg_addr | 0x80
        dummy_bytes = bytes(length)
        return bytes([address_byte]) + dummy_bytes
# CAN协议帧
@dataclass
class CANFrame:
    """CAN frame with a simplified byte encoding."""

    id: int  # 11-bit or 29-bit identifier
    data: bytes  # 0-8 data bytes
    extended: bool = False  # extended-frame flag
    remote: bool = False  # remote-frame flag

    def encode(self) -> bytes:
        """Encode as flags byte, identifier, data length, then the data.

        The flags byte holds ``extended`` in bit 1 and ``remote`` in bit 0.
        The identifier is packed big-endian as 4 bytes for extended frames
        and 2 bytes otherwise.
        """
        flags = (self.extended << 1) | self.remote
        header_format = '>BI' if self.extended else '>BH'
        header = struct.pack(header_format, flags, self.id)
        length_byte = bytes([len(self.data)])
        return header + length_byte + self.data
8. 实战案例
案例:FPGA寄存器读写协议
"""
实战案例:FPGA寄存器读写协议实现
协议格式:
请求:AA 55 LEN CMD ADDR(2B) [DATA] CHECKSUM
响应:AA 55 LEN CMD [DATA] CHECKSUM
"""
import struct
from dataclasses import dataclass
from typing import Optional, Union
from enum import IntEnum
class Command(IntEnum):
    """Command byte values carried in a frame."""

    # Requests
    READ_REG = 0x01
    WRITE_REG = 0x02
    READ_MEM = 0x03
    WRITE_MEM = 0x04
    # Responses
    ACK = 0x80
    NAK = 0x81
class ErrorCode(IntEnum):
    """Error codes reported in the first payload byte of a NAK frame."""

    OK = 0x00
    INVALID_CMD = 0x01
    INVALID_ADDR = 0x02
    CHECKSUM_ERR = 0x03
    TIMEOUT = 0x04
@dataclass
class RegisterAccess:
    """Description of a single register access."""

    address: int
    value: Optional[int] = None
    width: int = 32  # register width in bits: 8, 16 or 32
class FPGAProtocol:
    """Builds request frames and parses response frames for the FPGA link.

    Frames are laid out as header 0xAA55, payload length (1 byte), command
    (1 byte), payload, and a checksum equal to the low 8 bits of the sum of
    all preceding bytes. Responses are parsed with a ``ProtocolParser``.
    """

    HEADER = 0xAA55

    def __init__(self):
        self.parser = ProtocolParser(on_frame=self._on_frame)
        # (cmd, payload) of the most recent valid frame, or None.
        self.pending_response = None

    def _on_frame(self, raw: bytes, cmd: int, data: bytes):
        """Parser callback: remember the command and payload of the frame."""
        self.pending_response = (cmd, data)

    def build_read_reg(self, addr: int, width: int = 32) -> bytes:
        """Build a READ_REG frame: 16-bit address plus register size in bytes."""
        data = struct.pack('>HB', addr, width // 8)
        return self._build_frame(Command.READ_REG, data)

    def build_write_reg(self, addr: int, value: int, width: int = 32) -> bytes:
        """Build a WRITE_REG frame: 16-bit address followed by the value.

        The value is packed big-endian as 1 byte for width 8, 2 bytes for
        width 16, and 4 bytes for any other width.
        """
        fmt = {8: '>HB', 16: '>HH'}.get(width, '>HI')
        return self._build_frame(Command.WRITE_REG, struct.pack(fmt, addr, value))

    def build_read_mem(self, addr: int, length: int) -> bytes:
        """Build a READ_MEM frame: 16-bit address and 16-bit length."""
        data = struct.pack('>HH', addr, length)
        return self._build_frame(Command.READ_MEM, data)

    def build_write_mem(self, addr: int, data: bytes) -> bytes:
        """Build a WRITE_MEM frame: 16-bit address followed by ``data``."""
        payload = struct.pack('>H', addr) + data
        return self._build_frame(Command.WRITE_MEM, payload)

    def _build_frame(self, cmd: int, data: bytes) -> bytes:
        """Wrap ``data`` in header, length and command, then append the checksum."""
        frame = struct.pack('>HBB', self.HEADER, len(data), cmd)
        frame += data
        frame += bytes([sum(frame) & 0xFF])
        return frame

    def parse_response(self, data: bytes) -> dict:
        """Feed ``data`` to the parser and classify the last valid frame.

        Returns:
            ``{'error': 'No valid frame'}`` if no frame completed;
            ``{'status': 'ok', 'data': payload}`` for ACK;
            ``{'status': 'error', 'code': name}`` for NAK, where ``name`` is
            the ``ErrorCode`` member name, or ``UNKNOWN_0xNN`` when the code
            is not a known ``ErrorCode`` (0xFF is used for an empty payload);
            ``{'status': 'unknown', 'cmd': cmd}`` for any other command.
        """
        self.pending_response = None
        self.parser.feed(data)
        if not self.pending_response:
            return {'error': 'No valid frame'}
        cmd, payload = self.pending_response
        if cmd == Command.ACK:
            return {'status': 'ok', 'data': payload}
        elif cmd == Command.NAK:
            error_code = payload[0] if payload else 0xFF
            try:
                code_name = ErrorCode(error_code).name
            except ValueError:
                # Codes outside ErrorCode (including the 0xFF placeholder)
                # must not crash response parsing.
                code_name = f"UNKNOWN_0x{error_code:02X}"
            return {'status': 'error', 'code': code_name}
        else:
            return {'status': 'unknown', 'cmd': cmd}

    def parse_read_reg_response(self, data: bytes, width: int = 32) -> Optional[int]:
        """Parse a read-register response and return the register value.

        Returns None when the response is not an ACK or its payload is
        shorter than the register width. The value is read big-endian from
        1 byte (width 8), 2 bytes (width 16) or 4 bytes (any other width).
        """
        result = self.parse_response(data)
        if result.get('status') != 'ok':
            return None
        payload = result['data']
        size = {8: 1, 16: 2}.get(width, 4)
        # A short payload cannot hold the value; report failure, don't raise.
        if len(payload) < size:
            return None
        return int.from_bytes(payload[:size], 'big')
# 完整的通信示例
class FPGADevice:
    """Register and memory access over a serial-like port using FPGAProtocol.

    ``serial_port`` is used only through its ``write(data)`` and
    ``read(size)`` methods.
    """

    def __init__(self, serial_port):
        self.serial = serial_port
        self.protocol = FPGAProtocol()

    def read_register(self, addr: int, width: int = 32) -> Optional[int]:
        """Send a read-register request and return the value, or None on failure."""
        request = self.protocol.build_read_reg(addr, width)
        self.serial.write(request)
        reply = self.serial.read(100)
        return self.protocol.parse_read_reg_response(reply, width)

    def write_register(self, addr: int, value: int, width: int = 32) -> bool:
        """Send a write-register request; return True if the reply is an ACK."""
        request = self.protocol.build_write_reg(addr, value, width)
        self.serial.write(request)
        reply = self.serial.read(100)
        outcome = self.protocol.parse_response(reply)
        return outcome.get('status') == 'ok'

    def read_memory(self, addr: int, length: int) -> Optional[bytes]:
        """Send a read-memory request and return the payload, or None on failure."""
        request = self.protocol.build_read_mem(addr, length)
        self.serial.write(request)
        # Ask for the payload plus headroom for the frame overhead.
        reply = self.serial.read(length + 10)
        outcome = self.protocol.parse_response(reply)
        if outcome.get('status') != 'ok':
            return None
        return outcome['data']
# 测试
def test_protocol():
    """Build sample commands and parse a simulated ACK response."""
    protocol = FPGAProtocol()

    # Build commands
    read_cmd = protocol.build_read_reg(0x0000, 32)
    print(f"读寄存器命令:{read_cmd.hex()}")
    write_cmd = protocol.build_write_reg(0x0004, 0x12345678, 32)
    print(f"写寄存器命令:{write_cmd.hex()}")

    # Simulated ACK carrying 0x12345678. The field order must match the
    # parser: header (AA 55), length (04), command (80 = ACK), payload.
    body = bytes.fromhex('AA55048012345678')
    # Compute the checksum instead of hard-coding it: low 8 bits of the sum
    # of all preceding bytes.
    checksum = sum(body) & 0xFF
    response = body + bytes([checksum])
    result = protocol.parse_response(response)
    print(f"解析结果:{result}")


test_protocol()
9. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| struct | 二进制数据打包解包 |
| 字节操作 | bytes/bytearray,整数转换 |
| 位操作 | 位域提取、设置、掩码 |
| 校验算法 | Checksum, XOR, CRC |
| 协议设计 | 帧头+长度+命令+数据+校验 |
| 状态机 | 解析分片数据 |
✅ 学习检查清单
- 掌握struct模块的使用
- 能进行字节和位操作
- 了解常用校验算法
- 能设计简单的通信协议
- 能实现协议解析器
📖 下一步学习
掌握了二进制协议解析后,下一篇我们将学习NumPy数据处理。
常见问题 FAQ
💬 struct的格式字符串怎么记?
常用的就几个:B(uint8)、H(uint16)、I(uint32)、f(float)、s(bytes)。前缀<小端、>大端。和C的类型对应很直观。
💬 CRC校验与校验和(checksum)怎么选?
简单场景用校验和(异或或累加),检错能力有限但实现简单。需要高可靠性用CRC-16或CRC-32,能检测突发错误。FPGA通信建议至少用CRC-8。
📚 系列导航
- 上一篇:22 - Python串口通信
- 当前:23 - Python二进制协议解析与校验
- 下一篇:24 - Python NumPy数据处理