Veloris.
返回索引
设计实战 2026-02-14

Python二进制协议解析:struct模块拆解数据帧,CRC校验一步到位

2 分钟
478 words

Python二进制协议解析:struct模块拆解数据帧,CRC校验一步到位

在FPGA开发中,经常需要处理二进制数据协议,如UART/SPI/I2C通信协议、自定义数据帧格式等。本篇将介绍如何使用Python解析和构建二进制协议,以及常用的校验算法。


1. struct模块详解

import struct

# struct格式字符
# 字节序:
#   @: 本机字节序(默认)
#   <: 小端序(Little-endian)
#   >: 大端序(Big-endian)
#   !: 网络序(大端)

# 数据类型:
#   x: 填充字节
#   b/B: signed/unsigned char (1字节)
#   h/H: signed/unsigned short (2字节)
#   i/I: signed/unsigned int (4字节)
#   l/L: signed/unsigned long (4字节)
#   q/Q: signed/unsigned long long (8字节)
#   f: float (4字节)
#   d: double (8字节)
#   s: char[] (字符串)
#   p: pascal string
#   P: pointer

# 打包数据
data = struct.pack('>BHI', 0x01, 0x1234, 0x12345678)
print(f"打包结果:{data.hex()}")  # 01123412345678

# 解包数据
values = struct.unpack('>BHI', data)
print(f"解包结果:{values}")  # (1, 4660, 305419896)

# 计算格式大小
size = struct.calcsize('>BHI')
print(f"数据大小:{size} 字节")  # 7

# 打包多个相同类型
data = struct.pack('>4H', 1, 2, 3, 4)  # 4个unsigned short
print(data.hex())  # 0001000200030004

# 解包到指定位置
buffer = bytearray(10)
struct.pack_into('>HI', buffer, 2, 0x1234, 0x56789ABC)
print(buffer.hex())  # 00001234567899bc0000

# 从指定位置解包
values = struct.unpack_from('>HI', buffer, 2)
print(values)  # (4660, 1450744508)

# 迭代解包
data = bytes.fromhex('0001000200030004')
for value in struct.iter_unpack('>H', data):
    print(value)  # (1,) (2,) (3,) (4,)

2. 字节操作

# bytes和bytearray
# bytes: 不可变
# bytearray: 可变

# 创建
b1 = b'\x01\x02\x03'
b2 = bytes([1, 2, 3])
b3 = bytes.fromhex('010203')
b4 = bytearray(10)  # 10个零字节

# 转换
hex_str = b1.hex()           # '010203'
hex_str = b1.hex(' ')        # '01 02 03'
b5 = bytes.fromhex('01 02 03')

# 索引和切片
print(b1[0])      # 1 (整数)
print(b1[0:2])    # b'\x01\x02'

# 修改(bytearray)
ba = bytearray(b'\x01\x02\x03')
ba[0] = 0xFF
ba.append(0x04)
ba.extend([0x05, 0x06])
ba.insert(0, 0x00)

# 查找
idx = b1.find(b'\x02')  # 1
count = b1.count(b'\x01')  # 1

# 拼接
b6 = b1 + b2
b7 = b'\x00'.join([b1, b2, b3])

# 整数与字节转换
num = 0x12345678
b8 = num.to_bytes(4, 'big')     # b'\x12\x34\x56\x78'
b9 = num.to_bytes(4, 'little')  # b'\x78\x56\x34\x12'

num2 = int.from_bytes(b8, 'big')  # 305419896

# 有符号整数
signed_num = (-100).to_bytes(2, 'big', signed=True)
value = int.from_bytes(signed_num, 'big', signed=True)  # -100

3. 位操作

# 位操作在协议解析中非常常用

# 基本位操作
a = 0b11010110
b = 0b10101010

print(f"AND: {a & b:08b}")   # 10000010
print(f"OR:  {a | b:08b}")   # 11111110
print(f"XOR: {a ^ b:08b}")   # 01111100
print(f"NOT: {~a & 0xFF:08b}")  # 00101001
print(f"左移: {a << 2:08b}")  # 01011000 (溢出)
print(f"右移: {a >> 2:08b}")  # 00110101

