Veloris.
返回索引
设计实战 2026-02-14

Python串口通信:pyserial连接FPGA/MCU,UART调试不再用串口助手

2 分钟
509 words

Python串口通信:pyserial连接FPGA/MCU,UART调试不再用串口助手

串口通信是FPGA开发中最常用的调试和数据传输方式。Python的pyserial库提供了简单易用的串口操作接口,可以实现与FPGA板卡的UART通信、数据采集和调试。


1. pyserial简介

pip install pyserial
import serial
import serial.tools.list_ports

# 列出所有可用串口
ports = serial.tools.list_ports.comports()
for port in ports:
    print(f"端口:{port.device}")
    print(f"  描述:{port.description}")
    print(f"  硬件ID:{port.hwid}")
    print()

# 常见串口名称
# Windows: COM1, COM2, COM3...
# Linux: /dev/ttyUSB0, /dev/ttyACM0...
# Mac: /dev/tty.usbserial...

2. 基本操作

import serial

# 打开串口
ser = serial.Serial(
    port='COM3',        # 串口号
    baudrate=115200,    # 波特率
    bytesize=8,         # 数据位
    parity='N',         # 校验位:N(无), E(偶), O(奇)
    stopbits=1,         # 停止位
    timeout=1           # 读取超时(秒)
)

# 检查串口状态
print(f"串口是否打开:{ser.is_open}")
print(f"串口名称:{ser.name}")
print(f"波特率:{ser.baudrate}")

# 发送数据
ser.write(b'Hello FPGA\n')

# 接收数据
data = ser.read(10)      # 读取10字节
data = ser.readline()    # 读取一行(遇到\n结束)
data = ser.read_all()    # 读取所有可用数据

# 关闭串口
ser.close()

# 使用with语句(推荐)
with serial.Serial('COM3', 115200, timeout=1) as ser:
    ser.write(b'Hello\n')
    response = ser.readline()
    print(response)

3. 数据收发

import serial
import time

def send_receive_example():
    """发送和接收示例"""
    with serial.Serial('COM3', 115200, timeout=1) as ser:
        # 清空缓冲区
        ser.reset_input_buffer()
        ser.reset_output_buffer()
        
        # 发送字符串
        message = "Hello FPGA"
        ser.write(message.encode('utf-8'))
        
        # 发送字节
        ser.write(bytes([0x01, 0x02, 0x03, 0x04]))
        
        # 发送十六进制
        hex_data = bytes.fromhex('AA BB CC DD')
        ser.write(hex_data)
        
        # 等待数据
        time.sleep(0.1)
        
        # 检查可读数据量
        available = ser.in_waiting
        print(f"可读取字节数:{available}")
        
        # 读取数据
        if available > 0:
            data = ser.read(available)
            print(f"接收到:{data.hex()}")

# 持续接收
def continuous_receive(port, baudrate=115200):
    """持续接收数据"""
    with serial.Serial(port, baudrate, timeout=0.1) as ser:
        print(f"开始监听 {port}...")
        try:
            while True:
                if ser.in_waiting > 0:
                    data = ser.read(ser.in_waiting)
                    # 显示十六进制
                    print(f"HEX: {data.hex(' ')}")
                    # 尝试解码为字符串
                    try:
                        print(f"STR: {data.decode('utf-8')}")
                    except:
                        pass
                time.sleep(0.01)
        except KeyboardInterrupt:
            print("\n停止监听")

# continuous_receive('COM3')

4. 串口配置详解

import serial

# 完整配置
ser = serial.Serial()
ser.port = 'COM3'
ser.baudrate = 115200

# 数据位:5, 6, 7, 8
ser.bytesize = serial.EIGHTBITS  # 8位

# 校验位
ser.parity = serial.PARITY_NONE   # 无校验
# serial.PARITY_EVEN  # 偶校验
# serial.PARITY_ODD   # 奇校验

# 停止位
ser.stopbits = serial.STOPBITS_ONE  # 1位
# serial.STOPBITS_TWO  # 2位

