Python文件操作:with open一行读文件,告别C语言的fopen/fclose
文件操作是程序与外部数据交互的基础。Python提供了简洁的文件操作API,支持文本文件、二进制文件、CSV、JSON等多种格式。本篇将详细介绍文件的读写操作、路径处理,以及常见数据格式的处理方法。
1. 文件操作基础
1.1 打开和关闭文件
# 基本语法
file = open('filename.txt', 'r') # 打开文件
content = file.read() # 读取内容
file.close() # 关闭文件(重要!)
# 问题:如果读取时发生异常,close()不会执行
# 解决:使用try-finally
file = open('filename.txt', 'r')
try:
content = file.read()
finally:
file.close()
1.2 文件模式
| 模式 | 说明 |
|---|---|
'r' | 只读(默认),文件不存在则报错 |
'w' | 只写,文件不存在则创建,存在则清空 |
'a' | 追加,文件不存在则创建 |
'x' | 创建新文件,文件已存在则报错 |
'r+' | 读写,文件不存在则报错 |
'w+' | 读写,文件不存在则创建,存在则清空 |
'a+' | 读写追加 |
'b' | 二进制模式(与上述模式组合,如'rb') |
't' | 文本模式(默认) |
# 文本模式(默认)
f = open('file.txt', 'r') # 等同于 'rt'
# 二进制模式
f = open('image.png', 'rb')
# 写入模式
f = open('output.txt', 'w')
# 追加模式
f = open('log.txt', 'a')
1.3 with语句(推荐)
with语句会自动管理文件的关闭,即使发生异常也能正确关闭。
# 推荐写法
with open('file.txt', 'r') as f:
content = f.read()
# 文件自动关闭
# 同时打开多个文件
with open('input.txt', 'r') as fin, open('output.txt', 'w') as fout:
content = fin.read()
fout.write(content.upper())
# Python 3.10+可以使用括号
with (
open('file1.txt', 'r') as f1,
open('file2.txt', 'r') as f2,
open('output.txt', 'w') as fout
):
pass
2. 读取文件
2.1 读取全部内容
# read():读取全部内容为字符串
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
print(content)
# 注意:大文件会占用大量内存
# 不推荐对大文件使用read()
2.2 逐行读取
# readline():读取一行
with open('file.txt', 'r', encoding='utf-8') as f:
line1 = f.readline() # 第一行
line2 = f.readline() # 第二行
# readlines():读取所有行为列表
with open('file.txt', 'r', encoding='utf-8') as f:
lines = f.readlines() # ['line1\n', 'line2\n', ...]
# 迭代文件对象(推荐,内存效率高)
with open('file.txt', 'r', encoding='utf-8') as f:
for line in f:
print(line.strip()) # strip()去除换行符
# 带行号遍历
with open('file.txt', 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
print(f"第{i}行:{line.strip()}")
2.3 读取指定字节
# read(n):读取n个字符(文本模式)或n个字节(二进制模式)
with open('file.txt', 'r', encoding='utf-8') as f:
chunk = f.read(100) # 读取100个字符
# 分块读取大文件
def read_in_chunks(filename, chunk_size=1024):
with open(filename, 'r', encoding='utf-8') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
for chunk in read_in_chunks('large_file.txt'):
process(chunk)
# 文件指针操作
with open('file.txt', 'r', encoding='utf-8') as f:
print(f.tell()) # 当前位置
f.read(10)
print(f.tell()) # 读取后的位置
f.seek(0) # 回到开头
print(f.tell()) # 0
3. 写入文件
3.1 写入文本
# write():写入字符串
with open('output.txt', 'w', encoding='utf-8') as f:
f.write('Hello, World!\n')
f.write('你好,世界!\n')
# 注意:write()不会自动添加换行符
3.2 追加内容
# 追加模式
with open('log.txt', 'a', encoding='utf-8') as f:
f.write('新的日志条目\n')
# 带时间戳的日志
from datetime import datetime
def log(message):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open('app.log', 'a', encoding='utf-8') as f:
f.write(f'[{timestamp}] {message}\n')
log('应用启动')
log('处理完成')
3.3 写入多行
# writelines():写入字符串列表
lines = ['第一行\n', '第二行\n', '第三行\n']
with open('output.txt', 'w', encoding='utf-8') as f:
f.writelines(lines)
# 注意:writelines()不会自动添加换行符
# 需要自己在每行末尾加\n
# 使用join
lines = ['第一行', '第二行', '第三行']
with open('output.txt', 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
# print写入文件
with open('output.txt', 'w', encoding='utf-8') as f:
print('Hello', file=f)
print('World', file=f)
4. 文件编码
# 指定编码(推荐始终指定)
with open('file.txt', 'r', encoding='utf-8') as f:
content = f.read()
# 常见编码
# utf-8:通用,推荐
# gbk/gb2312:中文Windows
# latin-1:西欧语言
# ascii:纯英文
# 处理编码错误
with open('file.txt', 'r', encoding='utf-8', errors='ignore') as f:
content = f.read() # 忽略无法解码的字符
with open('file.txt', 'r', encoding='utf-8', errors='replace') as f:
content = f.read() # 用?替换无法解码的字符
# 检测文件编码
import chardet
def detect_encoding(filename):
with open(filename, 'rb') as f:
result = chardet.detect(f.read())
return result['encoding']
# 自动检测并读取
def read_file_auto_encoding(filename):
encoding = detect_encoding(filename)
with open(filename, 'r', encoding=encoding) as f:
return f.read()
5. 二进制文件
# 读取二进制文件
with open('image.png', 'rb') as f:
data = f.read()
print(type(data)) # <class 'bytes'>
print(data[:10]) # 前10个字节
# 写入二进制文件
with open('copy.png', 'wb') as f:
f.write(data)
# 复制文件
def copy_file(src, dst, chunk_size=1024*1024):
"""复制文件(支持大文件)"""
with open(src, 'rb') as fin, open(dst, 'wb') as fout:
while True:
chunk = fin.read(chunk_size)
if not chunk:
break
fout.write(chunk)
# 读取特定格式的二进制数据
import struct
with open('data.bin', 'rb') as f:
# 读取一个32位整数(小端序)
data = f.read(4)
value = struct.unpack('<I', data)[0]
# 读取多个值
data = f.read(12)
values = struct.unpack('<3I', data) # 3个32位整数
6. 路径处理(pathlib)
pathlib是Python 3.4+引入的面向对象的路径处理模块,比os.path更现代、更易用。
6.1 创建路径对象
from pathlib import Path
# 创建路径对象
p = Path('folder/file.txt')
p = Path('C:/Users/name/Documents')
p = Path.cwd() # 当前工作目录
p = Path.home() # 用户主目录
# 路径拼接
p = Path('folder') / 'subfolder' / 'file.txt'
print(p) # folder/subfolder/file.txt
# 从字符串创建
p = Path(r'C:\Users\name\Documents')
6.2 路径操作
from pathlib import Path
p = Path('folder/subfolder/file.txt')
# 路径组成部分
print(p.name) # 'file.txt'(文件名)
print(p.stem) # 'file'(不含扩展名)
print(p.suffix) # '.txt'(扩展名)
print(p.suffixes) # ['.txt'](所有扩展名)
print(p.parent) # 'folder/subfolder'(父目录)
print(p.parents) # 所有父目录
print(p.parts) # ('folder', 'subfolder', 'file.txt')
# 修改路径
print(p.with_name('new.txt')) # folder/subfolder/new.txt
print(p.with_stem('new')) # folder/subfolder/new.txt
print(p.with_suffix('.md')) # folder/subfolder/file.md
# 绝对路径
print(p.absolute())
print(p.resolve()) # 解析符号链接
# 相对路径
p1 = Path('/home/user/docs/file.txt')
p2 = Path('/home/user')
print(p1.relative_to(p2)) # docs/file.txt
6.3 文件系统操作
from pathlib import Path
p = Path('example')
# 检查存在性
print(p.exists()) # 是否存在
print(p.is_file()) # 是否是文件
print(p.is_dir()) # 是否是目录
# 创建目录
p.mkdir() # 创建目录
p.mkdir(parents=True) # 创建多级目录
p.mkdir(parents=True, exist_ok=True) # 已存在不报错
# 删除
p.unlink() # 删除文件
p.rmdir() # 删除空目录
# 重命名/移动
p.rename('new_name')
# 遍历目录
for item in Path('.').iterdir():
print(item)
# 递归遍历
for item in Path('.').rglob('*.py'):
print(item)
# 模式匹配
for item in Path('.').glob('**/*.txt'):
print(item)
# 读写文件(便捷方法)
p = Path('file.txt')
content = p.read_text(encoding='utf-8')
p.write_text('Hello', encoding='utf-8')
data = p.read_bytes()
p.write_bytes(b'Hello')
# 文件信息
stat = p.stat()
print(stat.st_size) # 文件大小
print(stat.st_mtime) # 修改时间
7. CSV文件处理
import csv
# 读取CSV
with open('data.csv', 'r', encoding='utf-8', newline='') as f:
reader = csv.reader(f)
for row in reader:
print(row) # 每行是一个列表
# 读取为字典
with open('data.csv', 'r', encoding='utf-8', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
print(row) # 每行是一个字典
print(row['name'], row['age'])
# 写入CSV
data = [
['name', 'age', 'city'],
['张三', 25, '北京'],
['李四', 30, '上海'],
]
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(data)
# 写入字典
data = [
{'name': '张三', 'age': 25, 'city': '北京'},
{'name': '李四', 'age': 30, 'city': '上海'},
]
with open('output.csv', 'w', encoding='utf-8', newline='') as f:
fieldnames = ['name', 'age', 'city']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
# 处理特殊字符
with open('data.csv', 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL) # 所有字段加引号
writer.writerow(['name', 'description'])
writer.writerow(['test', 'contains, comma'])
8. JSON文件处理
import json
# Python对象转JSON字符串
data = {
'name': '张三',
'age': 25,
'hobbies': ['reading', 'coding'],
'active': True,
'score': None
}
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)
# JSON字符串转Python对象
data = json.loads(json_str)
print(data['name'])
# 写入JSON文件
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 读取JSON文件
with open('data.json', 'r', encoding='utf-8') as f:
data = json.load(f)
# 处理日期等特殊类型
from datetime import datetime
class DateTimeEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
data = {'time': datetime.now()}
json_str = json.dumps(data, cls=DateTimeEncoder)
# JSON与Python类型对应
# JSON Python
# object dict
# array list
# string str
# number(int) int
# number(real) float
# true True
# false False
# null None
9. 常见错误与避坑
❌ 错误1:忘记关闭文件
# 错误
f = open('file.txt')
content = f.read()
# 忘记 f.close()
# 正确:使用with
with open('file.txt') as f:
content = f.read()
❌ 错误2:不指定编码
# 错误:可能乱码
with open('file.txt') as f:
content = f.read()
# 正确:指定编码
with open('file.txt', encoding='utf-8') as f:
content = f.read()
❌ 错误3:写模式打开已有文件
# 危险:会清空文件内容!
with open('important.txt', 'w') as f:
f.write('new content')
# 追加内容应使用'a'模式
with open('important.txt', 'a') as f:
f.write('new content')
❌ 错误4:路径分隔符问题
# 错误:Windows和Linux路径分隔符不同
path = 'folder\\file.txt' # 只在Windows有效
# 正确:使用pathlib或os.path.join
from pathlib import Path
path = Path('folder') / 'file.txt'
import os
path = os.path.join('folder', 'file.txt')
❌ 错误5:CSV的newline参数
# 错误:可能产生空行
with open('data.csv', 'w') as f:
writer = csv.writer(f)
writer.writerows(data)
# 正确:指定newline=''
with open('data.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerows(data)
10. 实战练习
练习1:日志文件分析
"""
练习:分析日志文件,统计各级别日志数量
"""
from pathlib import Path
from collections import Counter
import re
def analyze_log(log_file):
"""分析日志文件"""
level_pattern = re.compile(r'\[(INFO|WARNING|ERROR|DEBUG)\]')
levels = []
with open(log_file, 'r', encoding='utf-8') as f:
for line in f:
match = level_pattern.search(line)
if match:
levels.append(match.group(1))
return Counter(levels)
# 测试
# result = analyze_log('app.log')
# print(result)
练习2:批量处理文件
"""
练习:批量将目录下的txt文件转换编码
"""
from pathlib import Path
def convert_encoding(src_dir, src_encoding='gbk', dst_encoding='utf-8'):
"""批量转换文件编码"""
src_path = Path(src_dir)
for txt_file in src_path.glob('*.txt'):
# 读取原文件
content = txt_file.read_text(encoding=src_encoding)
# 备份原文件
backup = txt_file.with_suffix('.txt.bak')
txt_file.rename(backup)
# 写入新编码
txt_file.write_text(content, encoding=dst_encoding)
print(f"已转换:{txt_file}")
# convert_encoding('data_folder')
练习3:合并CSV文件
"""
练习:合并多个CSV文件
"""
import csv
from pathlib import Path
def merge_csv_files(input_dir, output_file):
"""合并目录下所有CSV文件"""
input_path = Path(input_dir)
csv_files = list(input_path.glob('*.csv'))
if not csv_files:
print("没有找到CSV文件")
return
all_data = []
header = None
for csv_file in csv_files:
with open(csv_file, 'r', encoding='utf-8', newline='') as f:
reader = csv.reader(f)
file_header = next(reader)
if header is None:
header = file_header
all_data.append(header)
for row in reader:
all_data.append(row)
with open(output_file, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(all_data)
print(f"已合并{len(csv_files)}个文件到{output_file}")
# merge_csv_files('csv_folder', 'merged.csv')
11. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| with语句 | 自动管理文件关闭,推荐使用 |
| 文件模式 | r/w/a/x,b表示二进制 |
| 编码 | 始终指定encoding=‘utf-8’ |
| pathlib | 现代的路径处理方式 |
| CSV | 使用csv模块,注意newline=” |
| JSON | json.dump/load处理文件 |
✅ 学习检查清单
- 掌握with语句读写文件
- 理解不同文件模式的区别
- 能正确处理文件编码
- 熟练使用pathlib处理路径
- 能读写CSV和JSON文件
📖 下一步学习
掌握了文件操作后,让我们学习Python的异常处理:
常见问题 FAQ
💬 读大文件会不会把内存撑爆?
用read()一次读全部内容确实会。大文件应该用for line in f:逐行读取,或者用生成器按块读取。readline()和迭代器是处理大文件的标准方式。
💬 路径用字符串还是pathlib?
新代码建议用pathlib.Path,跨平台且API更直观。Path("data") / "file.csv"比os.path.join("data", "file.csv")可读性好很多。
� 系列导航
- 上一篇:09 - Python字符串处理
- 当前:10 - Python文件操作
- 下一篇:11 - Python异常处理