# 提取特定位
def get_bit(value, bit_pos):
    """获取指定位"""
    return (value >> bit_pos) & 1

def get_bits(value, start, length):
    """获取位域"""
    mask = (1 << length) - 1
    return (value >> start) & mask

# 设置特定位
def set_bit(value, bit_pos):
    """设置指定位为1"""
    return value | (1 << bit_pos)

def clear_bit(value, bit_pos):
    """清除指定位为0"""
    return value & ~(1 << bit_pos)

def toggle_bit(value, bit_pos):
    """翻转指定位"""
    return value ^ (1 << bit_pos)

def set_bits(value, start, length, new_value):
    """设置位域"""
    mask = ((1 << length) - 1) << start
    return (value & ~mask) | ((new_value << start) & mask)

# 示例:解析状态寄存器
status = 0b11010110
# bit 0: 使能
# bit 1-2: 模式
# bit 3: 错误标志
# bit 4-7: 计数器

enabled = get_bit(status, 0)      # 0
mode = get_bits(status, 1, 2)     # 3 (0b11)
error = get_bit(status, 3)        # 0
counter = get_bits(status, 4, 4)  # 13 (0b1101)

print(f"使能: {enabled}, 模式: {mode}, 错误: {error}, 计数: {counter}")

# 位域类
class BitField:
    """位域操作类"""
    
    def __init__(self, value=0):
        self.value = value
    
    def get(self, start, length=1):
        mask = (1 << length) - 1
        return (self.value >> start) & mask
    
    def set(self, start, length, new_value):
        mask = ((1 << length) - 1) << start
        self.value = (self.value & ~mask) | ((new_value << start) & mask)
        return self
    
    def __repr__(self):
        return f"BitField(0x{self.value:X})"

# 使用
bf = BitField(0)
bf.set(0, 1, 1)    # 设置bit0为1
bf.set(1, 2, 3)    # 设置bit1-2为3
bf.set(4, 4, 10)   # 设置bit4-7为10
print(f"值: 0x{bf.value:02X}, 二进制: {bf.value:08b}")

4. 常用校验算法

# 校验和(Checksum)
def checksum_8(data: bytes) -> int:
    """8位校验和"""
    return sum(data) & 0xFF

def checksum_16(data: bytes) -> int:
    """16位校验和"""
    return sum(data) & 0xFFFF

# 异或校验(XOR)
def xor_checksum(data: bytes) -> int:
    """异或校验"""
    result = 0
    for byte in data:
        result ^= byte
    return result

# CRC-8
def crc8(data: bytes, poly: int = 0x07, init: int = 0x00) -> int:
    """CRC-8校验"""
    crc = init
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ poly) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc

# CRC-16 (Modbus)
def crc16_modbus(data: bytes) -> int:
    """CRC-16 Modbus"""
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc

# CRC-32
import binascii

def crc32(data: bytes) -> int:
    """CRC-32校验"""
    return binascii.crc32(data) & 0xFFFFFFFF

# 使用查表法的CRC(更快)
class CRC16:
    """CRC-16查表法"""
    
    def __init__(self, poly: int = 0x8005, init: int = 0xFFFF):
        self.poly = poly
        self.init = init
        self.table = self._generate_table()
    
    def _generate_table(self):
        table = []
        for i in range(256):
            crc = i << 8
            for _ in range(8):
                if crc & 0x8000:
                    crc = ((crc << 1) ^ self.poly) & 0xFFFF
                else:
                    crc = (crc << 1) & 0xFFFF
            table.append(crc)
        return table
    
    def calculate(self, data: bytes) -> int:
        crc = self.init
        for byte in data:
            crc = ((crc << 8) ^ self.table[((crc >> 8) ^ byte) & 0xFF]) & 0xFFFF
        return crc