# 超时设置
ser.timeout = 1          # 读取超时(秒)
ser.write_timeout = 1    # 写入超时

# 流控制
ser.xonxoff = False      # 软件流控
ser.rtscts = False       # 硬件流控RTS/CTS
ser.dsrdtr = False       # 硬件流控DSR/DTR

# 打开串口
ser.open()

# 常用波特率
# 9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600

# 动态修改波特率
ser.baudrate = 9600

# 控制信号
ser.setDTR(True)   # 设置DTR
ser.setRTS(True)   # 设置RTS
print(ser.getCTS())  # 读取CTS
print(ser.getDSR())  # 读取DSR

ser.close()

5. 数据解析

import struct

# 解析二进制数据
def parse_binary_data(data):
    """
    解析FPGA发送的二进制数据包
    格式:帧头(2B) + 长度(1B) + 数据(NB) + 校验(1B)
    """
    if len(data) < 4:
        return None
    
    # 检查帧头
    header = struct.unpack('>H', data[0:2])[0]
    if header != 0xAA55:
        return None
    
    # 获取长度
    length = data[2]
    
    # 检查数据完整性
    if len(data) < 4 + length:
        return None
    
    # 提取数据
    payload = data[3:3+length]
    
    # 校验
    checksum = data[3+length]
    calculated = sum(data[0:3+length]) & 0xFF
    if checksum != calculated:
        return None
    
    return payload

# 构建数据包
def build_packet(cmd, data):
    """
    构建发送给FPGA的数据包
    格式:帧头(AA55) + 命令(1B) + 长度(1B) + 数据(NB) + 校验(1B)
    """
    packet = bytearray()
    packet.extend([0xAA, 0x55])  # 帧头
    packet.append(cmd)           # 命令
    packet.append(len(data))     # 长度
    packet.extend(data)          # 数据
    
    # 计算校验和
    checksum = sum(packet) & 0xFF
    packet.append(checksum)
    
    return bytes(packet)

# 解析不同数据类型
def parse_sensor_data(data):
    """解析传感器数据"""
    # 假设格式:温度(2B,有符号) + 湿度(2B,无符号) + 压力(4B,浮点)
    if len(data) < 8:
        return None
    
    temp = struct.unpack('>h', data[0:2])[0] / 10.0  # 温度,0.1度精度
    humidity = struct.unpack('>H', data[2:4])[0] / 10.0  # 湿度
    pressure = struct.unpack('>f', data[4:8])[0]  # 压力
    
    return {
        'temperature': temp,
        'humidity': humidity,
        'pressure': pressure
    }

# struct格式字符
# b/B: signed/unsigned char (1字节)
# h/H: signed/unsigned short (2字节)
# i/I: signed/unsigned int (4字节)
# q/Q: signed/unsigned long long (8字节)
# f: float (4字节)
# d: double (8字节)
# >: 大端序
# <: 小端序

6. 串口工具类

import serial
import threading
import queue
import time
from typing import Optional, Callable

