Python迭代器与生成器:yield一个关键字,让你处理10GB文件不爆内存
迭代器和生成器是Python的高级特性,用于高效处理序列数据。生成器特别适合处理大数据集,因为它们按需生成数据而不是一次性加载到内存。
1. 可迭代对象与迭代器
# Iterable: any object that can be traversed with a for loop.
# Lists, tuples, strings, dicts, sets and files are all iterables.
my_list = [1, 2, 3]
for item in my_list:
    print(item)

# Iterator: an object implementing both __iter__ and __next__.
# iter() turns an iterable into an iterator.
my_iter = iter(my_list)
print(next(my_iter))  # 1
print(next(my_iter))  # 2
print(next(my_iter))  # 3
# print(next(my_iter))  # would raise StopIteration

# What a for loop does under the hood:
#     for item in my_list:
#         print(item)
# is equivalent to:
my_iter = iter(my_list)
while True:
    # Only next() can raise StopIteration, so only it sits inside the try.
    try:
        item = next(my_iter)
    except StopIteration:
        break
    print(item)

# Checking whether an object is an iterable / an iterator
from collections.abc import Iterable, Iterator
print(isinstance([1, 2, 3], Iterable))  # True
print(isinstance([1, 2, 3], Iterator))  # False
print(isinstance(iter([1, 2, 3]), Iterator))  # True
2. 自定义迭代器
class CountDown:
    """Iterator that counts down from ``start`` to 1.

    The instance is its own iterator (``__iter__`` returns ``self``), so it
    can be consumed only once.
    """

    def __init__(self, start):
        # Next value to hand out; decremented on every __next__ call.
        self.start = start

    def __iter__(self):
        return self

    def __next__(self):
        """Return the current value, or raise StopIteration once it is <= 0."""
        if self.start <= 0:
            raise StopIteration
        current = self.start
        self.start -= 1
        return current


# Usage
for num in CountDown(5):
    print(num)  # 5, 4, 3, 2, 1
