Python串口通信:pyserial连接FPGA/MCU,UART调试不再用串口助手
串口通信是FPGA开发中最常用的调试和数据传输方式。Python的pyserial库提供了简单易用的串口操作接口,可以实现与FPGA板卡的UART通信、数据采集和调试。
1. pyserial简介
pip install pyserial
import serial
import serial.tools.list_ports
# 列出所有可用串口
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"端口:{port.device}")
print(f" 描述:{port.description}")
print(f" 硬件ID:{port.hwid}")
print()
# 常见串口名称
# Windows: COM1, COM2, COM3...
# Linux: /dev/ttyUSB0, /dev/ttyACM0...
# Mac: /dev/tty.usbserial...
2. 基本操作
import serial
# 打开串口
ser = serial.Serial(
port='COM3', # 串口号
baudrate=115200, # 波特率
bytesize=8, # 数据位
parity='N', # 校验位:N(无), E(偶), O(奇)
stopbits=1, # 停止位
timeout=1 # 读取超时(秒)
)
# 检查串口状态
print(f"串口是否打开:{ser.is_open}")
print(f"串口名称:{ser.name}")
print(f"波特率:{ser.baudrate}")
# 发送数据
ser.write(b'Hello FPGA\n')
# 接收数据
data = ser.read(10) # 读取10字节
data = ser.readline() # 读取一行(遇到\n结束)
data = ser.read_all() # 读取所有可用数据
# 关闭串口
ser.close()
# 使用with语句(推荐)
with serial.Serial('COM3', 115200, timeout=1) as ser:
ser.write(b'Hello\n')
response = ser.readline()
print(response)
3. 数据收发
import serial
import time
def send_receive_example():
"""发送和接收示例"""
with serial.Serial('COM3', 115200, timeout=1) as ser:
# 清空缓冲区
ser.reset_input_buffer()
ser.reset_output_buffer()
# 发送字符串
message = "Hello FPGA"
ser.write(message.encode('utf-8'))
# 发送字节
ser.write(bytes([0x01, 0x02, 0x03, 0x04]))
# 发送十六进制
hex_data = bytes.fromhex('AA BB CC DD')
ser.write(hex_data)
# 等待数据
time.sleep(0.1)
# 检查可读数据量
available = ser.in_waiting
print(f"可读取字节数:{available}")
# 读取数据
if available > 0:
data = ser.read(available)
print(f"接收到:{data.hex()}")
# 持续接收
def continuous_receive(port, baudrate=115200):
"""持续接收数据"""
with serial.Serial(port, baudrate, timeout=0.1) as ser:
print(f"开始监听 {port}...")
try:
while True:
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
# 显示十六进制
print(f"HEX: {data.hex(' ')}")
# 尝试解码为字符串
try:
print(f"STR: {data.decode('utf-8')}")
except:
pass
time.sleep(0.01)
except KeyboardInterrupt:
print("\n停止监听")
# continuous_receive('COM3')
4. 串口配置详解
import serial
# 完整配置
ser = serial.Serial()
ser.port = 'COM3'
ser.baudrate = 115200
# 数据位:5, 6, 7, 8
ser.bytesize = serial.EIGHTBITS # 8位
# 校验位
ser.parity = serial.PARITY_NONE # 无校验
# serial.PARITY_EVEN # 偶校验
# serial.PARITY_ODD # 奇校验
# 停止位
ser.stopbits = serial.STOPBITS_ONE # 1位
# serial.STOPBITS_TWO # 2位
# 超时设置
ser.timeout = 1 # 读取超时(秒)
ser.write_timeout = 1 # 写入超时
# 流控制
ser.xonxoff = False # 软件流控
ser.rtscts = False # 硬件流控RTS/CTS
ser.dsrdtr = False # 硬件流控DSR/DTR
# 打开串口
ser.open()
# 常用波特率
# 9600, 19200, 38400, 57600, 115200, 230400, 460800, 921600
# 动态修改波特率
ser.baudrate = 9600
# 控制信号
ser.setDTR(True) # 设置DTR
ser.setRTS(True) # 设置RTS
print(ser.getCTS()) # 读取CTS
print(ser.getDSR()) # 读取DSR
ser.close()
5. 数据解析
import struct
# 解析二进制数据
def parse_binary_data(data):
"""
解析FPGA发送的二进制数据包
格式:帧头(2B) + 长度(1B) + 数据(NB) + 校验(1B)
"""
if len(data) < 4:
return None
# 检查帧头
header = struct.unpack('>H', data[0:2])[0]
if header != 0xAA55:
return None
# 获取长度
length = data[2]
# 检查数据完整性
if len(data) < 4 + length:
return None
# 提取数据
payload = data[3:3+length]
# 校验
checksum = data[3+length]
calculated = sum(data[0:3+length]) & 0xFF
if checksum != calculated:
return None
return payload
# 构建数据包
def build_packet(cmd, data):
"""
构建发送给FPGA的数据包
格式:帧头(AA55) + 命令(1B) + 长度(1B) + 数据(NB) + 校验(1B)
"""
packet = bytearray()
packet.extend([0xAA, 0x55]) # 帧头
packet.append(cmd) # 命令
packet.append(len(data)) # 长度
packet.extend(data) # 数据
# 计算校验和
checksum = sum(packet) & 0xFF
packet.append(checksum)
return bytes(packet)
# 解析不同数据类型
def parse_sensor_data(data):
"""解析传感器数据"""
# 假设格式:温度(2B,有符号) + 湿度(2B,无符号) + 压力(4B,浮点)
if len(data) < 8:
return None
temp = struct.unpack('>h', data[0:2])[0] / 10.0 # 温度,0.1度精度
humidity = struct.unpack('>H', data[2:4])[0] / 10.0 # 湿度
pressure = struct.unpack('>f', data[4:8])[0] # 压力
return {
'temperature': temp,
'humidity': humidity,
'pressure': pressure
}
# struct格式字符
# b/B: signed/unsigned char (1字节)
# h/H: signed/unsigned short (2字节)
# i/I: signed/unsigned int (4字节)
# q/Q: signed/unsigned long long (8字节)
# f: float (4字节)
# d: double (8字节)
# >: 大端序
# <: 小端序
6. 串口工具类
import serial
import threading
import queue
import time
from typing import Optional, Callable
class SerialPort:
"""串口通信工具类"""
def __init__(self, port: str, baudrate: int = 115200):
self.port = port
self.baudrate = baudrate
self.serial: Optional[serial.Serial] = None
self.running = False
self.rx_queue = queue.Queue()
self.rx_thread: Optional[threading.Thread] = None
self.on_data_received: Optional[Callable] = None
def open(self) -> bool:
"""打开串口"""
try:
self.serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
timeout=0.1
)
self.running = True
self._start_rx_thread()
print(f"串口 {self.port} 已打开")
return True
except Exception as e:
print(f"打开串口失败:{e}")
return False
def close(self):
"""关闭串口"""
self.running = False
if self.rx_thread:
self.rx_thread.join(timeout=1)
if self.serial and self.serial.is_open:
self.serial.close()
print(f"串口 {self.port} 已关闭")
def _start_rx_thread(self):
"""启动接收线程"""
self.rx_thread = threading.Thread(target=self._rx_loop, daemon=True)
self.rx_thread.start()
def _rx_loop(self):
"""接收循环"""
buffer = bytearray()
while self.running:
try:
if self.serial.in_waiting > 0:
data = self.serial.read(self.serial.in_waiting)
buffer.extend(data)
# 处理完整数据包
while len(buffer) >= 4:
# 查找帧头
idx = buffer.find(b'\xAA\x55')
if idx == -1:
buffer.clear()
break
if idx > 0:
buffer = buffer[idx:]
# 检查长度
if len(buffer) < 4:
break
length = buffer[3]
packet_len = 5 + length # 帧头2 + 命令1 + 长度1 + 数据N + 校验1
if len(buffer) < packet_len:
break
# 提取数据包
packet = bytes(buffer[:packet_len])
buffer = buffer[packet_len:]
# 放入队列或回调
self.rx_queue.put(packet)
if self.on_data_received:
self.on_data_received(packet)
time.sleep(0.001)
except Exception as e:
if self.running:
print(f"接收错误:{e}")
def send(self, data: bytes) -> bool:
"""发送数据"""
if not self.serial or not self.serial.is_open:
return False
try:
self.serial.write(data)
return True
except Exception as e:
print(f"发送错误:{e}")
return False
def send_command(self, cmd: int, data: bytes = b'') -> bool:
"""发送命令"""
packet = self._build_packet(cmd, data)
return self.send(packet)
def _build_packet(self, cmd: int, data: bytes) -> bytes:
"""构建数据包"""
packet = bytearray([0xAA, 0x55, cmd, len(data)])
packet.extend(data)
packet.append(sum(packet) & 0xFF)
return bytes(packet)
def receive(self, timeout: float = 1.0) -> Optional[bytes]:
"""接收数据(阻塞)"""
try:
return self.rx_queue.get(timeout=timeout)
except queue.Empty:
return None
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# 使用示例
def serial_example():
with SerialPort('COM3', 115200) as sp:
# 设置回调
sp.on_data_received = lambda data: print(f"收到:{data.hex()}")
# 发送命令
sp.send_command(0x01, bytes([0x00, 0x01]))
# 等待响应
response = sp.receive(timeout=2.0)
if response:
print(f"响应:{response.hex()}")
time.sleep(5)
7. 与FPGA通信
"""
FPGA UART通信示例
常见应用:寄存器读写、数据采集、调试信息输出
"""
import serial
import struct
import time
class FPGAUart:
"""FPGA UART通信类"""
# 命令定义
CMD_READ_REG = 0x01
CMD_WRITE_REG = 0x02
CMD_READ_DATA = 0x03
CMD_START_ACQ = 0x04
CMD_STOP_ACQ = 0x05
def __init__(self, port: str, baudrate: int = 115200):
self.serial = serial.Serial(port, baudrate, timeout=1)
print(f"连接到FPGA:{port} @ {baudrate}")
def close(self):
self.serial.close()
def read_register(self, addr: int) -> int:
"""读取寄存器"""
# 发送读命令:AA 55 01 02 ADDR_H ADDR_L CHECKSUM
data = struct.pack('>H', addr)
self._send_command(self.CMD_READ_REG, data)
# 接收响应
response = self._receive_response()
if response and len(response) >= 4:
return struct.unpack('>I', response[:4])[0]
return 0
def write_register(self, addr: int, value: int):
"""写入寄存器"""
# 发送写命令:AA 55 02 06 ADDR_H ADDR_L VALUE(4B) CHECKSUM
data = struct.pack('>HI', addr, value)
self._send_command(self.CMD_WRITE_REG, data)
# 等待确认
response = self._receive_response()
return response is not None
def read_data(self, length: int) -> bytes:
"""读取数据"""
data = struct.pack('>H', length)
self._send_command(self.CMD_READ_DATA, data)
# 接收数据
result = b''
remaining = length
while remaining > 0:
chunk = self.serial.read(min(remaining, 1024))
if not chunk:
break
result += chunk
remaining -= len(chunk)
return result
def start_acquisition(self):
"""启动数据采集"""
self._send_command(self.CMD_START_ACQ, b'')
return self._receive_response() is not None
def stop_acquisition(self):
"""停止数据采集"""
self._send_command(self.CMD_STOP_ACQ, b'')
return self._receive_response() is not None
def _send_command(self, cmd: int, data: bytes):
"""发送命令"""
packet = bytearray([0xAA, 0x55, cmd, len(data)])
packet.extend(data)
packet.append(sum(packet) & 0xFF)
self.serial.reset_input_buffer()
self.serial.write(packet)
def _receive_response(self, timeout: float = 1.0) -> bytes:
"""接收响应"""
start = time.time()
buffer = bytearray()
while time.time() - start < timeout:
if self.serial.in_waiting > 0:
buffer.extend(self.serial.read(self.serial.in_waiting))
# 检查是否收到完整响应
if len(buffer) >= 4:
if buffer[0] == 0xAA and buffer[1] == 0x55:
length = buffer[3]
if len(buffer) >= 5 + length:
return bytes(buffer[4:4+length])
time.sleep(0.01)
return None
# 使用示例
def fpga_uart_example():
fpga = FPGAUart('COM3', 115200)
try:
# 读取版本寄存器
version = fpga.read_register(0x0000)
print(f"FPGA版本:{version:08X}")
# 写入控制寄存器
fpga.write_register(0x0004, 0x00000001)
# 启动采集
if fpga.start_acquisition():
print("采集已启动")
# 读取数据
time.sleep(1)
data = fpga.read_data(1024)
print(f"读取到 {len(data)} 字节数据")
# 停止采集
fpga.stop_acquisition()
print("采集已停止")
finally:
fpga.close()
# fpga_uart_example()
8. 常见问题
❌ 问题1:串口打开失败
# 检查串口是否被占用
import serial.tools.list_ports
def find_available_port():
ports = serial.tools.list_ports.comports()
for port in ports:
try:
ser = serial.Serial(port.device)
ser.close()
print(f"{port.device} 可用")
except:
print(f"{port.device} 被占用或不可用")
❌ 问题2:数据丢失
# 增加缓冲区大小
ser = serial.Serial('COM3', 115200)
ser.set_buffer_size(rx_size=65536, tx_size=65536)
# 使用线程持续读取
# 参考上面的SerialPort类
❌ 问题3:乱码
# 检查波特率是否匹配
# 检查数据位、校验位、停止位设置
# 查看原始十六进制数据
data = ser.read(100)
print(data.hex(' '))
❌ 问题4:超时
# 设置合适的超时时间
ser.timeout = 2 # 读取超时
ser.write_timeout = 2 # 写入超时
# 检查硬件连接
# 检查FPGA是否正确响应
9. 实战案例
案例:FPGA数据采集器
"""
实战案例:FPGA ADC数据采集器
通过串口读取FPGA采集的ADC数据并保存
"""
import serial
import struct
import time
import numpy as np
from pathlib import Path
from datetime import datetime
class ADCDataCollector:
"""ADC数据采集器"""
def __init__(self, port: str, baudrate: int = 921600):
self.serial = serial.Serial(port, baudrate, timeout=1)
self.serial.set_buffer_size(rx_size=1024*1024)
self.data_buffer = []
self.collecting = False
def configure(self, sample_rate: int, channels: int):
"""配置采集参数"""
# 发送配置命令
config = struct.pack('>IB', sample_rate, channels)
self._send_command(0x10, config)
response = self._wait_response()
if response and response[0] == 0x00:
print(f"配置成功:采样率={sample_rate}Hz, 通道数={channels}")
return True
return False
def start_collection(self, duration: float):
"""开始采集"""
self.data_buffer.clear()
self.collecting = True
# 发送开始命令
self._send_command(0x11, b'')
print(f"开始采集,持续{duration}秒...")
start_time = time.time()
while time.time() - start_time < duration and self.collecting:
if self.serial.in_waiting > 0:
data = self.serial.read(self.serial.in_waiting)
self.data_buffer.append(data)
time.sleep(0.001)
# 发送停止命令
self._send_command(0x12, b'')
self.collecting = False
print(f"采集完成,共收到 {sum(len(d) for d in self.data_buffer)} 字节")
def parse_data(self) -> np.ndarray:
"""解析采集的数据"""
raw_data = b''.join(self.data_buffer)
# 假设数据格式:每个采样点2字节,有符号整数
samples = len(raw_data) // 2
data = np.frombuffer(raw_data, dtype='>i2')
# 转换为电压(假设12位ADC,参考电压3.3V)
voltage = data.astype(float) / 4096 * 3.3
return voltage
def save_data(self, filename: str, data: np.ndarray):
"""保存数据"""
np.save(filename, data)
print(f"数据已保存到 {filename}")
# 同时保存CSV
csv_file = Path(filename).with_suffix('.csv')
np.savetxt(csv_file, data, delimiter=',', header='voltage')
print(f"CSV已保存到 {csv_file}")
def _send_command(self, cmd: int, data: bytes):
packet = bytes([0xAA, 0x55, cmd, len(data)]) + data
packet += bytes([sum(packet) & 0xFF])
self.serial.write(packet)
def _wait_response(self, timeout: float = 1.0) -> bytes:
start = time.time()
while time.time() - start < timeout:
if self.serial.in_waiting >= 5:
header = self.serial.read(4)
if header[0:2] == b'\xAA\x55':
length = header[3]
data = self.serial.read(length + 1)
return data[:-1]
time.sleep(0.01)
return None
def close(self):
self.serial.close()
# 使用示例
def adc_collection_example():
collector = ADCDataCollector('COM3', 921600)
try:
# 配置:1MHz采样率,1通道
if collector.configure(1000000, 1):
# 采集1秒数据
collector.start_collection(1.0)
# 解析数据
voltage = collector.parse_data()
print(f"采集到 {len(voltage)} 个采样点")
print(f"电压范围:{voltage.min():.3f}V - {voltage.max():.3f}V")
# 保存数据
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
collector.save_data(f'adc_data_{timestamp}.npy', voltage)
finally:
collector.close()
# adc_collection_example()
10. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| pyserial | serial.Serial()打开串口 |
| 配置 | 波特率、数据位、校验位、停止位 |
| 收发 | write()发送,read()/readline()接收 |
| 数据解析 | struct模块处理二进制数据 |
| 多线程 | 后台线程持续接收数据 |
| 协议设计 | 帧头+长度+数据+校验 |
✅ 学习检查清单
- 能打开和配置串口
- 能发送和接收数据
- 能解析二进制数据
- 能设计简单的通信协议
- 能实现与FPGA的通信
📖 下一步学习
掌握了串口通信后,让我们学习二进制协议解析:
常见问题 FAQ
💬 串口收到的数据是乱码怎么办?
检查波特率是否匹配、数据位/停止位/校验位是否一致。FPGA端和Python端配置必须完全相同。还要注意字节序和编码问题。
💬 怎么处理串口数据粘包?
串口是字节流,没有消息边界。需要自定义协议:用帧头帧尾分隔,或者用长度字段标识。接收端用缓冲区累积数据,按协议拆包。
� 系列导航
- 上一篇:21 - Python定时任务与自动化脚本
- 当前:22 - Python串口通信
- 下一篇:23 - Python二进制协议解析与校验