class SerialPort:
    """串口通信工具类"""
    
    def __init__(self, port: str, baudrate: int = 115200):
        self.port = port
        self.baudrate = baudrate
        self.serial: Optional[serial.Serial] = None
        self.running = False
        self.rx_queue = queue.Queue()
        self.rx_thread: Optional[threading.Thread] = None
        self.on_data_received: Optional[Callable] = None
    
    def open(self) -> bool:
        """打开串口"""
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=0.1
            )
            self.running = True
            self._start_rx_thread()
            print(f"串口 {self.port} 已打开")
            return True
        except Exception as e:
            print(f"打开串口失败:{e}")
            return False
    
    def close(self):
        """关闭串口"""
        self.running = False
        if self.rx_thread:
            self.rx_thread.join(timeout=1)
        if self.serial and self.serial.is_open:
            self.serial.close()
        print(f"串口 {self.port} 已关闭")
    
    def _start_rx_thread(self):
        """启动接收线程"""
        self.rx_thread = threading.Thread(target=self._rx_loop, daemon=True)
        self.rx_thread.start()
    
    def _rx_loop(self):
        """接收循环"""
        buffer = bytearray()
        while self.running:
            try:
                if self.serial.in_waiting > 0:
                    data = self.serial.read(self.serial.in_waiting)
                    buffer.extend(data)
                    
                    # 处理完整数据包
                    while len(buffer) >= 4:
                        # 查找帧头
                        idx = buffer.find(b'\xAA\x55')
                        if idx == -1:
                            buffer.clear()
                            break
                        if idx > 0:
                            buffer = buffer[idx:]
                        
                        # 检查长度
                        if len(buffer) < 4:
                            break
                        
                        length = buffer[3]
                        packet_len = 5 + length  # 帧头2 + 命令1 + 长度1 + 数据N + 校验1
                        
                        if len(buffer) < packet_len:
                            break
                        
                        # 提取数据包
                        packet = bytes(buffer[:packet_len])
                        buffer = buffer[packet_len:]
                        
                        # 放入队列或回调
                        self.rx_queue.put(packet)
                        if self.on_data_received:
                            self.on_data_received(packet)
                
                time.sleep(0.001)
            except Exception as e:
                if self.running:
                    print(f"接收错误:{e}")
    
    def send(self, data: bytes) -> bool:
        """发送数据"""
        if not self.serial or not self.serial.is_open:
            return False
        try:
            self.serial.write(data)
            return True
        except Exception as e:
            print(f"发送错误:{e}")
            return False
    
    def send_command(self, cmd: int, data: bytes = b'') -> bool:
        """发送命令"""
        packet = self._build_packet(cmd, data)
        return self.send(packet)
    
    def _build_packet(self, cmd: int, data: bytes) -> bytes:
        """构建数据包"""
        packet = bytearray([0xAA, 0x55, cmd, len(data)])
        packet.extend(data)
        packet.append(sum(packet) & 0xFF)
        return bytes(packet)
    
    def receive(self, timeout: float = 1.0) -> Optional[bytes]:
        """接收数据(阻塞)"""
        try:
            return self.rx_queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def __enter__(self):
        self.open()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

# 使用示例
def serial_example():
    with SerialPort('COM3', 115200) as sp:
        # 设置回调
        sp.on_data_received = lambda data: print(f"收到:{data.hex()}")
        
        # 发送命令
        sp.send_command(0x01, bytes([0x00, 0x01]))
        
        # 等待响应
        response = sp.receive(timeout=2.0)
        if response:
            print(f"响应:{response.hex()}")
        
        time.sleep(5)

7. 与FPGA通信

"""
FPGA UART通信示例
常见应用:寄存器读写、数据采集、调试信息输出
"""
import serial
import struct
import time