# A more involved example: the Fibonacci sequence
class Fibonacci:
    """Iterator producing the first ``max_count`` Fibonacci numbers, from 0."""

    def __init__(self, max_count):
        self.max_count = max_count  # total number of values to produce
        self.count = 0              # values produced so far
        self.a, self.b = 0, 1       # current and next Fibonacci numbers

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next number, or raise StopIteration after max_count values."""
        if self.count >= self.max_count:
            raise StopIteration
        self.count += 1
        result = self.a
        self.a, self.b = self.b, self.a + self.b
        return result


# Usage
for num in Fibonacci(10):
    print(num, end=" ")  # 0 1 1 2 3 5 8 13 21 34
3. 生成器函数
生成器是一种特殊的迭代器,使用yield关键字定义。
# Generator function
def countdown(n):
    """Yield n, n-1, ..., 1; yield nothing when n <= 0."""
    while n > 0:
        yield n  # pause here and hand the value to the caller
        n -= 1


# Usage
for num in countdown(5):
    print(num)  # 5, 4, 3, 2, 1

# Calling a generator function returns a generator object
gen = countdown(3)
print(type(gen))  # <class 'generator'>
print(next(gen))  # 3
print(next(gen))  # 2
print(next(gen))  # 1
# print(next(gen))  # would raise StopIteration
# Fibonacci as a generator (more concise than the class version)
def fibonacci(max_count):
    """Yield the first ``max_count`` Fibonacci numbers, starting from 0."""
    a, b = 0, 1
    count = 0
    while count < max_count:
        yield a
        a, b = b, a + b
        count += 1


print(list(fibonacci(10)))  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
# Infinite generator
def infinite_counter(start=0):
    """Yield start, start+1, start+2, ... without ever stopping."""
    while True:
        yield start
        start += 1


# The consumer must bound the iteration itself
counter = infinite_counter()
for i, num in enumerate(counter):
    if i >= 5:
        break
    print(num)  # 0, 1, 2, 3, 4
yield vs return:
| 特性 | return | yield |
|---|---|---|
| 返回次数 | 一次 | 多次 |
| 函数状态 | 终止 | 暂停,保留状态 |
| 返回类型 | 具体值 | 生成器对象 |
| 内存使用 | 一次性 | 按需生成 |
4. 生成器表达式
# List comprehension: builds every element immediately
squares_list = [x**2 for x in range(10)]
print(type(squares_list)) # <class 'list'>
# Generator expression: evaluated lazily, one element at a time
squares_gen = (x**2 for x in range(10))
print(type(squares_gen)) # <class 'generator'>
# Memory comparison
import sys
list_comp = [x**2 for x in range(1000000)]
gen_exp = (x**2 for x in range(1000000))
print(f"列表大小:{sys.getsizeof(list_comp):,} 字节") # roughly 8 MB (getsizeof is shallow: the list object itself, not the ints it references)
print(f"生成器大小:{sys.getsizeof(gen_exp):,} 字节") # small and independent of the range size (on the order of 100-200 bytes; exact value varies by Python version)
# As the sole argument of a call, the extra parentheses can be omitted
total = sum(x**2 for x in range(10)) # 285
maximum = max(x**2 for x in range(10)) # 81
# Generator expression with a condition
even_squares = (x**2 for x in range(10) if x % 2 == 0)
print(list(even_squares)) # [0, 4, 16, 36, 64]
# A generator can be traversed only once
gen = (x for x in range(3))
print(list(gen)) # [0, 1, 2]
print(list(gen)) # [] (already exhausted)
5. yield from
yield from用于委托给子生成器。
# Without yield from
def chain_manual(*iterables):
    """Yield every item of each iterable in turn, using an explicit inner loop."""
    for iterable in iterables:
        for item in iterable:
            yield item
# With yield from (more concise)
def chain(*iterables):
    """Yield every item of each iterable in turn by delegating with yield from."""
    for iterable in iterables:
        yield from iterable


# Usage
result = list(chain([1, 2], [3, 4], [5, 6]))
print(result)  # [1, 2, 3, 4, 5, 6]
# Recursive generator
def flatten(nested):
    """Yield the non-list items of an arbitrarily nested list, depth-first.

    Only ``list`` instances are descended into; any other value (including
    tuples and strings) is yielded as-is.
    """
    for item in nested:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item


nested = [1, [2, 3, [4, 5]], 6, [7, [8, 9]]]
print(list(flatten(nested)))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
# Tree traversal
class Node:
    """Tree node holding a value and a list of child nodes."""

    def __init__(self, value, children=None):
        self.value = value
        # A None default avoids sharing one mutable list between nodes.
        self.children = children or []


def traverse(node):
    """Yield node values in pre-order: the node first, then each subtree."""
    yield node.value
    for child in node.children:
        yield from traverse(child)


# Build a tree
root = Node(1, [
    Node(2, [Node(4), Node(5)]),
    Node(3, [Node(6)]),
])
print(list(traverse(root)))  # [1, 2, 4, 5, 3, 6]
6. 生成器的高级用法
# send(): pass a value into a generator
def echo():
    """Print every value sent in; the bare ``yield`` produces None each time."""
    while True:
        received = yield
        print(f"收到:{received}")


gen = echo()
next(gen)  # prime the generator: run it up to the first yield
gen.send("Hello")  # prints: 收到:Hello
gen.send("World")  # prints: 收到:World
# send() that also gets a value back
def accumulator():
    """Keep a running total of the values sent in, yielding it after each send.

    Sending None (which is also what a plain next() does once the generator
    is running) ends the generator, so that call raises StopIteration.
    """
    total = 0
    while True:
        value = yield total
        if value is None:
            break
        total += value


gen = accumulator()
print(next(gen))  # 0
print(gen.send(10))  # 10
print(gen.send(20))  # 30
print(gen.send(30))  # 60
# close(): shut a generator down
def counter():
    """Yield 0, 1, 2, ... and print a message when the generator is closed."""
    n = 0
    try:
        while True:
            yield n
            n += 1
    except GeneratorExit:
        # close() raises GeneratorExit at the paused yield.
        print("生成器被关闭")


gen = counter()
print(next(gen))  # 0
print(next(gen))  # 1
gen.close()  # prints: 生成器被关闭
# throw(): raise an exception inside a generator
def careful_generator():
    """Yield 1, 2, 3; if a ValueError is thrown in, yield a message instead."""
    try:
        yield 1
        yield 2
        yield 3
    except ValueError:
        yield "捕获到ValueError"


gen = careful_generator()
print(next(gen))  # 1
print(gen.throw(ValueError))  # 捕获到ValueError
7. itertools模块
itertools提供了高效的迭代器工具。
import itertools
# count: infinite counter
for i in itertools.count(10, 2):  # start at 10, step 2
    if i > 20:
        break
    print(i, end=" ")  # 10 12 14 16 18 20

# cycle: repeat a sequence forever
colors = itertools.cycle(['红', '绿', '蓝'])
for i, color in enumerate(colors):
    if i >= 6:
        break
    print(color, end=" ")  # 红 绿 蓝 红 绿 蓝

# repeat: the same value n times
print(list(itertools.repeat('A', 3)))  # ['A', 'A', 'A']

# chain: concatenate several iterables
print(list(itertools.chain([1, 2], [3, 4], [5])))  # [1, 2, 3, 4, 5]

# islice: slice an iterator
print(list(itertools.islice(range(100), 5, 10)))  # [5, 6, 7, 8, 9]

# takewhile / dropwhile: take or skip the leading run matching a predicate
print(list(itertools.takewhile(lambda x: x < 5, [1, 3, 5, 2, 1])))  # [1, 3]
print(list(itertools.dropwhile(lambda x: x < 5, [1, 3, 5, 2, 1])))  # [5, 2, 1]

# groupby: group consecutive items that share a key
data = [('A', 1), ('A', 2), ('B', 3), ('B', 4), ('A', 5)]
data.sort(key=lambda x: x[0])  # sort by the same key first: groupby only merges adjacent items
for key, group in itertools.groupby(data, key=lambda x: x[0]):
    print(f"{key}: {list(group)}")

# permutations
print(list(itertools.permutations([1, 2, 3], 2)))
# [(1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2)]

# combinations
print(list(itertools.combinations([1, 2, 3], 2)))
# [(1, 2), (1, 3), (2, 3)]

# product: Cartesian product
print(list(itertools.product([1, 2], ['a', 'b'])))
# [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]

# accumulate: running totals
print(list(itertools.accumulate([1, 2, 3, 4, 5])))  # [1, 3, 6, 10, 15]
8. 实际应用场景
8.1 读取大文件
def read_large_file(filepath, chunk_size=1024*1024):
    """Lazily yield a UTF-8 text file in chunks of at most ``chunk_size`` characters.

    Args:
        filepath: Path of the file to read.
        chunk_size: Maximum number of characters per chunk.

    Yields:
        Consecutive non-empty string chunks until end of file.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        # iter(callable, sentinel) calls f.read(chunk_size) repeatedly and
        # stops as soon as it returns '' (end of file).
        yield from iter(lambda: f.read(chunk_size), '')
