Python NumPy数据处理:用数组运算替代for循环,速度快100倍
NumPy是Python科学计算的基础库,提供高效的多维数组操作。在FPGA开发中,NumPy常用于信号处理、数据分析、波形生成和验证等场景。
1. NumPy简介
pip install numpy
import numpy as np
# The core NumPy type is ndarray (N-dimensional array).
# Advantages:
# 1. Efficient numeric computation (implemented in C under the hood)
# 2. Vectorized operations (no explicit Python loops)
# 3. Broadcasting
# 4. A rich set of math functions
# Print the installed NumPy version
print(np.__version__)
# A basic 1-D array built from a Python list, followed by its main attributes
arr = np.array([1, 2, 3, 4, 5])
print(f"数组:{arr}")
print(f"类型:{type(arr)}")
print(f"数据类型:{arr.dtype}")
print(f"形状:{arr.shape}")
print(f"维度:{arr.ndim}")
print(f"元素数:{arr.size}")
2. 数组创建
import numpy as np
# Create arrays from Python lists
arr1 = np.array([1, 2, 3])
arr2 = np.array([[1, 2, 3], [4, 5, 6]]) # 2-D (nested lists)
# Specify the element dtype explicitly
arr_int = np.array([1, 2, 3], dtype=np.int32)
arr_float = np.array([1, 2, 3], dtype=np.float64)
arr_complex = np.array([1+2j, 3+4j], dtype=np.complex128)
# Commonly used dtypes:
# np.int8, np.int16, np.int32, np.int64
# np.uint8, np.uint16, np.uint32, np.uint64
# np.float32, np.float64
# np.complex64, np.complex128
# np.bool_
# Special arrays
zeros = np.zeros((3, 4)) # all zeros
ones = np.ones((3, 4)) # all ones
empty = np.empty((3, 4)) # uninitialized (contents are arbitrary)
full = np.full((3, 4), 7) # filled with the given value
eye = np.eye(4) # identity matrix
diag = np.diag([1, 2, 3]) # diagonal matrix
# Sequences
arange = np.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5) # [0, 0.25, 0.5, 0.75, 1]
logspace = np.logspace(0, 2, 5) # 5 points evenly spaced on a log scale
# Random arrays
rand = np.random.rand(3, 4) # uniform on [0, 1)
randn = np.random.randn(3, 4) # standard normal distribution
randint = np.random.randint(0, 10, (3, 4)) # random integers
# From raw bytes (common for FPGA data)
data = b'\x01\x02\x03\x04'
arr_from_bytes = np.frombuffer(data, dtype=np.uint8)
# Same four bytes reinterpreted as big-endian 16-bit integers; this rebinds
# arr_from_bytes, replacing the uint8 result from the previous line
arr_from_bytes = np.frombuffer(data, dtype='>i2')
# From a file
# arr = np.fromfile('data.bin', dtype=np.int16)
3. 数组操作
import numpy as np
arr = np.arange(12).reshape(3, 4)
print(arr)
# [[ 0 1 2 3]
# [ 4 5 6 7]
# [ 8 9 10 11]]
# Indexing and slicing
print(arr[0, 0]) # 0
print(arr[1, :]) # [4 5 6 7]
print(arr[:, 2]) # [2 6 10]
print(arr[0:2, 1:3]) # [[1 2] [5 6]]
# Boolean indexing
print(arr[arr > 5]) # [6 7 8 9 10 11]
# Conditional assignment: modifies arr in place, so every later use of arr
# in this listing sees the zeroed values
arr[arr > 5] = 0
# Fancy indexing
print(arr[[0, 2], :]) # rows 0 and 2
# Shape manipulation
arr2 = arr.reshape(4, 3) # change the shape
arr3 = arr.flatten() # flatten to 1-D (always a copy)
arr4 = arr.T # transpose
arr5 = arr.ravel() # flatten to 1-D (a view when possible, otherwise a copy)
# Concatenation
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
concat_v = np.vstack((a, b)) # stack vertically
concat_h = np.hstack((a, b)) # stack horizontally
concat = np.concatenate((a, b), axis=0) # join along the given axis
# Splitting (arr is rebound to a fresh 1-D array here)
arr = np.arange(12)
split = np.split(arr, 3) # three equal parts
split = np.split(arr, [3, 7]) # split at indices 3 and 7
# Copying
arr_view = arr.view() # view (shares the underlying data)
arr_copy = arr.copy() # independent copy
# Broadcasting
a = np.array([[1], [2], [3]]) # (3, 1)
b = np.array([10, 20, 30]) # (3,)
c = a + b # shapes broadcast to (3, 3) before adding
# [[11 21 31]
# [12 22 32]
# [13 23 33]]
4. 数学运算
import numpy as np
a = np.array([1, 2, 3, 4])
b = np.array([5, 6, 7, 8])
# Basic arithmetic (element-wise)
print(a + b) # [ 6 8 10 12]
print(a - b) # [-4 -4 -4 -4]
print(a * b) # [ 5 12 21 32]
print(a / b) # [0.2 0.33 0.43 0.5] (rounded for display)
print(a ** 2) # [ 1 4 9 16]
print(np.sqrt(a)) # [1. 1.41 1.73 2.] (rounded for display)
# Trigonometric functions
theta = np.linspace(0, 2*np.pi, 100)
sin_wave = np.sin(theta)
cos_wave = np.cos(theta)
# Exponentials and logarithms
exp = np.exp(a) # e^x
log = np.log(a) # ln(x)
log10 = np.log10(a) # log10(x)
log2 = np.log2(a) # log2(x)
# Matrix operations
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])
dot = np.dot(A, B) # matrix product
dot = A @ B # same product using the @ operator (Python 3.5+)
element = A * B # element-wise product (not a matrix product)
# Linear algebra
det = np.linalg.det(A) # determinant
inv = np.linalg.inv(A) # inverse matrix
eig = np.linalg.eig(A) # eigenvalues and eigenvectors
svd = np.linalg.svd(A) # singular value decomposition
norm = np.linalg.norm(a) # norm of the 1-D array a (lowercase a, not the matrix A)
# Bitwise operations (integer arrays)
x = np.array([0b1010, 0b1100], dtype=np.uint8)
y = np.array([0b0110, 0b1010], dtype=np.uint8)
print(np.bitwise_and(x, y)) # bitwise AND
print(np.bitwise_or(x, y)) # bitwise OR
print(np.bitwise_xor(x, y)) # bitwise XOR
print(np.left_shift(x, 2)) # shift left by 2 bits
print(np.right_shift(x, 2)) # shift right by 2 bits
5. 统计函数
import numpy as np
arr = np.random.randn(100)
# Basic statistics
print(f"最小值:{np.min(arr):.4f}")
print(f"最大值:{np.max(arr):.4f}")
print(f"均值:{np.mean(arr):.4f}")
print(f"中位数:{np.median(arr):.4f}")
print(f"标准差:{np.std(arr):.4f}")
print(f"方差:{np.var(arr):.4f}")
print(f"求和:{np.sum(arr):.4f}")
# Positions of the extremes
print(f"最小值索引:{np.argmin(arr)}")
print(f"最大值索引:{np.argmax(arr)}")
# Percentiles
print(f"25%分位:{np.percentile(arr, 25):.4f}")
print(f"75%分位:{np.percentile(arr, 75):.4f}")
# Statistics on a multi-dimensional array
arr2d = np.random.randn(3, 4)
print(f"全局均值:{np.mean(arr2d):.4f}")
print(f"列均值:{np.mean(arr2d, axis=0)}") # axis=0 reduces over rows -> one mean per column
print(f"行均值:{np.mean(arr2d, axis=1)}") # axis=1 reduces over columns -> one mean per row
# Cumulative operations
cumsum = np.cumsum(arr) # running sum
cumprod = np.cumprod(arr) # running product
# Correlation: y is x plus independent noise scaled by 0.5
x = np.random.randn(100)
y = x + np.random.randn(100) * 0.5
corr = np.corrcoef(x, y)
print(f"相关系数:{corr[0, 1]:.4f}") # off-diagonal entry is the x-y coefficient
# Histogram
hist, bins = np.histogram(arr, bins=10)
print(f"直方图:{hist}")
print(f"边界:{bins}")
6. 信号处理基础
import numpy as np
# Generate signals
fs = 1000 # sample rate
t = np.arange(0, 1, 1/fs) # time axis
# Sine wave
freq = 10 # frequency
amplitude = 1.0
phase = 0
sine_wave = amplitude * np.sin(2 * np.pi * freq * t + phase)
# Square wave: sign of a sine at the same frequency
square_wave = np.sign(np.sin(2 * np.pi * freq * t))
# Sawtooth wave
sawtooth = 2 * (t * freq - np.floor(0.5 + t * freq))
# Add noise
noise = np.random.randn(len(t)) * 0.1
noisy_signal = sine_wave + noise
# FFT (fast Fourier transform)
fft_result = np.fft.fft(sine_wave)
fft_freq = np.fft.fftfreq(len(t), 1/fs)
fft_magnitude = np.abs(fft_result)
# Keep only the first half of the bins (the positive frequencies);
# the factor 2 / len(t) rescales the magnitudes to single-sided amplitudes
positive_freq = fft_freq[:len(fft_freq)//2]
positive_mag = fft_magnitude[:len(fft_magnitude)//2] * 2 / len(t)
# Inverse FFT
reconstructed = np.fft.ifft(fft_result)
# Convolution
kernel = np.ones(10) / 10 # 10-point moving-average filter
filtered = np.convolve(noisy_signal, kernel, mode='same')
# Correlation (here: autocorrelation of the sine wave)
correlation = np.correlate(sine_wave, sine_wave, mode='full')
# Window functions
hamming = np.hamming(len(t))
hanning = np.hanning(len(t))
blackman = np.blackman(len(t))
windowed_signal = sine_wave * hamming
# SNR calculation
def calculate_snr(signal, noise):
    """Return the signal-to-noise ratio in dB.

    Power is the mean of the squared magnitude, so real and complex inputs
    are both handled, and any array-like (list, tuple, ndarray) is accepted.

    Args:
        signal: Samples of the clean signal.
        noise: Samples of the noise.

    Returns:
        ``10 * log10(signal_power / noise_power)``.  ``inf`` when the noise
        power is zero and ``-inf`` when only the signal power is zero, so
        the function never evaluates ``log10(0)`` or divides by zero.
    """
    signal_power = np.mean(np.abs(np.asarray(signal)) ** 2)
    noise_power = np.mean(np.abs(np.asarray(noise)) ** 2)
    if noise_power == 0:
        # No noise at all: report an unbounded ratio instead of dividing by zero.
        return float('inf')
    if signal_power == 0:
        return float('-inf')
    return 10 * np.log10(signal_power / noise_power)
# Example: SNR of the clean sine wave relative to the noise array
snr = calculate_snr(sine_wave, noise)
print(f"信噪比:{snr:.2f} dB")
7. 文件读写
import numpy as np
# Save and load in NumPy's native formats
arr = np.random.randn(100, 100)
# Single array -> .npy
np.save('array.npy', arr)
loaded = np.load('array.npy')
# Several arrays -> one .npz archive
np.savez('arrays.npz', arr1=arr, arr2=arr*2)
# np.load on an .npz returns a lazily-reading archive object that keeps the
# file open; the with-block closes it once the arrays have been read
with np.load('arrays.npz') as data:
    print(data['arr1'])
    print(data['arr2'])
# Compressed archive
np.savez_compressed('arrays_compressed.npz', arr1=arr)
# Text file
np.savetxt('array.txt', arr, delimiter=',', fmt='%.4f')
loaded_txt = np.loadtxt('array.txt', delimiter=',')
# Raw binary file (common for FPGA data)
arr_int16 = np.array([1, 2, 3, 4, 5], dtype=np.int16)
arr_int16.tofile('data.bin')
# Read the raw binary back
loaded_bin = np.fromfile('data.bin', dtype=np.int16)
# Explicit byte order
arr_be = np.array([0x1234, 0x5678], dtype='>i2') # big-endian
arr_le = np.array([0x1234, 0x5678], dtype='<i2') # little-endian
# Memory mapping (large files)
# mode='r' requires the file to exist already, so create a zero-filled
# backing file of the right size first (mode='w+'), flush it and drop the
# writer before reopening read-only
mmap_writer = np.memmap('large_file.bin', dtype=np.float32, mode='w+', shape=(1000, 1000))
mmap_writer.flush()
del mmap_writer
mmap = np.memmap('large_file.bin', dtype=np.float32, mode='r', shape=(1000, 1000))
# Only the parts that are actually accessed are read; the whole file is not loaded
# Convert from bytes
raw_bytes = b'\x00\x01\x00\x02\x00\x03'
arr_from_bytes = np.frombuffer(raw_bytes, dtype='>i2')
print(arr_from_bytes) # [1 2 3]
# Convert to bytes
arr_to_bytes = arr_int16.tobytes()
print(arr_to_bytes.hex())
8. FPGA应用场景
import numpy as np
# 1. ADC data processing
def process_adc_data(raw_data: bytes, bits: int = 12, vref: float = 3.3) -> np.ndarray:
    """Convert a raw ADC byte stream into voltages.

    The bytes are read as big-endian signed 16-bit words.  Each code is
    divided by the full-scale code ``2**bits - 1`` and multiplied by
    ``vref``.

    Args:
        raw_data: Raw sample bytes, two bytes per sample.
        bits: ADC resolution in bits.
        vref: Reference voltage corresponding to the full-scale code.

    Returns:
        Float array with one voltage per 16-bit word.
    """
    codes = np.frombuffer(raw_data, dtype='>i2')
    full_scale = 2 ** bits - 1
    return codes.astype(float) / full_scale * vref
# 2. Test waveform generation
def generate_test_waveform(freq: float, fs: float, duration: float,
                           bits: int = 12) -> np.ndarray:
    """Generate a quantized full-scale sine wave for use as a test stimulus.

    Args:
        freq: Sine frequency.
        fs: Sample rate, in the same unit as ``freq``.
        duration: Length of the waveform; samples are taken at
            ``0, 1/fs, 2/fs, ...`` strictly below ``duration``.
        bits: Signed sample width in bits; the peak code is
            ``2**(bits - 1) - 1``.

    Returns:
        Integer array of quantized samples.  The dtype is ``int16`` for
        widths up to 16 bits (as before) and widens to ``int32`` / ``int64``
        for larger widths, so wide samples no longer wrap around.
    """
    t = np.arange(0, duration, 1/fs)
    wave = np.sin(2 * np.pi * freq * t)
    # Quantize to the signed range [-max_code, max_code]
    max_code = 2 ** (bits - 1) - 1
    if bits <= 16:
        out_dtype = np.int16
    elif bits <= 32:
        out_dtype = np.int32
    else:
        out_dtype = np.int64
    return np.round(wave * max_code).astype(out_dtype)
# 3. Lookup table (LUT) generation
def generate_sine_lut(depth: int, width: int) -> np.ndarray:
    """Generate one full period of a sine wave as a signed lookup table.

    Args:
        depth: Number of address bits; the table has ``2**depth`` entries.
        width: Signed data width in bits; the peak value is
            ``2**(width - 1) - 1``.

    Returns:
        Integer array of length ``2**depth``.  The dtype is ``int16`` for
        widths up to 16 bits (as before) and widens to ``int32`` / ``int64``
        for larger widths, so wide tables no longer wrap around.
    """
    table_size = 2 ** depth
    max_value = 2 ** (width - 1) - 1
    # One sine period sampled at table_size equally spaced phases
    indices = np.arange(table_size)
    lut = np.round(max_value * np.sin(2 * np.pi * indices / table_size))
    if width <= 16:
        out_dtype = np.int16
    elif width <= 32:
        out_dtype = np.int32
    else:
        out_dtype = np.int64
    return lut.astype(out_dtype)
# 4. Filter coefficient generation
def generate_fir_coefficients(num_taps: int, cutoff: float, fs: float,
                              bits: int = 16) -> np.ndarray:
    """Design a windowed-sinc FIR low-pass filter and quantize its taps.

    Args:
        num_taps: Number of filter taps.
        cutoff: Cutoff frequency, in the same unit as ``fs``.
        fs: Sample rate.
        bits: Signed coefficient width in bits; taps are scaled by
            ``2**(bits - 1) - 1``.

    Returns:
        Integer array of ``num_taps`` coefficients with unity DC gain before
        quantization.  The dtype is ``int16`` for widths up to 16 bits and
        widens to ``int32`` / ``int64`` for larger widths.
    """
    # Cutoff normalized to the Nyquist frequency (1.0 == fs / 2)
    normalized_cutoff = cutoff / (fs / 2)
    n = np.arange(num_taps)
    center = (num_taps - 1) / 2
    # Ideal low-pass impulse response.  np.sinc(x) is sin(pi*x)/(pi*x), so a
    # Nyquist-normalized cutoff goes in unscaled; an extra factor of 2 here
    # would place the cutoff at twice the requested frequency.
    h = np.sinc(normalized_cutoff * (n - center))
    # Apply a Hamming window to the truncated response
    window = np.hamming(num_taps)
    h = h * window
    # Normalize to unity gain at DC
    h = h / np.sum(h)
    # Quantize to fixed point
    scale = 2 ** (bits - 1) - 1
    if bits <= 16:
        out_dtype = np.int16
    elif bits <= 32:
        out_dtype = np.int32
    else:
        out_dtype = np.int64
    h_quantized = np.round(h * scale).astype(out_dtype)
    return h_quantized
# 5. Data comparison
def compare_data(expected: np.ndarray, actual: np.ndarray,
                 tolerance: float = 0) -> dict:
    """Compare two arrays element by element and summarize the differences.

    Args:
        expected: Reference data.
        actual: Data under test.
        tolerance: Largest absolute difference still counted as a match.
            With the default ``0`` the arrays must be exactly equal.

    Returns:
        ``{'match': False, 'error': 'Shape mismatch'}`` when the shapes
        differ.  Otherwise a dict with:

        - ``'match'``: ``bool``, whether all elements agree within tolerance.
        - ``'max_diff'`` / ``'mean_diff'``: ``float`` absolute differences
          (``0.0`` for empty input).
        - ``'mismatch_count'``: ``int`` number of elements beyond tolerance.
        - ``'mismatch_indices'``: first-axis indices of those elements.
    """
    if expected.shape != actual.shape:
        return {'match': False, 'error': 'Shape mismatch'}
    diff = np.abs(expected.astype(float) - actual.astype(float))
    mismatched = diff > tolerance
    if tolerance == 0:
        match = np.array_equal(expected, actual)
    else:
        match = np.all(diff <= tolerance)
    # np.max raises on an empty array, so report zero differences instead.
    has_data = diff.size > 0
    return {
        'match': bool(match),
        'max_diff': float(np.max(diff)) if has_data else 0.0,
        'mean_diff': float(np.mean(diff)) if has_data else 0.0,
        'mismatch_count': int(np.sum(mismatched)),
        'mismatch_indices': np.where(mismatched)[0],
    }
# 6. Test vector generation
def generate_test_vectors(num_vectors: int, data_width: int) -> np.ndarray:
    """Generate uniformly distributed random unsigned test vectors.

    Args:
        num_vectors: Number of values to generate.
        data_width: Width of each value in bits; values lie in
            ``[0, 2**data_width)``.

    Returns:
        1-D array of ``num_vectors`` values.  The dtype is ``uint32`` for
        widths up to 32 bits (as before) and ``uint64`` for widths of
        33 to 64 bits.

    Raises:
        ValueError: If ``data_width`` exceeds 64 bits.
    """
    if data_width > 64:
        raise ValueError(f"data_width must be at most 64 bits, got {data_width}")
    max_value = 2 ** data_width  # exclusive upper bound
    out_dtype = np.uint32 if data_width <= 32 else np.uint64
    return np.random.randint(0, max_value, num_vectors, dtype=out_dtype)
# 7. CRC calculation
def crc32_numpy(data: np.ndarray) -> int:
    """Return the CRC-32 of the array's raw bytes as an unsigned 32-bit int.

    The checksum is computed over ``data.tobytes()``, so it depends on the
    array's dtype and byte order as well as on its values.
    """
    from binascii import crc32
    payload = data.tobytes()
    checksum = crc32(payload)
    return checksum & 0xFFFFFFFF
9. 实战案例
案例:FPGA信号分析工具
"""
实战案例:FPGA ADC信号分析工具
"""
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional
@dataclass
class SignalAnalysisResult:
    """Summary statistics and spectral metrics produced by one analysis run."""
    sample_count: int     # number of samples analyzed
    sample_rate: float    # sample rate in Hz (the report prints it in kHz)
    min_value: float      # smallest sample (the report prints voltages in V)
    max_value: float      # largest sample
    mean_value: float     # arithmetic mean
    std_value: float      # standard deviation
    peak_to_peak: float   # max_value - min_value
    rms_value: float      # root mean square of the samples
    dominant_freq: float  # frequency of the strongest non-DC FFT bin
    snr_db: float         # signal-to-noise ratio in dB
    thd_db: float         # total harmonic distortion in dB


class FPGASignalAnalyzer:
    """Convert ADC captures to voltages and compute time/frequency metrics."""

    def __init__(self, sample_rate: float, adc_bits: int = 12, vref: float = 3.3):
        """Store the acquisition parameters.

        Args:
            sample_rate: Sample rate in Hz.
            adc_bits: ADC resolution in bits.
            vref: Reference voltage corresponding to the full-scale code.
        """
        self.sample_rate = sample_rate
        self.adc_bits = adc_bits
        self.vref = vref
        self.max_code = 2 ** adc_bits - 1

    def load_data(self, filepath: str, dtype: str = '>i2') -> np.ndarray:
        """Read raw samples from a binary file and convert them to voltages.

        Args:
            filepath: Path of the binary capture file.
            dtype: NumPy dtype string of the stored samples; the default is
                big-endian signed 16-bit.
        """
        raw = np.fromfile(filepath, dtype=dtype)
        return self.convert_to_voltage(raw)

    def convert_to_voltage(self, raw_data: np.ndarray) -> np.ndarray:
        """Scale raw ADC codes to voltages: ``code / max_code * vref``."""
        return raw_data.astype(float) / self.max_code * self.vref

    def analyze(self, signal: np.ndarray) -> SignalAnalysisResult:
        """Compute time-domain statistics and spectral metrics of a signal.

        Args:
            signal: 1-D sequence of samples.

        Returns:
            A ``SignalAnalysisResult`` whose numeric fields are plain floats.

        Raises:
            ValueError: If fewer than 4 samples are given (the half-spectrum
                then has no non-DC bin to search for a dominant frequency).
        """
        signal = np.asarray(signal, dtype=float)
        if len(signal) < 4:
            raise ValueError("signal must contain at least 4 samples")
        # Time-domain statistics
        min_val = float(np.min(signal))
        max_val = float(np.max(signal))
        mean_val = float(np.mean(signal))
        std_val = float(np.std(signal))
        peak_to_peak = max_val - min_val
        rms_val = float(np.sqrt(np.mean(signal ** 2)))
        # Spectrum: keep the first half of the FFT bins (non-negative frequencies)
        half = len(signal) // 2
        fft_result = np.fft.fft(signal)
        fft_magnitude = np.abs(fft_result[:half])
        fft_freq = np.fft.fftfreq(len(signal), 1/self.sample_rate)[:half]
        # Dominant frequency: strongest bin, skipping bin 0 (DC)
        dominant_idx = int(np.argmax(fft_magnitude[1:])) + 1
        dominant_freq = float(fft_freq[dominant_idx])
        snr_db = self._calculate_snr(fft_magnitude, dominant_idx)
        thd_db = self._calculate_thd(fft_magnitude, dominant_idx)
        return SignalAnalysisResult(
            sample_count=len(signal),
            sample_rate=self.sample_rate,
            min_value=min_val,
            max_value=max_val,
            mean_value=mean_val,
            std_value=std_val,
            peak_to_peak=peak_to_peak,
            rms_value=rms_val,
            dominant_freq=dominant_freq,
            snr_db=snr_db,
            thd_db=thd_db
        )

    def _calculate_snr(self, fft_mag: np.ndarray, signal_idx: int,
                       window: int = 5) -> float:
        """Estimate the SNR in dB from a half-spectrum magnitude array.

        Signal power is the power in the bins within ``window`` of
        ``signal_idx``; noise power is all remaining non-DC power.

        Args:
            fft_mag: Magnitudes of the non-negative-frequency FFT bins.
            signal_idx: Index of the signal (dominant) bin.
            window: Number of bins on each side counted as signal.

        Returns:
            SNR in dB; ``inf`` when no noise power remains and ``-inf`` when
            the signal band holds no power.
        """
        # Clamp the lower edge at bin 1: a negative start would wrap around
        # to the end of the array, and bin 0 (DC) is neither signal nor noise.
        lo = max(signal_idx - window, 1)
        hi = signal_idx + window + 1
        signal_power = np.sum(fft_mag[lo:hi] ** 2)
        # Total power excluding DC, so a DC offset is not counted as noise
        total_power = np.sum(fft_mag[1:] ** 2)
        noise_power = total_power - signal_power
        if noise_power <= 0:
            return float('inf')
        if signal_power <= 0:
            return float('-inf')
        return float(10 * np.log10(signal_power / noise_power))

    def _calculate_thd(self, fft_mag: np.ndarray, fundamental_idx: int,
                       num_harmonics: int = 5) -> float:
        """Compute total harmonic distortion in dB.

        Sums the power at integer multiples 2 .. ``num_harmonics + 1`` of
        the fundamental bin (those that fall inside ``fft_mag``) and relates
        it to the power of the fundamental bin.

        Returns:
            THD in dB; ``inf`` when the fundamental has no power and
            ``-inf`` when no harmonic power is found.
        """
        fundamental_power = fft_mag[fundamental_idx] ** 2
        harmonic_power = 0
        for i in range(2, num_harmonics + 2):
            harmonic_idx = fundamental_idx * i
            if harmonic_idx < len(fft_mag):
                harmonic_power += fft_mag[harmonic_idx] ** 2
        if fundamental_power <= 0:
            return float('inf')
        if harmonic_power <= 0:
            # Avoid log10(0): no measurable distortion
            return float('-inf')
        return float(10 * np.log10(harmonic_power / fundamental_power))

    def generate_report(self, result: SignalAnalysisResult) -> str:
        """Format an analysis result as a human-readable text report."""
        # The template lines are flush-left on purpose: any leading whitespace
        # inside the triple-quoted literal would show up in the report.
        report = f"""
信号分析报告
============
采样点数:{result.sample_count}
采样率:{result.sample_rate/1000:.1f} kHz
电压统计
--------
最小值:{result.min_value:.4f} V
最大值:{result.max_value:.4f} V
均值:{result.mean_value:.4f} V
标准差:{result.std_value:.4f} V
峰峰值:{result.peak_to_peak:.4f} V
RMS值:{result.rms_value:.4f} V
频谱分析
--------
主频:{result.dominant_freq:.2f} Hz
信噪比:{result.snr_db:.2f} dB
THD:{result.thd_db:.2f} dB
"""
        return report
# Usage example
def demo():
    """Analyze a synthetic noisy sine wave and print the resulting report.

    The test signal is a 1 kHz sine on a 1.5 V offset with a weaker 2 kHz
    second harmonic and Gaussian noise, sampled at 100 kHz for 0.1 s.
    """
    sample_rate = 100000
    duration = 0.1
    # Create the analyzer with the same sample rate used to build the signal
    analyzer = FPGASignalAnalyzer(sample_rate=sample_rate, adc_bits=12)
    # Build the time axis from an integer sample count; a float step passed
    # to np.arange can yield one sample more or less than intended
    num_samples = int(round(duration * sample_rate))
    t = np.arange(num_samples) / sample_rate
    signal = 1.5 + 1.0 * np.sin(2 * np.pi * 1000 * t) # 1 kHz sine wave
    signal += 0.1 * np.sin(2 * np.pi * 2000 * t) # second harmonic
    signal += 0.05 * np.random.randn(len(t)) # noise
    # Analyze
    result = analyzer.analyze(signal)
    # Build and print the report
    report = analyzer.generate_report(result)
    print(report)


# Run the demo only when executed as a script, not when imported
if __name__ == "__main__":
    demo()
10. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| 数组创建 | np.array(), np.zeros(), np.arange() |
| 数组操作 | 索引、切片、reshape、拼接 |
| 数学运算 | 向量化运算、矩阵运算、FFT |
| 统计函数 | mean, std, min, max, histogram |
| 文件读写 | np.save(), np.fromfile(), np.frombuffer() |
| FPGA应用 | ADC数据处理、LUT生成、滤波器系数 |
✅ 学习检查清单
- 能创建和操作NumPy数组
- 掌握基本数学运算
- 能进行简单的信号处理
- 能读写二进制数据文件
- 能应用于FPGA数据处理
📖 下一步学习
掌握了NumPy数据处理后,下一篇我们将学习Python数据可视化。
常见问题 FAQ
💬 NumPy数组和Python列表有什么区别?
NumPy数组的元素类型必须相同(类似C数组),支持向量化运算;对大规模数值计算,通常比纯Python循环快几十倍到上百倍(具体倍数取决于数据规模和运算类型)。列表的元素可以是不同类型,灵活但慢。数值计算用NumPy,通用数据用列表。
💬 NumPy的广播机制是什么?
不同形状的数组运算时,NumPy自动扩展小数组以匹配大数组。比如array + 1会给每个元素加1,matrix * vector会逐行乘。理解广播是写高效NumPy代码的关键。
📚 系列导航
- 上一篇:23 - Python二进制协议解析与校验
- 当前:24 - Python NumPy数据处理
- 下一篇:25 - Python数据可视化