Veloris.
返回索引
概念基础 2026-02-14

Python迭代器与生成器:yield一个关键字,让你处理10GB文件不爆内存

2 分钟
681 words

Python迭代器与生成器:yield一个关键字,让你处理10GB文件不爆内存

迭代器和生成器是Python的高级特性,用于高效处理序列数据。生成器特别适合处理大数据集,因为它们按需生成数据而不是一次性加载到内存。


1. 可迭代对象与迭代器

# 可迭代对象(Iterable):可以用for循环遍历的对象
# 列表、元组、字符串、字典、集合、文件等都是可迭代对象

my_list = [1, 2, 3]
for item in my_list:
    print(item)

# 迭代器(Iterator):实现了__iter__和__next__方法的对象
# 可以用iter()将可迭代对象转换为迭代器

my_iter = iter(my_list)
print(next(my_iter))  # 1
print(next(my_iter))  # 2
print(next(my_iter))  # 3
# print(next(my_iter))  # StopIteration异常

# for循环的本质
# for item in my_list:
#     print(item)
# 等价于:
my_iter = iter(my_list)
while True:
    try:
        item = next(my_iter)
        print(item)
    except StopIteration:
        break

# 检查是否可迭代
from collections.abc import Iterable, Iterator

print(isinstance([1, 2, 3], Iterable))  # True
print(isinstance([1, 2, 3], Iterator))  # False
print(isinstance(iter([1, 2, 3]), Iterator))  # True

2. 自定义迭代器

class CountDown:
    """倒计时迭代器"""
    
    def __init__(self, start):
        self.start = start
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.start <= 0:
            raise StopIteration
        self.start -= 1
        return self.start + 1

# 使用
for num in CountDown(5):
    print(num)  # 5, 4, 3, 2, 1

# 更复杂的例子:斐波那契数列
class Fibonacci:
    """斐波那契数列迭代器"""
    
    def __init__(self, max_count):
        self.max_count = max_count
        self.count = 0
        self.a, self.b = 0, 1
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.count >= self.max_count:
            raise StopIteration
        self.count += 1
        result = self.a
        self.a, self.b = self.b, self.a + self.b
        return result

# 使用
for num in Fibonacci(10):
    print(num, end=" ")  # 0 1 1 2 3 5 8 13 21 34

3. 生成器函数

生成器是一种特殊的迭代器,使用yield关键字定义。

# 生成器函数
def countdown(n):
    """倒计时生成器"""
    while n > 0:
        yield n  # 暂停并返回值
        n -= 1

# 使用
for num in countdown(5):
    print(num)  # 5, 4, 3, 2, 1

# 生成器对象
gen = countdown(3)
print(type(gen))     # <class 'generator'>
print(next(gen))     # 3
print(next(gen))     # 2
print(next(gen))     # 1
# print(next(gen))   # StopIteration

# 斐波那契生成器(比类更简洁)
def fibonacci(max_count):
    a, b = 0, 1
    count = 0
    while count < max_count:
        yield a
        a, b = b, a + b
        count += 1

print(list(fibonacci(10)))  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

# 无限生成器
def infinite_counter(start=0):
    """无限计数器"""
    while True:
        yield start
        start += 1

# 使用时需要限制
counter = infinite_counter()
for i, num in enumerate(counter):
    if i >= 5:
        break
    print(num)  # 0, 1, 2, 3, 4

yield vs return

特性returnyield
返回次数一次多次
函数状态终止暂停,保留状态
返回类型具体值生成器对象
内存使用一次性按需生成

4. 生成器表达式

# 列表推导式(立即生成所有元素)
squares_list = [x**2 for x in range(10)]
print(type(squares_list))  # <class 'list'>

# 生成器表达式(惰性求值)
squares_gen = (x**2 for x in range(10))
print(type(squares_gen))   # <class 'generator'>

# 内存对比
import sys

list_comp = [x**2 for x in range(1000000)]
gen_exp = (x**2 for x in range(1000000))

print(f"列表大小:{sys.getsizeof(list_comp):,} 字节")  # 约8MB
print(f"生成器大小:{sys.getsizeof(gen_exp):,} 字节")  # 约120字节

# 在函数中使用(可省略括号)
total = sum(x**2 for x in range(10))  # 285
maximum = max(x**2 for x in range(10))  # 81