# 测试
data = b'\x01\x02\x03\x04\x05'
print(f"Checksum-8: 0x{checksum_8(data):02X}")
print(f"XOR: 0x{xor_checksum(data):02X}")
print(f"CRC-8: 0x{crc8(data):02X}")
print(f"CRC-16 Modbus: 0x{crc16_modbus(data):04X}")
print(f"CRC-32: 0x{crc32(data):08X}")

5. 协议设计模式

from dataclasses import dataclass
from typing import Optional, List
import struct

# 典型协议帧格式
# | 帧头 | 长度 | 命令 | 数据 | 校验 |
# | 2B   | 1B   | 1B   | NB   | 1B   |

@dataclass
class ProtocolFrame:
    """协议帧"""
    HEADER = 0xAA55
    
    command: int
    data: bytes
    
    def encode(self) -> bytes:
        """编码为字节"""
        length = len(self.data)
        packet = struct.pack('>HBB', self.HEADER, length, self.command)
        packet += self.data
        packet += bytes([checksum_8(packet)])
        return packet
    
    @classmethod
    def decode(cls, data: bytes) -> Optional['ProtocolFrame']:
        """从字节解码"""
        if len(data) < 5:
            return None
        
        header, length, command = struct.unpack('>HBB', data[:4])
        
        if header != cls.HEADER:
            return None
        
        if len(data) < 5 + length:
            return None
        
        payload = data[4:4+length]
        checksum = data[4+length]
        
        # 验证校验和
        if checksum != checksum_8(data[:4+length]):
            return None
        
        return cls(command=command, data=payload)

# 使用
frame = ProtocolFrame(command=0x01, data=b'\x00\x01\x02')
encoded = frame.encode()
print(f"编码:{encoded.hex()}")

decoded = ProtocolFrame.decode(encoded)
print(f"解码:cmd={decoded.command}, data={decoded.data.hex()}")

# 更复杂的协议定义
class Protocol:
    """协议定义"""
    
    # 命令定义
    CMD_READ = 0x01
    CMD_WRITE = 0x02
    CMD_ACK = 0x80
    CMD_NAK = 0x81
    
    # 错误码
    ERR_OK = 0x00
    ERR_CHECKSUM = 0x01
    ERR_TIMEOUT = 0x02
    ERR_INVALID = 0x03
    
    @staticmethod
    def build_read_cmd(addr: int, length: int) -> bytes:
        """构建读命令"""
        data = struct.pack('>HH', addr, length)
        return ProtocolFrame(Protocol.CMD_READ, data).encode()
    
    @staticmethod
    def build_write_cmd(addr: int, data: bytes) -> bytes:
        """构建写命令"""
        payload = struct.pack('>H', addr) + data
        return ProtocolFrame(Protocol.CMD_WRITE, payload).encode()
    
    @staticmethod
    def parse_response(data: bytes) -> dict:
        """解析响应"""
        frame = ProtocolFrame.decode(data)
        if not frame:
            return {'error': 'Invalid frame'}
        
        if frame.command == Protocol.CMD_ACK:
            return {'status': 'ok', 'data': frame.data}
        elif frame.command == Protocol.CMD_NAK:
            error_code = frame.data[0] if frame.data else 0xFF
            return {'status': 'error', 'code': error_code}
        else:
            return {'status': 'unknown', 'command': frame.command}

6. 协议解析器

from enum import Enum, auto
from typing import Callable, Optional
import struct

class ParserState(Enum):
    """解析器状态"""
    WAIT_HEADER1 = auto()
    WAIT_HEADER2 = auto()
    WAIT_LENGTH = auto()
    WAIT_COMMAND = auto()
    WAIT_DATA = auto()
    WAIT_CHECKSUM = auto()

