Veloris.
返回索引
设计实战 2026-02-14

Python NumPy数据处理:用数组运算替代for循环,速度快100倍

2 分钟
477 words

Python NumPy数据处理:用数组运算替代for循环,速度快100倍

NumPy是Python科学计算的基础库,提供高效的多维数组操作。在FPGA开发中,NumPy常用于信号处理、数据分析、波形生成和验证等场景。


1. NumPy简介

pip install numpy
import numpy as np

# NumPy的核心是ndarray(N-dimensional array)
# 优点:
# 1. 高效的数值计算(底层C实现)
# 2. 向量化操作(无需循环)
# 3. 广播机制
# 4. 丰富的数学函数

# 查看版本
print(np.__version__)

# 基本数组
arr = np.array([1, 2, 3, 4, 5])
print(f"数组:{arr}")
print(f"类型:{type(arr)}")
print(f"数据类型:{arr.dtype}")
print(f"形状:{arr.shape}")
print(f"维度:{arr.ndim}")
print(f"元素数:{arr.size}")

2. 数组创建

import numpy as np

# 从列表创建
arr1 = np.array([1, 2, 3])
arr2 = np.array([[1, 2, 3], [4, 5, 6]])  # 二维

# 指定数据类型
arr_int = np.array([1, 2, 3], dtype=np.int32)
arr_float = np.array([1, 2, 3], dtype=np.float64)
arr_complex = np.array([1+2j, 3+4j], dtype=np.complex128)

# 常用数据类型
# np.int8, np.int16, np.int32, np.int64
# np.uint8, np.uint16, np.uint32, np.uint64
# np.float32, np.float64
# np.complex64, np.complex128
# np.bool_

# 特殊数组
zeros = np.zeros((3, 4))           # 全零
ones = np.ones((3, 4))             # 全一
empty = np.empty((3, 4))           # 未初始化
full = np.full((3, 4), 7)          # 填充指定值
eye = np.eye(4)                    # 单位矩阵
diag = np.diag([1, 2, 3])          # 对角矩阵

# 序列数组
arange = np.arange(0, 10, 2)       # [0, 2, 4, 6, 8]
linspace = np.linspace(0, 1, 5)    # [0, 0.25, 0.5, 0.75, 1]
logspace = np.logspace(0, 2, 5)    # 对数等分

# 随机数组
rand = np.random.rand(3, 4)        # [0, 1)均匀分布
randn = np.random.randn(3, 4)      # 标准正态分布
randint = np.random.randint(0, 10, (3, 4))  # 随机整数

# 从字节创建(FPGA数据常用)
data = b'\x01\x02\x03\x04'
arr_from_bytes = np.frombuffer(data, dtype=np.uint8)
arr_from_bytes = np.frombuffer(data, dtype='>i2')  # 大端16位整数

# 从文件创建
# arr = np.fromfile('data.bin', dtype=np.int16)

3. 数组操作

import numpy as np

arr = np.arange(12).reshape(3, 4)
print(arr)
# [[ 0  1  2  3]
#  [ 4  5  6  7]
#  [ 8  9 10 11]]

# 索引和切片
print(arr[0, 0])      # 0
print(arr[1, :])      # [4 5 6 7]
print(arr[:, 2])      # [2 6 10]
print(arr[0:2, 1:3])  # [[1 2] [5 6]]

# 布尔索引
print(arr[arr > 5])   # [6 7 8 9 10 11]
arr[arr > 5] = 0      # 条件赋值

# 花式索引
print(arr[[0, 2], :])  # 第0行和第2行

# 形状操作
arr2 = arr.reshape(4, 3)    # 改变形状
arr3 = arr.flatten()        # 展平为一维
arr4 = arr.T                # 转置
arr5 = arr.ravel()          # 展平(返回视图)

# 数组拼接
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])

concat_v = np.vstack((a, b))  # 垂直拼接
concat_h = np.hstack((a, b))  # 水平拼接
concat = np.concatenate((a, b), axis=0)  # 沿轴拼接

# 数组分割
arr = np.arange(12)
split = np.split(arr, 3)      # 等分为3份
split = np.split(arr, [3, 7]) # 在索引3和7处分割

# 数组复制
arr_view = arr.view()    # 视图(共享数据)
arr_copy = arr.copy()    # 深复制