# 带条件的生成器表达式
even_squares = (x**2 for x in range(10) if x % 2 == 0)
print(list(even_squares))  # [0, 4, 16, 36, 64]

# 生成器只能遍历一次
gen = (x for x in range(3))
print(list(gen))  # [0, 1, 2]
print(list(gen))  # [](已耗尽)

5. yield from

yield from用于委托给子生成器。

# 不使用yield from
def chain_manual(*iterables):
    for iterable in iterables:
        for item in iterable:
            yield item

# 使用yield from(更简洁)
def chain(*iterables):
    for iterable in iterables:
        yield from iterable

# 使用
result = list(chain([1, 2], [3, 4], [5, 6]))
print(result)  # [1, 2, 3, 4, 5, 6]

# 递归生成器
def flatten(nested):
    """展平嵌套列表"""
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item

nested = [1, [2, 3, [4, 5]], 6, [7, [8, 9]]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]

# 树遍历
class Node:
    def __init__(self, value, children=None):
        self.value = value
        self.children = children or []

def traverse(node):
    """前序遍历"""
    yield node.value
    for child in node.children:
        yield from traverse(child)

# 创建树
root = Node(1, [
    Node(2, [Node(4), Node(5)]),
    Node(3, [Node(6)])
])

print(list(traverse(root)))  # [1, 2, 4, 5, 3, 6]

6. 生成器的高级用法

# send():向生成器发送值
def echo():
    while True:
        received = yield
        print(f"收到:{received}")

gen = echo()
next(gen)  # 启动生成器
gen.send("Hello")  # 收到:Hello
gen.send("World")  # 收到:World

# 带返回值的send
def accumulator():
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value

gen = accumulator()
print(next(gen))      # 0
print(gen.send(10))   # 10
print(gen.send(20))   # 30
print(gen.send(30))   # 60

# close():关闭生成器
def counter():
    n = 0
    try:
        while True:
            yield n
            n += 1
    except GeneratorExit:
        print("生成器被关闭")

gen = counter()
print(next(gen))  # 0
print(next(gen))  # 1
gen.close()       # 生成器被关闭

# throw():向生成器抛出异常
def careful_generator():
    try:
        yield 1
        yield 2
        yield 3
    except ValueError:
        yield "捕获到ValueError"

gen = careful_generator()
print(next(gen))  # 1
print(gen.throw(ValueError))  # 捕获到ValueError

7. itertools模块

itertools提供了高效的迭代器工具。

import itertools

# count:无限计数器
for i in itertools.count(10, 2):  # 从10开始,步长2
    if i > 20:
        break
    print(i, end=" ")  # 10 12 14 16 18 20

# cycle:无限循环
colors = itertools.cycle(['红', '绿', '蓝'])
for i, color in enumerate(colors):
    if i >= 6:
        break
    print(color, end=" ")  # 红 绿 蓝 红 绿 蓝

# repeat:重复
print(list(itertools.repeat('A', 3)))  # ['A', 'A', 'A']

# chain:连接多个迭代器
print(list(itertools.chain([1, 2], [3, 4], [5])))  # [1, 2, 3, 4, 5]

# islice:切片迭代器
print(list(itertools.islice(range(100), 5, 10)))  # [5, 6, 7, 8, 9]

# takewhile/dropwhile:条件过滤
print(list(itertools.takewhile(lambda x: x < 5, [1, 3, 5, 2, 1])))  # [1, 3]
print(list(itertools.dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1])))  # [5, 2, 1]

# groupby:分组
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
data.sort(key=lambda x: x[0])  # 必须先排序
for key, group in itertools.groupby(data, key=lambda x: x[0]):
    print(f"{key}: {list(group)}")

# permutations:排列
print(list(itertools.permutations([1, 2, 3], 2)))
# [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]

# combinations:组合
print(list(itertools.combinations([1, 2, 3], 2)))
# [(1, 2), (1, 3), (2, 3)]

# product:笛卡尔积
print(list(itertools.product([1, 2], ['a', 'b'])))
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

# accumulate:累积
print(list(itertools.accumulate([1, 2, 3, 4, 5])))  # [1, 3, 6, 10, 15]

8. 实际应用场景

8.1 读取大文件

def read_large_file(filepath, chunk_size=1024*1024):
    """分块读取大文件"""
    with open(filepath, 'r', encoding='utf-8') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