class ProtocolParser:
    """协议解析器(状态机)"""
    
    HEADER1 = 0xAA
    HEADER2 = 0x55
    
    def __init__(self, on_frame: Callable[[bytes, int, bytes], None] = None):
        self.state = ParserState.WAIT_HEADER1
        self.buffer = bytearray()
        self.length = 0
        self.command = 0
        self.on_frame = on_frame
        self.frame_count = 0
        self.error_count = 0
    
    def feed(self, data: bytes):
        """输入数据"""
        for byte in data:
            self._process_byte(byte)
    
    def _process_byte(self, byte: int):
        """处理单个字节"""
        if self.state == ParserState.WAIT_HEADER1:
            if byte == self.HEADER1:
                self.buffer = bytearray([byte])
                self.state = ParserState.WAIT_HEADER2
        
        elif self.state == ParserState.WAIT_HEADER2:
            if byte == self.HEADER2:
                self.buffer.append(byte)
                self.state = ParserState.WAIT_LENGTH
            else:
                self.state = ParserState.WAIT_HEADER1
        
        elif self.state == ParserState.WAIT_LENGTH:
            self.length = byte
            self.buffer.append(byte)
            self.state = ParserState.WAIT_COMMAND
        
        elif self.state == ParserState.WAIT_COMMAND:
            self.command = byte
            self.buffer.append(byte)
            if self.length > 0:
                self.state = ParserState.WAIT_DATA
            else:
                self.state = ParserState.WAIT_CHECKSUM
        
        elif self.state == ParserState.WAIT_DATA:
            self.buffer.append(byte)
            if len(self.buffer) >= 4 + self.length:
                self.state = ParserState.WAIT_CHECKSUM
        
        elif self.state == ParserState.WAIT_CHECKSUM:
            expected = sum(self.buffer) & 0xFF
            if byte == expected:
                self.frame_count += 1
                payload = bytes(self.buffer[4:4+self.length])
                if self.on_frame:
                    self.on_frame(self.buffer, self.command, payload)
            else:
                self.error_count += 1
            
            self.state = ParserState.WAIT_HEADER1
    
    def reset(self):
        """重置解析器"""
        self.state = ParserState.WAIT_HEADER1
        self.buffer = bytearray()
        self.length = 0
        self.command = 0

# 使用示例
def on_frame_received(raw: bytes, cmd: int, data: bytes):
    print(f"收到帧:cmd=0x{cmd:02X}, data={data.hex()}")

parser = ProtocolParser(on_frame=on_frame_received)

# 模拟接收数据(可能分片)
test_data = bytes.fromhex('AA5503010102030F')  # 完整帧
parser.feed(test_data[:3])  # 分片1
parser.feed(test_data[3:])  # 分片2

print(f"帧计数:{parser.frame_count}, 错误计数:{parser.error_count}")

7. 常见协议示例