# 广播机制
a = np.array([[1], [2], [3]])  # (3, 1)
b = np.array([10, 20, 30])     # (3,)
c = a + b  # 广播后相加
# [[11 21 31]
#  [12 22 32]
#  [13 23 33]]

4. 数学运算

import numpy as np

a = np.array([1, 2, 3, 4])
b = np.array([5, 6, 7, 8])

# 基本运算(逐元素)
print(a + b)      # [ 6  8 10 12]
print(a - b)      # [-4 -4 -4 -4]
print(a * b)      # [ 5 12 21 32]
print(a / b)      # [0.2 0.33 0.43 0.5]
print(a ** 2)     # [ 1  4  9 16]
print(np.sqrt(a)) # [1. 1.41 1.73 2.]

# 三角函数
theta = np.linspace(0, 2*np.pi, 100)
sin_wave = np.sin(theta)
cos_wave = np.cos(theta)

# 指数和对数
exp = np.exp(a)       # e^x
log = np.log(a)       # ln(x)
log10 = np.log10(a)   # log10(x)
log2 = np.log2(a)     # log2(x)

# 矩阵运算
A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6], [7, 8]])

dot = np.dot(A, B)     # 矩阵乘法
dot = A @ B            # 同上(Python 3.5+)
element = A * B        # 逐元素乘法

# 线性代数
det = np.linalg.det(A)           # 行列式
inv = np.linalg.inv(A)           # 逆矩阵
eig = np.linalg.eig(A)           # 特征值和特征向量
svd = np.linalg.svd(A)           # 奇异值分解
norm = np.linalg.norm(a)         # 范数

# 位运算(整数数组)
x = np.array([0b1010, 0b1100], dtype=np.uint8)
y = np.array([0b0110, 0b1010], dtype=np.uint8)

print(np.bitwise_and(x, y))  # 按位与
print(np.bitwise_or(x, y))   # 按位或
print(np.bitwise_xor(x, y))  # 按位异或
print(np.left_shift(x, 2))   # 左移
print(np.right_shift(x, 2))  # 右移

5. 统计函数

import numpy as np

arr = np.random.randn(100)

# 基本统计
print(f"最小值:{np.min(arr):.4f}")
print(f"最大值:{np.max(arr):.4f}")
print(f"均值:{np.mean(arr):.4f}")
print(f"中位数:{np.median(arr):.4f}")
print(f"标准差:{np.std(arr):.4f}")
print(f"方差:{np.var(arr):.4f}")
print(f"求和:{np.sum(arr):.4f}")

# 位置
print(f"最小值索引:{np.argmin(arr)}")
print(f"最大值索引:{np.argmax(arr)}")

# 百分位数
print(f"25%分位:{np.percentile(arr, 25):.4f}")
print(f"75%分位:{np.percentile(arr, 75):.4f}")

# 多维数组统计
arr2d = np.random.randn(3, 4)
print(f"全局均值:{np.mean(arr2d):.4f}")
print(f"列均值:{np.mean(arr2d, axis=0)}")  # 沿行方向
print(f"行均值:{np.mean(arr2d, axis=1)}")  # 沿列方向

# 累积运算
cumsum = np.cumsum(arr)   # 累积和
cumprod = np.cumprod(arr) # 累积积

# 相关性
x = np.random.randn(100)
y = x + np.random.randn(100) * 0.5
corr = np.corrcoef(x, y)
print(f"相关系数:{corr[0, 1]:.4f}")

# 直方图
hist, bins = np.histogram(arr, bins=10)
print(f"直方图:{hist}")
print(f"边界:{bins}")

6. 信号处理基础

import numpy as np

# 生成信号
fs = 1000  # 采样率
t = np.arange(0, 1, 1/fs)  # 时间轴

# 正弦波
freq = 10  # 频率
amplitude = 1.0
phase = 0
sine_wave = amplitude * np.sin(2 * np.pi * freq * t + phase)

# 方波
square_wave = np.sign(np.sin(2 * np.pi * freq * t))

# 锯齿波
sawtooth = 2 * (t * freq - np.floor(0.5 + t * freq))

# 添加噪声
noise = np.random.randn(len(t)) * 0.1
noisy_signal = sine_wave + noise