def read_lines(filepath):
    """逐行读取文件"""
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()

# 使用
# for line in read_lines('huge_file.txt'):
#     process(line)

8.2 数据管道

def read_data(filename):
    """读取数据"""
    with open(filename) as f:
        for line in f:
            yield line.strip()

def parse_data(lines):
    """解析数据"""
    for line in lines:
        fields = line.split(',')
        yield {'name': fields[0], 'value': int(fields[1])}

def filter_data(records, min_value):
    """过滤数据"""
    for record in records:
        if record['value'] >= min_value:
            yield record

def transform_data(records):
    """转换数据"""
    for record in records:
        record['value'] *= 2
        yield record

# 构建管道
# pipeline = transform_data(
#     filter_data(
#         parse_data(
#             read_data('data.csv')
#         ),
#         min_value=10
#     )
# )
# for item in pipeline:
#     print(item)

8.3 批量处理

def batch(iterable, size):
    """将迭代器分批"""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

# 使用
data = range(10)
for b in batch(data, 3):
    print(b)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]

9. 常见错误与避坑

❌ 错误1:生成器只能遍历一次

gen = (x for x in range(3))
print(list(gen))  # [0, 1, 2]
print(list(gen))  # [](已耗尽!)

# 解决:重新创建生成器或转换为列表

❌ 错误2:在生成器中修改外部变量

# 问题:闭包陷阱
funcs = [lambda: i for i in range(3)]
print([f() for f in funcs])  # [2, 2, 2](不是[0, 1, 2])

# 解决
funcs = [lambda i=i: i for i in range(3)]
print([f() for f in funcs])  # [0, 1, 2]

❌ 错误3:忘记启动生成器

def coroutine():
    while True:
        value = yield
        print(f"收到:{value}")

gen = coroutine()
# gen.send("Hello")  # TypeError: can't send non-None value

# 正确:先调用next()启动
gen = coroutine()
next(gen)  # 启动
gen.send("Hello")  # 收到:Hello

10. 实战练习

练习:日志文件分析器

"""
练习:使用生成器实现日志分析器
"""
import re
from datetime import datetime
from collections import Counter

def read_log_lines(filepath):
    """读取日志行"""
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()

def parse_log_entry(lines):
    """解析日志条目"""
    pattern = r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] \[(\w+)\] (.+)'
    for line in lines:
        match = re.match(pattern, line)
        if match:
            yield {
                'timestamp': datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S'),
                'level': match.group(2),
                'message': match.group(3)
            }

def filter_by_level(entries, level):
    """按级别过滤"""
    for entry in entries:
        if entry['level'] == level:
            yield entry

def filter_by_time(entries, start_time, end_time):
    """按时间范围过滤"""
    for entry in entries:
        if start_time <= entry['timestamp'] <= end_time:
            yield entry

def count_by_level(entries):
    """统计各级别数量"""
    counter = Counter()
    for entry in entries:
        counter[entry['level']] += 1
    return counter

# 使用示例
# pipeline = parse_log_entry(read_log_lines('app.log'))
# errors = filter_by_level(pipeline, 'ERROR')
# for error in errors:
#     print(error)

11. 总结

🔑 核心要点

知识点要点
可迭代对象实现__iter__方法
迭代器实现__iter__和__next__方法
生成器函数使用yield关键字
生成器表达式(expr for x in iterable)
yield from委托给子生成器
itertools高效的迭代器工具库
内存效率生成器按需生成,节省内存

✅ 学习检查清单

  • 理解可迭代对象和迭代器的区别
  • 能编写生成器函数
  • 掌握生成器表达式
  • 了解yield from的用法
  • 能使用itertools常用函数
  • 理解生成器的内存优势

📖 下一步学习

掌握了迭代器与生成器后,让我们进入办公自动化实战部分:


常见问题 FAQ

💬 生成器和列表推导式有什么区别?

[x for x in range(1000000)]创建100万个元素的列表,占用大量内存。(x for x in range(1000000))是生成器表达式,几乎不占内存,按需产出。处理大数据时用生成器。

💬 yield和return有什么区别?

return结束函数并返回值,yield暂停函数并产出值,下次调用next()时从暂停点继续执行。一个函数里有yield就自动变成生成器函数。


系列导航

End of file.