class FPGAUart:
    """FPGA UART通信类"""
    
    # 命令定义
    CMD_READ_REG = 0x01
    CMD_WRITE_REG = 0x02
    CMD_READ_DATA = 0x03
    CMD_START_ACQ = 0x04
    CMD_STOP_ACQ = 0x05
    
    def __init__(self, port: str, baudrate: int = 115200):
        self.serial = serial.Serial(port, baudrate, timeout=1)
        print(f"连接到FPGA:{port} @ {baudrate}")
    
    def close(self):
        self.serial.close()
    
    def read_register(self, addr: int) -> int:
        """读取寄存器"""
        # 发送读命令:AA 55 01 02 ADDR_H ADDR_L CHECKSUM
        data = struct.pack('>H', addr)
        self._send_command(self.CMD_READ_REG, data)
        
        # 接收响应
        response = self._receive_response()
        if response and len(response) >= 4:
            return struct.unpack('>I', response[:4])[0]
        return 0
    
    def write_register(self, addr: int, value: int):
        """写入寄存器"""
        # 发送写命令:AA 55 02 06 ADDR_H ADDR_L VALUE(4B) CHECKSUM
        data = struct.pack('>HI', addr, value)
        self._send_command(self.CMD_WRITE_REG, data)
        
        # 等待确认
        response = self._receive_response()
        return response is not None
    
    def read_data(self, length: int) -> bytes:
        """读取数据"""
        data = struct.pack('>H', length)
        self._send_command(self.CMD_READ_DATA, data)
        
        # 接收数据
        result = b''
        remaining = length
        while remaining > 0:
            chunk = self.serial.read(min(remaining, 1024))
            if not chunk:
                break
            result += chunk
            remaining -= len(chunk)
        
        return result
    
    def start_acquisition(self):
        """启动数据采集"""
        self._send_command(self.CMD_START_ACQ, b'')
        return self._receive_response() is not None
    
    def stop_acquisition(self):
        """停止数据采集"""
        self._send_command(self.CMD_STOP_ACQ, b'')
        return self._receive_response() is not None
    
    def _send_command(self, cmd: int, data: bytes):
        """发送命令"""
        packet = bytearray([0xAA, 0x55, cmd, len(data)])
        packet.extend(data)
        packet.append(sum(packet) & 0xFF)
        
        self.serial.reset_input_buffer()
        self.serial.write(packet)
    
    def _receive_response(self, timeout: float = 1.0) -> bytes:
        """接收响应"""
        start = time.time()
        buffer = bytearray()
        
        while time.time() - start < timeout:
            if self.serial.in_waiting > 0:
                buffer.extend(self.serial.read(self.serial.in_waiting))
                
                # 检查是否收到完整响应
                if len(buffer) >= 4:
                    if buffer[0] == 0xAA and buffer[1] == 0x55:
                        length = buffer[3]
                        if len(buffer) >= 5 + length:
                            return bytes(buffer[4:4+length])
            
            time.sleep(0.01)
        
        return None

# 使用示例
def fpga_uart_example():
    fpga = FPGAUart('COM3', 115200)
    
    try:
        # 读取版本寄存器
        version = fpga.read_register(0x0000)
        print(f"FPGA版本:{version:08X}")
        
        # 写入控制寄存器
        fpga.write_register(0x0004, 0x00000001)
        
        # 启动采集
        if fpga.start_acquisition():
            print("采集已启动")
            
            # 读取数据
            time.sleep(1)
            data = fpga.read_data(1024)
            print(f"读取到 {len(data)} 字节数据")
            
            # 停止采集
            fpga.stop_acquisition()
            print("采集已停止")
    
    finally:
        fpga.close()

# fpga_uart_example()

8. 常见问题

❌ 问题1:串口打开失败

# 检查串口是否被占用
import serial.tools.list_ports

def find_available_port():
    ports = serial.tools.list_ports.comports()
    for port in ports:
        try:
            ser = serial.Serial(port.device)
            ser.close()
            print(f"{port.device} 可用")
        except:
            print(f"{port.device} 被占用或不可用")

❌ 问题2:数据丢失

# 增加缓冲区大小
ser = serial.Serial('COM3', 115200)
ser.set_buffer_size(rx_size=65536, tx_size=65536)

# 使用线程持续读取
# 参考上面的SerialPort类

❌ 问题3:乱码

# 检查波特率是否匹配
# 检查数据位、校验位、停止位设置

# 查看原始十六进制数据
data = ser.read(100)
print(data.hex(' '))

❌ 问题4:超时

# 设置合适的超时时间
ser.timeout = 2  # 读取超时
ser.write_timeout = 2  # 写入超时

# 检查硬件连接
# 检查FPGA是否正确响应

9. 实战案例

案例:FPGA数据采集器

"""
实战案例:FPGA ADC数据采集器
通过串口读取FPGA采集的ADC数据并保存
"""
import serial
import struct
import time
import numpy as np
from pathlib import Path
from datetime import datetime

