OA导出数据清洗:用Python把乱七八糟的Excel数据变成规范化表格
OA系统导出的数据往往存在格式不统一、列名不规范、数据缺失等问题。本篇将介绍如何使用Python对这些数据进行清洗和规范化处理,为后续的数据分析和报表生成打下基础。
1. 常见的OA数据问题
| 问题类型 | 具体表现 | 解决方案 |
|---|---|---|
| 列名不规范 | 中文列名、空格、特殊字符 | 重命名为规范的英文名 |
| 数据类型错误 | 数字存为文本、日期格式混乱 | 类型转换 |
| 缺失值 | 空单元格、“无”、“N/A” | 统一处理缺失值 |
| 重复数据 | 完全重复或部分重复 | 去重 |
| 格式不一致 | 同一字段多种格式 | 标准化处理 |
| 多余空白 | 前后空格、多余换行 | 字符串清洗 |
| 编码问题 | 乱码、特殊字符 | 编码转换 |
2. pandas基础
2.1 安装和导入
pip install pandas openpyxl xlrd
import pandas as pd
import numpy as np
# 设置显示选项
pd.set_option('display.max_columns', None) # 显示所有列
pd.set_option('display.max_rows', 100) # 最多显示100行
pd.set_option('display.width', None) # 自动调整宽度
pd.set_option('display.max_colwidth', 50) # 列宽限制
2.2 读取数据
# 读取Excel文件
df = pd.read_excel('data.xlsx')
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
df = pd.read_excel('data.xlsx', sheet_name=0) # 第一个sheet
# 读取多个sheet
all_sheets = pd.read_excel('data.xlsx', sheet_name=None) # 返回字典
for sheet_name, sheet_df in all_sheets.items():
print(f"Sheet: {sheet_name}, 行数: {len(sheet_df)}")
# 读取CSV文件
df = pd.read_csv('data.csv', encoding='utf-8')
df = pd.read_csv('data.csv', encoding='gbk') # 中文Windows常用
# 常用参数
df = pd.read_excel(
'data.xlsx',
sheet_name='数据',
header=0, # 第一行作为列名
skiprows=2, # 跳过前2行
usecols='A:E', # 只读取A到E列
dtype={'编号': str}, # 指定列类型
na_values=['无', 'N/A', '-'], # 视为缺失值的值
)
2.3 基本操作
# 查看数据基本信息
print(df.shape) # (行数, 列数)
print(df.columns) # 列名
print(df.dtypes) # 数据类型
print(df.info()) # 详细信息
print(df.describe()) # 统计摘要
# 查看数据
print(df.head()) # 前5行
print(df.tail(10)) # 后10行
print(df.sample(5)) # 随机5行
# 选择数据
df['列名'] # 选择单列
df[['列1', '列2']] # 选择多列
df.loc[0] # 按标签选择行
df.iloc[0] # 按位置选择行
df.loc[0:5, '列名'] # 选择行和列
df.iloc[0:5, 0:3] # 按位置选择
# 条件筛选
df[df['年龄'] > 30]
df[(df['年龄'] > 30) & (df['部门'] == '技术部')]
df.query('年龄 > 30 and 部门 == "技术部"')
3. 列名规范化
# 查看当前列名
print(df.columns.tolist())
# 重命名列
df = df.rename(columns={
'员工姓名': 'name',
'员工编号': 'employee_id',
'所属部门': 'department',
'入职日期': 'hire_date',
'联系电话': 'phone'
})
# 批量处理列名
def normalize_column_name(name):
"""规范化列名"""
# 去除空白
name = str(name).strip()
# 替换空格为下划线
name = name.replace(' ', '_')
# 转小写
name = name.lower()
# 移除特殊字符
import re
name = re.sub(r'[^\w]', '_', name)
# 移除连续下划线
name = re.sub(r'_+', '_', name)
# 移除首尾下划线
name = name.strip('_')
return name
df.columns = [normalize_column_name(col) for col in df.columns]
# 列名映射表(推荐方式)
COLUMN_MAPPING = {
'员工姓名': 'name',
'姓名': 'name',
'员工编号': 'employee_id',
'工号': 'employee_id',
'所属部门': 'department',
'部门': 'department',
}
def standardize_columns(df, mapping):
"""根据映射表标准化列名"""
new_columns = {}
for col in df.columns:
col_clean = str(col).strip()
if col_clean in mapping:
new_columns[col] = mapping[col_clean]
return df.rename(columns=new_columns)
df = standardize_columns(df, COLUMN_MAPPING)
4. 数据类型转换
# 查看数据类型
print(df.dtypes)
# 转换为数值类型
df['金额'] = pd.to_numeric(df['金额'], errors='coerce') # 无法转换的变为NaN
# 转换为字符串
df['编号'] = df['编号'].astype(str)
# 转换为日期
df['日期'] = pd.to_datetime(df['日期'], format='%Y-%m-%d', errors='coerce')
# 批量转换
df = df.astype({
'employee_id': str,
'age': int,
'salary': float
})
# 处理数字中的特殊字符
def clean_numeric(value):
"""清洗数值字段"""
if pd.isna(value):
return np.nan
# 转为字符串
s = str(value)
# 移除千位分隔符
s = s.replace(',', '')
# 移除货币符号
s = s.replace('¥', '').replace('$', '')
# 移除空格
s = s.strip()
try:
return float(s)
except ValueError:
return np.nan
df['金额'] = df['金额'].apply(clean_numeric)
# 处理百分比
def parse_percentage(value):
"""解析百分比"""
if pd.isna(value):
return np.nan
s = str(value).strip()
if s.endswith('%'):
return float(s[:-1]) / 100
return float(s)
df['完成率'] = df['完成率'].apply(parse_percentage)
5. 缺失值处理
# 检查缺失值
print(df.isnull().sum()) # 每列缺失值数量
print(df.isnull().sum().sum()) # 总缺失值数量
print(df.isnull().any()) # 哪些列有缺失值
# 查看缺失值比例
missing_ratio = df.isnull().sum() / len(df) * 100
print(missing_ratio[missing_ratio > 0])
# 将特定值视为缺失值
df = df.replace(['无', 'N/A', '-', '', ' '], np.nan)
# 删除缺失值
df_clean = df.dropna() # 删除任何有缺失值的行
df_clean = df.dropna(subset=['name']) # 只考虑特定列
df_clean = df.dropna(thresh=3) # 至少有3个非空值才保留
# 填充缺失值
df['age'] = df['age'].fillna(0) # 填充固定值
df['age'] = df['age'].fillna(df['age'].mean()) # 填充均值
df['age'] = df['age'].fillna(df['age'].median()) # 填充中位数
df['department'] = df['department'].fillna('未知') # 填充字符串
# 前向/后向填充
df['value'] = df['value'].fillna(method='ffill') # 用前一个值填充
df['value'] = df['value'].fillna(method='bfill') # 用后一个值填充
# 按组填充
df['salary'] = df.groupby('department')['salary'].transform(
lambda x: x.fillna(x.mean())
)
# 插值填充
df['value'] = df['value'].interpolate(method='linear')
6. 重复数据处理
# 检查重复
print(df.duplicated().sum()) # 完全重复的行数
print(df.duplicated(subset=['id']).sum()) # 基于特定列的重复
# 查看重复数据
duplicates = df[df.duplicated(keep=False)] # 显示所有重复行
print(duplicates)
# 删除重复
df_unique = df.drop_duplicates() # 删除完全重复
df_unique = df.drop_duplicates(subset=['id']) # 基于特定列
df_unique = df.drop_duplicates(subset=['id'], keep='first') # 保留第一个
df_unique = df.drop_duplicates(subset=['id'], keep='last') # 保留最后一个
# 标记重复
df['is_duplicate'] = df.duplicated(subset=['id'], keep=False)
7. 字符串清洗
# 去除空白
df['name'] = df['name'].str.strip() # 两端空白
df['name'] = df['name'].str.lstrip() # 左侧空白
df['name'] = df['name'].str.rstrip() # 右侧空白
# 大小写转换
df['code'] = df['code'].str.upper() # 大写
df['code'] = df['code'].str.lower() # 小写
df['name'] = df['name'].str.title() # 首字母大写
# 替换
df['phone'] = df['phone'].str.replace('-', '')
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True) # 移除非数字
# 提取
df['area_code'] = df['phone'].str[:3] # 前3个字符
df['year'] = df['date_str'].str.extract(r'(\d{4})') # 正则提取
# 分割
df[['first_name', 'last_name']] = df['full_name'].str.split(' ', n=1, expand=True)
# 包含判断
df['is_manager'] = df['title'].str.contains('经理|主管', regex=True, na=False)
# 长度
df['name_length'] = df['name'].str.len()
# 填充
df['id'] = df['id'].str.zfill(6) # 左侧补零到6位
df['id'] = df['id'].str.pad(6, fillchar='0')
# 批量清洗函数
def clean_string_column(series):
"""清洗字符串列"""
return (series
.astype(str)
.str.strip()
.str.replace(r'\s+', ' ', regex=True) # 多个空白变一个
.replace('nan', np.nan))
df['name'] = clean_string_column(df['name'])
8. 日期时间处理
# 转换为日期时间
df['date'] = pd.to_datetime(df['date_str'])
# 处理多种日期格式
def parse_date(date_str):
"""解析多种日期格式"""
if pd.isna(date_str):
return pd.NaT
formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%Y年%m月%d日',
'%d/%m/%Y',
'%Y-%m-%d %H:%M:%S',
]
for fmt in formats:
try:
return pd.to_datetime(date_str, format=fmt)
except:
continue
# 尝试自动解析
try:
return pd.to_datetime(date_str)
except:
return pd.NaT
df['date'] = df['date_str'].apply(parse_date)
# 提取日期组件
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.dayofweek # 0=周一
df['week'] = df['date'].dt.isocalendar().week
df['quarter'] = df['date'].dt.quarter
# 日期格式化
df['date_str'] = df['date'].dt.strftime('%Y-%m-%d')
# 日期计算
df['days_since'] = (pd.Timestamp.now() - df['date']).dt.days
df['next_month'] = df['date'] + pd.DateOffset(months=1)
# 日期筛选
df_2024 = df[df['date'].dt.year == 2024]
df_recent = df[df['date'] >= '2024-01-01']
9. 数据合并
# 纵向合并(追加行)
df_all = pd.concat([df1, df2, df3], ignore_index=True)
# 横向合并(追加列)
df_combined = pd.concat([df1, df2], axis=1)
# 基于键合并(类似SQL JOIN)
df_merged = pd.merge(df1, df2, on='id') # 内连接
df_merged = pd.merge(df1, df2, on='id', how='left') # 左连接
df_merged = pd.merge(df1, df2, on='id', how='right') # 右连接
df_merged = pd.merge(df1, df2, on='id', how='outer') # 外连接
# 多键合并
df_merged = pd.merge(df1, df2, on=['id', 'date'])
# 不同列名合并
df_merged = pd.merge(df1, df2, left_on='emp_id', right_on='employee_id')
# 合并多个文件
import glob
files = glob.glob('data/*.xlsx')
dfs = [pd.read_excel(f) for f in files]
df_all = pd.concat(dfs, ignore_index=True)
10. 完整的清洗流程
"""
OA数据清洗完整示例
"""
import pandas as pd
import numpy as np
from pathlib import Path
class OADataCleaner:
"""OA数据清洗器"""
# 列名映射
COLUMN_MAPPING = {
'员工姓名': 'name',
'姓名': 'name',
'员工编号': 'employee_id',
'工号': 'employee_id',
'所属部门': 'department',
'部门': 'department',
'入职日期': 'hire_date',
'联系电话': 'phone',
'电话': 'phone',
'邮箱': 'email',
'职位': 'position',
'薪资': 'salary',
'工资': 'salary',
}
def __init__(self, filepath):
self.filepath = Path(filepath)
self.df = None
self.load_data()
def load_data(self):
"""加载数据"""
suffix = self.filepath.suffix.lower()
if suffix in ['.xlsx', '.xls']:
self.df = pd.read_excel(self.filepath)
elif suffix == '.csv':
# 尝试不同编码
for encoding in ['utf-8', 'gbk', 'gb2312']:
try:
self.df = pd.read_csv(self.filepath, encoding=encoding)
break
except UnicodeDecodeError:
continue
print(f"加载数据:{len(self.df)} 行, {len(self.df.columns)} 列")
def standardize_columns(self):
"""标准化列名"""
new_columns = {}
for col in self.df.columns:
col_clean = str(col).strip()
if col_clean in self.COLUMN_MAPPING:
new_columns[col] = self.COLUMN_MAPPING[col_clean]
self.df = self.df.rename(columns=new_columns)
print(f"列名标准化完成:{list(self.df.columns)}")
return self
def clean_strings(self):
"""清洗字符串列"""
string_cols = self.df.select_dtypes(include=['object']).columns
for col in string_cols:
self.df[col] = (self.df[col]
.astype(str)
.str.strip()
.replace(['nan', 'None', '无', 'N/A', '-'], np.nan))
print(f"字符串清洗完成:{len(string_cols)} 列")
return self
def handle_missing(self, strategy='drop'):
"""处理缺失值"""
missing_before = self.df.isnull().sum().sum()
if strategy == 'drop':
self.df = self.df.dropna(subset=['name', 'employee_id'])
elif strategy == 'fill':
# 数值列填充0
numeric_cols = self.df.select_dtypes(include=[np.number]).columns
self.df[numeric_cols] = self.df[numeric_cols].fillna(0)
# 字符串列填充'未知'
string_cols = self.df.select_dtypes(include=['object']).columns
self.df[string_cols] = self.df[string_cols].fillna('未知')
missing_after = self.df.isnull().sum().sum()
print(f"缺失值处理:{missing_before} -> {missing_after}")
return self
def remove_duplicates(self):
"""删除重复数据"""
rows_before = len(self.df)
self.df = self.df.drop_duplicates(subset=['employee_id'], keep='first')
rows_after = len(self.df)
print(f"去重:{rows_before} -> {rows_after} 行")
return self
def convert_types(self):
"""转换数据类型"""
# 日期列
if 'hire_date' in self.df.columns:
self.df['hire_date'] = pd.to_datetime(
self.df['hire_date'], errors='coerce'
)
# 数值列
if 'salary' in self.df.columns:
self.df['salary'] = pd.to_numeric(
self.df['salary'].astype(str).str.replace(',', ''),
errors='coerce'
)
# 字符串列
if 'employee_id' in self.df.columns:
self.df['employee_id'] = self.df['employee_id'].astype(str)
print("数据类型转换完成")
return self
def clean(self):
"""执行完整清洗流程"""
return (self
.standardize_columns()
.clean_strings()
.handle_missing()
.remove_duplicates()
.convert_types())
def save(self, output_path):
"""保存清洗后的数据"""
output_path = Path(output_path)
suffix = output_path.suffix.lower()
if suffix in ['.xlsx', '.xls']:
self.df.to_excel(output_path, index=False)
elif suffix == '.csv':
self.df.to_csv(output_path, index=False, encoding='utf-8-sig')
print(f"数据已保存到:{output_path}")
def report(self):
"""生成清洗报告"""
print("\n" + "=" * 50)
print("数据清洗报告")
print("=" * 50)
print(f"总行数:{len(self.df)}")
print(f"总列数:{len(self.df.columns)}")
print(f"\n列信息:")
print(self.df.dtypes)
print(f"\n缺失值:")
print(self.df.isnull().sum())
print("=" * 50)
# 使用示例
if __name__ == "__main__":
cleaner = OADataCleaner('员工数据.xlsx')
cleaner.clean()
cleaner.report()
cleaner.save('员工数据_清洗后.xlsx')
11. 实战练习
练习:清洗考勤数据
"""
练习:清洗OA导出的考勤数据
"""
import pandas as pd
import numpy as np
def clean_attendance_data(filepath):
"""清洗考勤数据"""
# 读取数据
df = pd.read_excel(filepath)
# 1. 标准化列名
column_mapping = {
'员工姓名': 'name',
'员工工号': 'employee_id',
'考勤日期': 'date',
'上班打卡': 'check_in',
'下班打卡': 'check_out',
'工作时长': 'work_hours',
'考勤状态': 'status',
}
df = df.rename(columns=column_mapping)
# 2. 清洗字符串
df['name'] = df['name'].str.strip()
df['employee_id'] = df['employee_id'].astype(str).str.strip()
# 3. 处理日期时间
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['check_in'] = pd.to_datetime(df['check_in'], errors='coerce')
df['check_out'] = pd.to_datetime(df['check_out'], errors='coerce')
# 4. 计算工作时长
df['work_hours_calc'] = (
df['check_out'] - df['check_in']
).dt.total_seconds() / 3600
# 5. 判断考勤状态
def determine_status(row):
if pd.isna(row['check_in']) or pd.isna(row['check_out']):
return '缺卡'
check_in_time = row['check_in'].time()
check_out_time = row['check_out'].time()
from datetime import time
if check_in_time > time(9, 0):
return '迟到'
if check_out_time < time(18, 0):
return '早退'
return '正常'
df['status_calc'] = df.apply(determine_status, axis=1)
# 6. 删除无效数据
df = df.dropna(subset=['name', 'employee_id', 'date'])
return df
# 使用
# df_clean = clean_attendance_data('考勤数据.xlsx')
# df_clean.to_excel('考勤数据_清洗后.xlsx', index=False)
12. 总结
🔑 核心要点
| 知识点 | 要点 |
|---|---|
| 读取数据 | pd.read_excel(), pd.read_csv() |
| 列名规范化 | rename(), 映射表 |
| 类型转换 | astype(), to_numeric(), to_datetime() |
| 缺失值 | dropna(), fillna() |
| 去重 | drop_duplicates() |
| 字符串清洗 | .str访问器系列方法 |
| 日期处理 | .dt访问器系列方法 |
✅ 学习检查清单
- 能读取Excel和CSV文件
- 能规范化列名
- 能处理缺失值和重复数据
- 能进行字符串清洗
- 能处理日期时间数据
- 能编写完整的数据清洗流程
📖 下一步学习
掌握了数据清洗后,让我们学习Excel自动化操作:
常见问题 FAQ
💬 pandas和openpyxl怎么选?
pandas适合数据分析和批量处理,openpyxl适合精确控制Excel格式。数据清洗用pandas,生成带格式的报表用openpyxl,两者经常配合使用。
💬 处理大Excel文件很慢怎么办?
pandas默认用openpyxl引擎,大文件可以用engine='xlrd'(.xls)或分块读取chunksize参数。超大文件考虑先转CSV再处理。
� 系列导航
- 上一篇:14 - Python迭代器与生成器
- 当前:15 - OA导出数据清洗与字段规范化
- 下一篇:16 - Python操作Excel