def read_lines(filepath):
    """Lazily yield each line of a UTF-8 text file.

    Leading and trailing whitespace, including the newline, is stripped from
    every line; blank lines are yielded as empty strings.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()
# 使用
# for line in read_lines('huge_file.txt'):
#     process(line)
8.2 数据管道
def read_data(filename):
    """Lazily yield each line of a text file, stripped of surrounding whitespace.

    The file is decoded as UTF-8 explicitly, like the other readers in this
    article, instead of relying on the platform's default encoding.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()
def parse_data(lines):
    """Parse ``name,value`` lines into ``{'name': str, 'value': int}`` records.

    Blank lines (for example a trailing empty line at the end of a file) are
    skipped. A non-blank line with no comma raises IndexError, and one whose
    second field is not an integer raises ValueError.
    """
    for line in lines:
        if not line.strip():
            continue
        fields = line.split(',')
        yield {'name': fields[0], 'value': int(fields[1])}
def filter_data(records, min_value):
    """Yield only the records whose ``'value'`` is at least ``min_value``."""
    for record in records:
        if record['value'] >= min_value:
            yield record
def transform_data(records):
    """Yield a copy of each record with its ``'value'`` doubled.

    A new dict is built per record, so the caller's input records are left
    untouched and the pipeline can safely be re-run over the same data.
    """
    for record in records:
        yield {**record, 'value': record['value'] * 2}
# 构建管道
# pipeline = transform_data(
#     filter_data(
#         parse_data(
#             read_data('data.csv')
#         ),
#         min_value=10
#     )
# )
# for item in pipeline:
#     print(item)
8.3 批量处理
def batch(iterable, size):
    """Split an iterable into lists of ``size`` items; the last may be shorter.

    Args:
        iterable: Any iterable, including one-shot iterators.
        size: Number of items per batch; must be at least 1.

    Yields:
        Lists of up to ``size`` consecutive items.

    Raises:
        ValueError: If ``size`` is less than 1. As this is a generator, the
            error surfaces when iteration starts, not when batch() is called.
    """
    if size < 1:
        # Otherwise len(current) == size never holds and the whole input
        # would silently come back as one giant batch.
        raise ValueError("size must be at least 1")
    current = []  # named so it does not shadow the function itself
    for item in iterable:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:
        yield current


# Usage
data = range(10)
for b in batch(data, 3):
    print(b)