class ADCDataCollector:
    """ADC数据采集器"""
    
    def __init__(self, port: str, baudrate: int = 921600):
        self.serial = serial.Serial(port, baudrate, timeout=1)
        self.serial.set_buffer_size(rx_size=1024*1024)
        self.data_buffer = []
        self.collecting = False
    
    def configure(self, sample_rate: int, channels: int):
        """配置采集参数"""
        # 发送配置命令
        config = struct.pack('>IB', sample_rate, channels)
        self._send_command(0x10, config)
        
        response = self._wait_response()
        if response and response[0] == 0x00:
            print(f"配置成功:采样率={sample_rate}Hz, 通道数={channels}")
            return True
        return False
    
    def start_collection(self, duration: float):
        """开始采集"""
        self.data_buffer.clear()
        self.collecting = True
        
        # 发送开始命令
        self._send_command(0x11, b'')
        
        print(f"开始采集,持续{duration}秒...")
        start_time = time.time()
        
        while time.time() - start_time < duration and self.collecting:
            if self.serial.in_waiting > 0:
                data = self.serial.read(self.serial.in_waiting)
                self.data_buffer.append(data)
            time.sleep(0.001)
        
        # 发送停止命令
        self._send_command(0x12, b'')
        self.collecting = False
        
        print(f"采集完成,共收到 {sum(len(d) for d in self.data_buffer)} 字节")
    
    def parse_data(self) -> np.ndarray:
        """解析采集的数据"""
        raw_data = b''.join(self.data_buffer)
        
        # 假设数据格式:每个采样点2字节,有符号整数
        samples = len(raw_data) // 2
        data = np.frombuffer(raw_data, dtype='>i2')
        
        # 转换为电压(假设12位ADC,参考电压3.3V)
        voltage = data.astype(float) / 4096 * 3.3
        
        return voltage
    
    def save_data(self, filename: str, data: np.ndarray):
        """保存数据"""
        np.save(filename, data)
        print(f"数据已保存到 {filename}")
        
        # 同时保存CSV
        csv_file = Path(filename).with_suffix('.csv')
        np.savetxt(csv_file, data, delimiter=',', header='voltage')
        print(f"CSV已保存到 {csv_file}")
    
    def _send_command(self, cmd: int, data: bytes):
        packet = bytes([0xAA, 0x55, cmd, len(data)]) + data
        packet += bytes([sum(packet) & 0xFF])
        self.serial.write(packet)
    
    def _wait_response(self, timeout: float = 1.0) -> bytes:
        start = time.time()
        while time.time() - start < timeout:
            if self.serial.in_waiting >= 5:
                header = self.serial.read(4)
                if header[0:2] == b'\xAA\x55':
                    length = header[3]
                    data = self.serial.read(length + 1)
                    return data[:-1]
            time.sleep(0.01)
        return None
    
    def close(self):
        self.serial.close()

# 使用示例
def adc_collection_example():
    collector = ADCDataCollector('COM3', 921600)
    
    try:
        # 配置:1MHz采样率,1通道
        if collector.configure(1000000, 1):
            # 采集1秒数据
            collector.start_collection(1.0)
            
            # 解析数据
            voltage = collector.parse_data()
            print(f"采集到 {len(voltage)} 个采样点")
            print(f"电压范围:{voltage.min():.3f}V - {voltage.max():.3f}V")
            
            # 保存数据
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            collector.save_data(f'adc_data_{timestamp}.npy', voltage)
    
    finally:
        collector.close()

# adc_collection_example()

10. 总结

🔑 核心要点

知识点要点
pyserialserial.Serial()打开串口
配置波特率、数据位、校验位、停止位
收发write()发送,read()/readline()接收
数据解析struct模块处理二进制数据
多线程后台线程持续接收数据
协议设计帧头+长度+数据+校验

✅ 学习检查清单

  • 能打开和配置串口
  • 能发送和接收数据
  • 能解析二进制数据
  • 能设计简单的通信协议
  • 能实现与FPGA的通信

📖 下一步学习

掌握了串口通信后,让我们学习二进制协议解析:


常见问题 FAQ

💬 串口收到的数据是乱码怎么办?

检查波特率是否匹配、数据位/停止位/校验位是否一致。FPGA端和Python端配置必须完全相同。还要注意字节序和编码问题。

💬 怎么处理串口数据粘包?

串口是字节流,没有消息边界。需要自定义协议:用帧头帧尾分隔,或者用长度字段标识。接收端用缓冲区累积数据,按协议拆包。


系列导航

End of file.