# Modbus RTU协议
class ModbusRTU:
    """Modbus RTU协议"""
    
    # 功能码
    READ_COILS = 0x01
    READ_DISCRETE_INPUTS = 0x02
    READ_HOLDING_REGISTERS = 0x03
    READ_INPUT_REGISTERS = 0x04
    WRITE_SINGLE_COIL = 0x05
    WRITE_SINGLE_REGISTER = 0x06
    WRITE_MULTIPLE_REGISTERS = 0x10
    
    @staticmethod
    def build_read_registers(slave_id: int, start_addr: int, count: int) -> bytes:
        """构建读寄存器请求"""
        data = struct.pack('>BBHH', slave_id, ModbusRTU.READ_HOLDING_REGISTERS,
                          start_addr, count)
        crc = crc16_modbus(data)
        return data + struct.pack('<H', crc)  # CRC是小端序
    
    @staticmethod
    def parse_read_response(data: bytes) -> dict:
        """解析读响应"""
        if len(data) < 5:
            return {'error': 'Too short'}
        
        slave_id = data[0]
        func_code = data[1]
        
        if func_code & 0x80:  # 异常响应
            return {'error': 'Exception', 'code': data[2]}
        
        byte_count = data[2]
        registers = []
        
        for i in range(byte_count // 2):
            value = struct.unpack('>H', data[3+i*2:5+i*2])[0]
            registers.append(value)
        
        return {'slave_id': slave_id, 'registers': registers}

# I2C协议帧
class I2CFrame:
    """I2C协议帧"""
    
    @staticmethod
    def build_write(device_addr: int, reg_addr: int, data: bytes) -> bytes:
        """构建写操作"""
        return bytes([device_addr << 1, reg_addr]) + data
    
    @staticmethod
    def build_read(device_addr: int, reg_addr: int, length: int) -> tuple:
        """构建读操作(返回写和读两个帧)"""
        write_frame = bytes([device_addr << 1, reg_addr])
        read_frame = bytes([(device_addr << 1) | 1])
        return write_frame, read_frame, length

# SPI协议帧
class SPIFrame:
    """SPI协议帧"""
    
    @staticmethod
    def build_write(reg_addr: int, data: bytes) -> bytes:
        """构建写操作(地址最高位为0表示写)"""
        return bytes([reg_addr & 0x7F]) + data
    
    @staticmethod
    def build_read(reg_addr: int, length: int) -> bytes:
        """构建读操作(地址最高位为1表示读)"""
        return bytes([reg_addr | 0x80]) + bytes(length)

# CAN协议帧
@dataclass
class CANFrame:
    """CAN协议帧"""
    id: int           # 11位或29位ID
    data: bytes       # 0-8字节数据
    extended: bool = False  # 是否扩展帧
    remote: bool = False    # 是否远程帧
    
    def encode(self) -> bytes:
        """编码为字节(简化格式)"""
        flags = (self.extended << 1) | self.remote
        if self.extended:
            header = struct.pack('>BI', flags, self.id)
        else:
            header = struct.pack('>BH', flags, self.id)
        return header + bytes([len(self.data)]) + self.data

8. 实战案例

案例:FPGA寄存器读写协议

"""
实战案例:FPGA寄存器读写协议实现
协议格式:
  请求:AA 55 CMD LEN ADDR(2B) [DATA] CHECKSUM
  响应:AA 55 CMD LEN [DATA] CHECKSUM
"""
import struct
from dataclasses import dataclass
from typing import Optional, Union
from enum import IntEnum

class Command(IntEnum):
    """命令定义"""
    READ_REG = 0x01
    WRITE_REG = 0x02
    READ_MEM = 0x03
    WRITE_MEM = 0x04
    ACK = 0x80
    NAK = 0x81

class ErrorCode(IntEnum):
    """错误码"""
    OK = 0x00
    INVALID_CMD = 0x01
    INVALID_ADDR = 0x02
    CHECKSUM_ERR = 0x03
    TIMEOUT = 0x04

@dataclass
class RegisterAccess:
    """寄存器访问"""
    address: int
    value: Optional[int] = None
    width: int = 32  # 位宽:8, 16, 32

class FPGAProtocol:
    """FPGA通信协议"""
    
    HEADER = 0xAA55
    
    def __init__(self):
        self.parser = ProtocolParser(on_frame=self._on_frame)
        self.pending_response = None
    
    def _on_frame(self, raw: bytes, cmd: int, data: bytes):
        """帧接收回调"""
        self.pending_response = (cmd, data)
    
    def build_read_reg(self, addr: int, width: int = 32) -> bytes:
        """构建读寄存器命令"""
        data = struct.pack('>HB', addr, width // 8)
        return self._build_frame(Command.READ_REG, data)
    
    def build_write_reg(self, addr: int, value: int, width: int = 32) -> bytes:
        """构建写寄存器命令"""
        if width == 8:
            data = struct.pack('>HB', addr, value)
        elif width == 16:
            data = struct.pack('>HH', addr, value)
        else:
            data = struct.pack('>HI', addr, value)
        return self._build_frame(Command.WRITE_REG, data)
    
    def build_read_mem(self, addr: int, length: int) -> bytes:
        """构建读内存命令"""
        data = struct.pack('>HH', addr, length)
        return self._build_frame(Command.READ_MEM, data)
    
    def build_write_mem(self, addr: int, data: bytes) -> bytes:
        """构建写内存命令"""
        payload = struct.pack('>H', addr) + data
        return self._build_frame(Command.WRITE_MEM, payload)
    
    def _build_frame(self, cmd: int, data: bytes) -> bytes:
        """构建帧"""
        frame = struct.pack('>HBB', self.HEADER, len(data), cmd)
        frame += data
        frame += bytes([sum(frame) & 0xFF])
        return frame
    
    def parse_response(self, data: bytes) -> dict:
        """解析响应"""
        self.pending_response = None
        self.parser.feed(data)
        
        if not self.pending_response:
            return {'error': 'No valid frame'}
        
        cmd, payload = self.pending_response
        
        if cmd == Command.ACK:
            return {'status': 'ok', 'data': payload}
        elif cmd == Command.NAK:
            error_code = payload[0] if payload else 0xFF
            return {'status': 'error', 'code': ErrorCode(error_code).name}
        else:
            return {'status': 'unknown', 'cmd': cmd}
    
    def parse_read_reg_response(self, data: bytes, width: int = 32) -> Optional[int]:
        """解析读寄存器响应"""
        result = self.parse_response(data)
        if result.get('status') != 'ok':
            return None
        
        payload = result['data']
        if width == 8:
            return payload[0]
        elif width == 16:
            return struct.unpack('>H', payload[:2])[0]
        else:
            return struct.unpack('>I', payload[:4])[0]

# 完整的通信示例
class FPGADevice:
    """FPGA设备"""
    
    def __init__(self, serial_port):
        self.serial = serial_port
        self.protocol = FPGAProtocol()
    
    def read_register(self, addr: int, width: int = 32) -> Optional[int]:
        """读寄存器"""
        cmd = self.protocol.build_read_reg(addr, width)
        self.serial.write(cmd)
        
        response = self.serial.read(100)
        return self.protocol.parse_read_reg_response(response, width)
    
    def write_register(self, addr: int, value: int, width: int = 32) -> bool:
        """写寄存器"""
        cmd = self.protocol.build_write_reg(addr, value, width)
        self.serial.write(cmd)
        
        response = self.serial.read(100)
        result = self.protocol.parse_response(response)
        return result.get('status') == 'ok'
    
    def read_memory(self, addr: int, length: int) -> Optional[bytes]:
        """读内存"""
        cmd = self.protocol.build_read_mem(addr, length)
        self.serial.write(cmd)
        
        response = self.serial.read(length + 10)
        result = self.protocol.parse_response(response)
        if result.get('status') == 'ok':
            return result['data']
        return None

# 测试
def test_protocol():
    protocol = FPGAProtocol()
    
    # 构建命令
    read_cmd = protocol.build_read_reg(0x0000, 32)
    print(f"读寄存器命令:{read_cmd.hex()}")
    
    write_cmd = protocol.build_write_reg(0x0004, 0x12345678, 32)
    print(f"写寄存器命令:{write_cmd.hex()}")
    
    # 模拟响应
    response = bytes.fromhex('AA55800412345678XX')  # XX是校验和
    # 计算正确的校验和
    checksum = sum(response[:-1]) & 0xFF
    response = response[:-1] + bytes([checksum])
    
    result = protocol.parse_response(response)
    print(f"解析结果:{result}")

test_protocol()

9. 总结

🔑 核心要点

知识点要点
struct二进制数据打包解包
字节操作bytes/bytearray,整数转换
位操作位域提取、设置、掩码
校验算法Checksum, XOR, CRC
协议设计帧头+长度+命令+数据+校验
状态机解析分片数据

✅ 学习检查清单

  • 掌握struct模块的使用
  • 能进行字节和位操作
  • 了解常用校验算法
  • 能设计简单的通信协议
  • 能实现协议解析器

📖 下一步学习

掌握了二进制协议解析后,让我们学习NumPy数据处理:


常见问题 FAQ

💬 struct的格式字符串怎么记?

常用的就几个:B(uint8)、H(uint16)、I(uint32)、f(float)、s(bytes)。前缀<小端、>大端。和C的类型对应很直观。

💬 CRC校验和校验和(checksum)怎么选?

简单场景用校验和(异或或累加),检错能力有限但实现简单。需要高可靠性用CRC-16或CRC-32,能检测突发错误。FPGA通信建议至少用CRC-8。


系列导航

End of file.