# [0, 1, 2]
# [3, 4, 5]
# [6, 7, 8]
# [9]
9. 常见错误与避坑
❌ 错误1:生成器只能遍历一次
gen = (x for x in range(3))
print(list(gen)) # [0, 1, 2]
print(list(gen)) # [] (already exhausted!)
# Fix: create a new generator, or materialize it into a list first
❌ 错误2:闭包的延迟绑定(循环变量陷阱)
# Problem: late-binding closures - each lambda looks up i only when it is called, after the loop has finished
funcs = [lambda: i for i in range(3)]
print([f() for f in funcs]) # [2, 2, 2] (not [0, 1, 2])
# Fix: bind the current value of i as a default argument
funcs = [lambda i=i: i for i in range(3)]
print([f() for f in funcs]) # [0, 1, 2]
❌ 错误3:忘记启动生成器
def coroutine():
    """Generator-based coroutine that prints every value sent to it."""
    while True:
        value = yield
        print(f"收到:{value}")


gen = coroutine()
# gen.send("Hello")  # TypeError: can't send non-None value to a just-started generator
# Correct: call next() first to advance to the first yield
gen = coroutine()
next(gen)  # prime the generator
gen.send("Hello")  # prints: 收到:Hello
10. 实战练习
练习:日志文件分析器
"""
练习:使用生成器实现日志分析器
"""
import re
from datetime import datetime
from collections import Counter
def read_log_lines(filepath):
    """Lazily yield each line of a UTF-8 log file, stripped of surrounding whitespace."""
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()
def parse_log_entry(lines):
    """Parse ``[YYYY-MM-DD HH:MM:SS] [LEVEL] message`` lines into dicts.

    Args:
        lines: Iterable of log lines.

    Yields:
        Dicts with ``'timestamp'`` (datetime), ``'level'`` (str) and
        ``'message'`` (str). Lines that do not match the format, or whose
        timestamp is not a real date/time (e.g. month 13), are skipped.
    """
    # Compile once, outside the loop.
    pattern = re.compile(
        r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] \[(\w+)\] (.+)'
    )
    for line in lines:
        match = pattern.match(line)
        if not match:
            continue
        try:
            timestamp = datetime.strptime(match.group(1), '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # The regex only checks digit counts; treat an impossible
            # timestamp like any other malformed line.
            continue
        yield {
            'timestamp': timestamp,
            'level': match.group(2),
            'message': match.group(3),
        }
def filter_by_level(entries, level):
    """Yield only the entries whose ``'level'`` equals ``level`` exactly."""
    for entry in entries:
        if entry['level'] == level:
            yield entry
def filter_by_time(entries, start_time, end_time):
    """Yield entries whose ``'timestamp'`` lies in ``[start_time, end_time]``.

    Both bounds are inclusive.
    """
    for entry in entries:
        if start_time <= entry['timestamp'] <= end_time:
            yield entry
def count_by_level(entries):
    """Return a Counter mapping each log level to its number of entries.

    This consumes ``entries`` completely, so a generator passed in cannot be
    iterated again afterwards.
    """
    return Counter(entry['level'] for entry in entries)
# 使用示例
# pipeline = parse_log_entry(read_log_lines('app.log'))
# errors = filter_by_level(pipeline, 'ERROR')
# for error in errors:
#     print(error)
11. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| 可迭代对象 | 实现__iter__方法 |
| 迭代器 | 实现__iter__和__next__方法 |
| 生成器函数 | 使用yield关键字 |
| 生成器表达式 | (expr for x in iterable) |
| yield from | 委托给子生成器 |
| itertools | 高效的迭代器工具库 |
| 内存效率 | 生成器按需生成,节省内存 |
✅ 学习检查清单
- 理解可迭代对象和迭代器的区别
- 能编写生成器函数
- 掌握生成器表达式
- 了解yield from的用法
- 能使用itertools常用函数
- 理解生成器的内存优势
📖 下一步学习
掌握了迭代器与生成器后,让我们进入办公自动化实战部分,下一篇:15 - OA导出数据清洗与字段规范化。
常见问题 FAQ
💬 生成器和列表推导式有什么区别?
[x for x in range(1000000)]创建100万个元素的列表,占用大量内存。(x for x in range(1000000))是生成器表达式,几乎不占内存,按需产出。处理大数据时用生成器。
💬 yield和return有什么区别?
return结束函数并返回值,yield暂停函数并产出值,下次调用next()时从暂停点继续执行。一个函数里有yield就自动变成生成器函数。
📚 系列导航
- 上一篇:13 - Python面向对象编程
- 当前:14 - Python迭代器与生成器
- 下一篇:15 - OA导出数据清洗与字段规范化