# FFT(快速傅里叶变换)
fft_result = np.fft.fft(sine_wave)
fft_freq = np.fft.fftfreq(len(t), 1/fs)
fft_magnitude = np.abs(fft_result)

# 只取正频率部分
positive_freq = fft_freq[:len(fft_freq)//2]
positive_mag = fft_magnitude[:len(fft_magnitude)//2] * 2 / len(t)

# 逆FFT
reconstructed = np.fft.ifft(fft_result)

# 卷积
kernel = np.ones(10) / 10  # 移动平均滤波器
filtered = np.convolve(noisy_signal, kernel, mode='same')

# 相关
correlation = np.correlate(sine_wave, sine_wave, mode='full')

# 窗函数
hamming = np.hamming(len(t))
hanning = np.hanning(len(t))
blackman = np.blackman(len(t))

windowed_signal = sine_wave * hamming

# 信噪比计算
def calculate_snr(signal, noise):
    """计算信噪比(dB)"""
    signal_power = np.mean(signal ** 2)
    noise_power = np.mean(noise ** 2)
    return 10 * np.log10(signal_power / noise_power)

snr = calculate_snr(sine_wave, noise)
print(f"信噪比:{snr:.2f} dB")

7. 文件读写

import numpy as np

# 保存和加载NumPy格式
arr = np.random.randn(100, 100)

# 单个数组
np.save('array.npy', arr)
loaded = np.load('array.npy')

# 多个数组
np.savez('arrays.npz', arr1=arr, arr2=arr*2)
data = np.load('arrays.npz')
print(data['arr1'])
print(data['arr2'])

# 压缩保存
np.savez_compressed('arrays_compressed.npz', arr1=arr)

# 文本文件
np.savetxt('array.txt', arr, delimiter=',', fmt='%.4f')
loaded_txt = np.loadtxt('array.txt', delimiter=',')

# 二进制文件(FPGA数据常用)
arr_int16 = np.array([1, 2, 3, 4, 5], dtype=np.int16)
arr_int16.tofile('data.bin')

# 读取二进制
loaded_bin = np.fromfile('data.bin', dtype=np.int16)

# 指定字节序
arr_be = np.array([0x1234, 0x5678], dtype='>i2')  # 大端
arr_le = np.array([0x1234, 0x5678], dtype='<i2')  # 小端

# 内存映射(大文件)
mmap = np.memmap('large_file.bin', dtype=np.float32, mode='r', shape=(1000, 1000))
# 只读取需要的部分,不加载整个文件

# 从字节转换
raw_bytes = b'\x00\x01\x00\x02\x00\x03'
arr_from_bytes = np.frombuffer(raw_bytes, dtype='>i2')
print(arr_from_bytes)  # [1 2 3]

# 转换为字节
arr_to_bytes = arr_int16.tobytes()
print(arr_to_bytes.hex())

8. FPGA应用场景

import numpy as np

# 1. ADC数据处理
def process_adc_data(raw_data: bytes, bits: int = 12, vref: float = 3.3) -> np.ndarray:
    """处理ADC原始数据"""
    # 假设数据是大端16位
    samples = np.frombuffer(raw_data, dtype='>i2')
    
    # 转换为电压
    max_code = 2 ** bits - 1
    voltage = samples.astype(float) / max_code * vref
    
    return voltage

# 2. 生成测试波形
def generate_test_waveform(freq: float, fs: float, duration: float, 
                           bits: int = 12) -> np.ndarray:
    """生成测试正弦波"""
    t = np.arange(0, duration, 1/fs)
    wave = np.sin(2 * np.pi * freq * t)
    
    # 量化
    max_code = 2 ** (bits - 1) - 1
    quantized = np.round(wave * max_code).astype(np.int16)
    
    return quantized

# 3. 生成查找表(LUT)
def generate_sine_lut(depth: int, width: int) -> np.ndarray:
    """生成正弦查找表"""
    # depth: 地址位数(表深度 = 2^depth)
    # width: 数据位数
    
    table_size = 2 ** depth
    max_value = 2 ** (width - 1) - 1
    
    # 生成一个周期的正弦波
    indices = np.arange(table_size)
    lut = np.round(max_value * np.sin(2 * np.pi * indices / table_size))
    
    return lut.astype(np.int16)

# 4. 生成滤波器系数
def generate_fir_coefficients(num_taps: int, cutoff: float, fs: float,
                              bits: int = 16) -> np.ndarray:
    """生成FIR低通滤波器系数"""
    # 归一化截止频率
    normalized_cutoff = cutoff / (fs / 2)
    
    # 理想低通滤波器
    n = np.arange(num_taps)
    center = (num_taps - 1) / 2
    
    # sinc函数
    h = np.sinc(2 * normalized_cutoff * (n - center))
    
    # 加窗
    window = np.hamming(num_taps)
    h = h * window
    
    # 归一化
    h = h / np.sum(h)
    
    # 量化为定点数
    scale = 2 ** (bits - 1) - 1
    h_quantized = np.round(h * scale).astype(np.int16)
    
    return h_quantized

# 5. 数据比对
def compare_data(expected: np.ndarray, actual: np.ndarray, 
                 tolerance: float = 0) -> dict:
    """比对数据"""
    if expected.shape != actual.shape:
        return {'match': False, 'error': 'Shape mismatch'}
    
    diff = np.abs(expected.astype(float) - actual.astype(float))
    
    if tolerance == 0:
        match = np.array_equal(expected, actual)
    else:
        match = np.all(diff <= tolerance)
    
    return {
        'match': match,
        'max_diff': np.max(diff),
        'mean_diff': np.mean(diff),
        'mismatch_count': np.sum(diff > tolerance),
        'mismatch_indices': np.where(diff > tolerance)[0]
    }

# 6. 生成测试向量
def generate_test_vectors(num_vectors: int, data_width: int) -> np.ndarray:
    """生成随机测试向量"""
    max_value = 2 ** data_width
    return np.random.randint(0, max_value, num_vectors, dtype=np.uint32)

# 7. CRC计算
def crc32_numpy(data: np.ndarray) -> int:
    """使用NumPy计算CRC32"""
    import binascii
    return binascii.crc32(data.tobytes()) & 0xFFFFFFFF

9. 实战案例

案例:FPGA信号分析工具

"""
实战案例:FPGA ADC信号分析工具
"""
import numpy as np
from dataclasses import dataclass
from typing import Tuple, Optional

@dataclass
class SignalAnalysisResult:
    """信号分析结果"""
    sample_count: int
    sample_rate: float
    min_value: float
    max_value: float
    mean_value: float
    std_value: float
    peak_to_peak: float
    rms_value: float
    dominant_freq: float
    snr_db: float
    thd_db: float

class FPGASignalAnalyzer:
    """FPGA信号分析器"""
    
    def __init__(self, sample_rate: float, adc_bits: int = 12, vref: float = 3.3):
        self.sample_rate = sample_rate
        self.adc_bits = adc_bits
        self.vref = vref
        self.max_code = 2 ** adc_bits - 1
    
    def load_data(self, filepath: str, dtype: str = '>i2') -> np.ndarray:
        """加载二进制数据"""
        raw = np.fromfile(filepath, dtype=dtype)
        return self.convert_to_voltage(raw)
    
    def convert_to_voltage(self, raw_data: np.ndarray) -> np.ndarray:
        """转换为电压"""
        return raw_data.astype(float) / self.max_code * self.vref
    
    def analyze(self, signal: np.ndarray) -> SignalAnalysisResult:
        """分析信号"""
        # 基本统计
        min_val = np.min(signal)
        max_val = np.max(signal)
        mean_val = np.mean(signal)
        std_val = np.std(signal)
        peak_to_peak = max_val - min_val
        rms_val = np.sqrt(np.mean(signal ** 2))
        
        # 频谱分析
        fft_result = np.fft.fft(signal)
        fft_magnitude = np.abs(fft_result[:len(signal)//2])
        fft_freq = np.fft.fftfreq(len(signal), 1/self.sample_rate)[:len(signal)//2]
        
        # 主频
        dominant_idx = np.argmax(fft_magnitude[1:]) + 1  # 排除DC
        dominant_freq = fft_freq[dominant_idx]
        
        # 信噪比
        snr_db = self._calculate_snr(fft_magnitude, dominant_idx)
        
        # 总谐波失真
        thd_db = self._calculate_thd(fft_magnitude, dominant_idx)
        
        return SignalAnalysisResult(
            sample_count=len(signal),
            sample_rate=self.sample_rate,
            min_value=min_val,
            max_value=max_val,
            mean_value=mean_val,
            std_value=std_val,
            peak_to_peak=peak_to_peak,
            rms_value=rms_val,
            dominant_freq=dominant_freq,
            snr_db=snr_db,
            thd_db=thd_db
        )
    
    def _calculate_snr(self, fft_mag: np.ndarray, signal_idx: int, 
                       window: int = 5) -> float:
        """计算信噪比"""
        # 信号功率(主频附近)
        signal_power = np.sum(fft_mag[signal_idx-window:signal_idx+window+1] ** 2)
        
        # 总功率
        total_power = np.sum(fft_mag ** 2)
        
        # 噪声功率
        noise_power = total_power - signal_power
        
        if noise_power <= 0:
            return float('inf')
        
        return 10 * np.log10(signal_power / noise_power)
    
    def _calculate_thd(self, fft_mag: np.ndarray, fundamental_idx: int,
                       num_harmonics: int = 5) -> float:
        """计算总谐波失真"""
        fundamental_power = fft_mag[fundamental_idx] ** 2
        
        harmonic_power = 0
        for i in range(2, num_harmonics + 2):
            harmonic_idx = fundamental_idx * i
            if harmonic_idx < len(fft_mag):
                harmonic_power += fft_mag[harmonic_idx] ** 2
        
        if fundamental_power <= 0:
            return float('inf')
        
        return 10 * np.log10(harmonic_power / fundamental_power)
    
    def generate_report(self, result: SignalAnalysisResult) -> str:
        """生成分析报告"""
        report = f"""
信号分析报告
============
采样点数:{result.sample_count}
采样率:{result.sample_rate/1000:.1f} kHz

电压统计
--------
最小值:{result.min_value:.4f} V
最大值:{result.max_value:.4f} V
均值:{result.mean_value:.4f} V
标准差:{result.std_value:.4f} V
峰峰值:{result.peak_to_peak:.4f} V
RMS值:{result.rms_value:.4f} V

频谱分析
--------
主频:{result.dominant_freq:.2f} Hz
信噪比:{result.snr_db:.2f} dB
THD:{result.thd_db:.2f} dB
"""
        return report

# 使用示例
def demo():
    # 创建分析器
    analyzer = FPGASignalAnalyzer(sample_rate=100000, adc_bits=12)
    
    # 生成测试信号
    t = np.arange(0, 0.1, 1/100000)
    signal = 1.5 + 1.0 * np.sin(2 * np.pi * 1000 * t)  # 1kHz正弦波
    signal += 0.1 * np.sin(2 * np.pi * 2000 * t)       # 二次谐波
    signal += 0.05 * np.random.randn(len(t))           # 噪声
    
    # 分析
    result = analyzer.analyze(signal)
    
    # 生成报告
    report = analyzer.generate_report(result)
    print(report)

demo()

10. 总结

🔑 核心要点

知识点要点
数组创建np.array(), np.zeros(), np.arange()
数组操作索引、切片、reshape、拼接
数学运算向量化运算、矩阵运算、FFT
统计函数mean, std, min, max, histogram
文件读写np.save(), np.fromfile(), np.frombuffer()
FPGA应用ADC数据处理、LUT生成、滤波器系数

✅ 学习检查清单

  • 能创建和操作NumPy数组
  • 掌握基本数学运算
  • 能进行简单的信号处理
  • 能读写二进制数据文件
  • 能应用于FPGA数据处理

📖 下一步学习

掌握了NumPy数据处理后,让我们学习数据可视化:


常见问题 FAQ

💬 NumPy数组和Python列表有什么区别?

NumPy数组元素类型必须相同(像C数组),支持向量化运算,速度快100倍。列表元素可以不同类型,灵活但慢。数值计算用NumPy,通用数据用列表。

💬 NumPy的广播机制是什么?

不同形状的数组运算时,NumPy自动扩展小数组以匹配大数组。比如array + 1会给每个元素加1,matrix * vector会逐行乘。理解广播是写高效NumPy代码的关键。


系列导航

End of file.