Veloris.
返回索引
设计实战 2026-02-14

OA导出数据清洗:用Python把乱七八糟的Excel数据变成规范化表格

3 分钟
685 words

OA导出数据清洗:用Python把乱七八糟的Excel数据变成规范化表格

OA系统导出的数据往往存在格式不统一、列名不规范、数据缺失等问题。本篇将介绍如何使用Python对这些数据进行清洗和规范化处理,为后续的数据分析和报表生成打下基础。


1. 常见的OA数据问题

问题类型具体表现解决方案
列名不规范中文列名、空格、特殊字符重命名为规范的英文名
数据类型错误数字存为文本、日期格式混乱类型转换
缺失值空单元格、“无”、“N/A”统一处理缺失值
重复数据完全重复或部分重复去重
格式不一致同一字段多种格式标准化处理
多余空白前后空格、多余换行字符串清洗
编码问题乱码、特殊字符编码转换

2. pandas基础

2.1 安装和导入

pip install pandas openpyxl xlrd
import pandas as pd
import numpy as np

# 设置显示选项
pd.set_option('display.max_columns', None)  # 显示所有列
pd.set_option('display.max_rows', 100)      # 最多显示100行
pd.set_option('display.width', None)        # 自动调整宽度
pd.set_option('display.max_colwidth', 50)   # 列宽限制

2.2 读取数据

# 读取Excel文件
df = pd.read_excel('data.xlsx')
df = pd.read_excel('data.xlsx', sheet_name='Sheet1')
df = pd.read_excel('data.xlsx', sheet_name=0)  # 第一个sheet

# 读取多个sheet
all_sheets = pd.read_excel('data.xlsx', sheet_name=None)  # 返回字典
for sheet_name, sheet_df in all_sheets.items():
    print(f"Sheet: {sheet_name}, 行数: {len(sheet_df)}")

# 读取CSV文件
df = pd.read_csv('data.csv', encoding='utf-8')
df = pd.read_csv('data.csv', encoding='gbk')  # 中文Windows常用

# 常用参数
df = pd.read_excel(
    'data.xlsx',
    sheet_name='数据',
    header=0,           # 第一行作为列名
    skiprows=2,         # 跳过前2行
    usecols='A:E',      # 只读取A到E列
    dtype={'编号': str}, # 指定列类型
    na_values=['无', 'N/A', '-'],  # 视为缺失值的值
)

2.3 基本操作

# 查看数据基本信息
print(df.shape)        # (行数, 列数)
print(df.columns)      # 列名
print(df.dtypes)       # 数据类型
print(df.info())       # 详细信息
print(df.describe())   # 统计摘要

# 查看数据
print(df.head())       # 前5行
print(df.tail(10))     # 后10行
print(df.sample(5))    # 随机5行

# 选择数据
df['列名']             # 选择单列
df[['列1', '列2']]     # 选择多列
df.loc[0]              # 按标签选择行
df.iloc[0]             # 按位置选择行
df.loc[0:5, '列名']    # 选择行和列
df.iloc[0:5, 0:3]      # 按位置选择

# 条件筛选
df[df['年龄'] > 30]
df[(df['年龄'] > 30) & (df['部门'] == '技术部')]
df.query('年龄 > 30 and 部门 == "技术部"')

3. 列名规范化

# 查看当前列名
print(df.columns.tolist())

# 重命名列
df = df.rename(columns={
    '员工姓名': 'name',
    '员工编号': 'employee_id',
    '所属部门': 'department',
    '入职日期': 'hire_date',
    '联系电话': 'phone'
})

# 批量处理列名
def normalize_column_name(name):
    """规范化列名"""
    # 去除空白
    name = str(name).strip()
    # 替换空格为下划线
    name = name.replace(' ', '_')
    # 转小写
    name = name.lower()
    # 移除特殊字符
    import re
    name = re.sub(r'[^\w]', '_', name)
    # 移除连续下划线
    name = re.sub(r'_+', '_', name)
    # 移除首尾下划线
    name = name.strip('_')
    return name

df.columns = [normalize_column_name(col) for col in df.columns]

# 列名映射表(推荐方式)
COLUMN_MAPPING = {
    '员工姓名': 'name',
    '姓名': 'name',
    '员工编号': 'employee_id',
    '工号': 'employee_id',
    '所属部门': 'department',
    '部门': 'department',
}

def standardize_columns(df, mapping):
    """根据映射表标准化列名"""
    new_columns = {}
    for col in df.columns:
        col_clean = str(col).strip()
        if col_clean in mapping:
            new_columns[col] = mapping[col_clean]
    return df.rename(columns=new_columns)

df = standardize_columns(df, COLUMN_MAPPING)

4. 数据类型转换

# 查看数据类型
print(df.dtypes)

# 转换为数值类型
df['金额'] = pd.to_numeric(df['金额'], errors='coerce')  # 无法转换的变为NaN

# 转换为字符串
df['编号'] = df['编号'].astype(str)

# 转换为日期
df['日期'] = pd.to_datetime(df['日期'], format='%Y-%m-%d', errors='coerce')

# 批量转换
df = df.astype({
    'employee_id': str,
    'age': int,
    'salary': float
})

# 处理数字中的特殊字符
def clean_numeric(value):
    """清洗数值字段"""
    if pd.isna(value):
        return np.nan
    # 转为字符串
    s = str(value)
    # 移除千位分隔符
    s = s.replace(',', '')
    # 移除货币符号
    s = s.replace('¥', '').replace('$', '')
    # 移除空格
    s = s.strip()
    try:
        return float(s)
    except ValueError:
        return np.nan

df['金额'] = df['金额'].apply(clean_numeric)

# 处理百分比
def parse_percentage(value):
    """解析百分比"""
    if pd.isna(value):
        return np.nan
    s = str(value).strip()
    if s.endswith('%'):
        return float(s[:-1]) / 100
    return float(s)

df['完成率'] = df['完成率'].apply(parse_percentage)

5. 缺失值处理

# 检查缺失值
print(df.isnull().sum())           # 每列缺失值数量
print(df.isnull().sum().sum())     # 总缺失值数量
print(df.isnull().any())           # 哪些列有缺失值

# 查看缺失值比例
missing_ratio = df.isnull().sum() / len(df) * 100
print(missing_ratio[missing_ratio > 0])

# 将特定值视为缺失值
df = df.replace(['无', 'N/A', '-', '', ' '], np.nan)

# 删除缺失值
df_clean = df.dropna()                    # 删除任何有缺失值的行
df_clean = df.dropna(subset=['name'])     # 只考虑特定列
df_clean = df.dropna(thresh=3)            # 至少有3个非空值才保留

# 填充缺失值
df['age'] = df['age'].fillna(0)                    # 填充固定值
df['age'] = df['age'].fillna(df['age'].mean())     # 填充均值
df['age'] = df['age'].fillna(df['age'].median())   # 填充中位数
df['department'] = df['department'].fillna('未知')  # 填充字符串

# 前向/后向填充
df['value'] = df['value'].fillna(method='ffill')   # 用前一个值填充
df['value'] = df['value'].fillna(method='bfill')   # 用后一个值填充

# 按组填充
df['salary'] = df.groupby('department')['salary'].transform(
    lambda x: x.fillna(x.mean())
)

# 插值填充
df['value'] = df['value'].interpolate(method='linear')

6. 重复数据处理

# 检查重复
print(df.duplicated().sum())              # 完全重复的行数
print(df.duplicated(subset=['id']).sum()) # 基于特定列的重复

# 查看重复数据
duplicates = df[df.duplicated(keep=False)]  # 显示所有重复行
print(duplicates)

# 删除重复
df_unique = df.drop_duplicates()                      # 删除完全重复
df_unique = df.drop_duplicates(subset=['id'])         # 基于特定列
df_unique = df.drop_duplicates(subset=['id'], keep='first')  # 保留第一个
df_unique = df.drop_duplicates(subset=['id'], keep='last')   # 保留最后一个

# 标记重复
df['is_duplicate'] = df.duplicated(subset=['id'], keep=False)

7. 字符串清洗

# 去除空白
df['name'] = df['name'].str.strip()        # 两端空白
df['name'] = df['name'].str.lstrip()       # 左侧空白
df['name'] = df['name'].str.rstrip()       # 右侧空白

# 大小写转换
df['code'] = df['code'].str.upper()        # 大写
df['code'] = df['code'].str.lower()        # 小写
df['name'] = df['name'].str.title()        # 首字母大写

# 替换
df['phone'] = df['phone'].str.replace('-', '')
df['phone'] = df['phone'].str.replace(r'\D', '', regex=True)  # 移除非数字

# 提取
df['area_code'] = df['phone'].str[:3]      # 前3个字符
df['year'] = df['date_str'].str.extract(r'(\d{4})')  # 正则提取

# 分割
df[['first_name', 'last_name']] = df['full_name'].str.split(' ', n=1, expand=True)

# 包含判断
df['is_manager'] = df['title'].str.contains('经理|主管', regex=True, na=False)

# 长度
df['name_length'] = df['name'].str.len()

# 填充
df['id'] = df['id'].str.zfill(6)           # 左侧补零到6位
df['id'] = df['id'].str.pad(6, fillchar='0')

# 批量清洗函数
def clean_string_column(series):
    """清洗字符串列"""
    return (series
            .astype(str)
            .str.strip()
            .str.replace(r'\s+', ' ', regex=True)  # 多个空白变一个
            .replace('nan', np.nan))

df['name'] = clean_string_column(df['name'])

8. 日期时间处理

# 转换为日期时间
df['date'] = pd.to_datetime(df['date_str'])

# 处理多种日期格式
def parse_date(date_str):
    """解析多种日期格式"""
    if pd.isna(date_str):
        return pd.NaT
    
    formats = [
        '%Y-%m-%d',
        '%Y/%m/%d',
        '%Y年%m月%d日',
        '%d/%m/%Y',
        '%Y-%m-%d %H:%M:%S',
    ]
    
    for fmt in formats:
        try:
            return pd.to_datetime(date_str, format=fmt)
        except:
            continue
    
    # 尝试自动解析
    try:
        return pd.to_datetime(date_str)
    except:
        return pd.NaT

df['date'] = df['date_str'].apply(parse_date)

# 提取日期组件
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.dayofweek  # 0=周一
df['week'] = df['date'].dt.isocalendar().week
df['quarter'] = df['date'].dt.quarter

# 日期格式化
df['date_str'] = df['date'].dt.strftime('%Y-%m-%d')

# 日期计算
df['days_since'] = (pd.Timestamp.now() - df['date']).dt.days
df['next_month'] = df['date'] + pd.DateOffset(months=1)

# 日期筛选
df_2024 = df[df['date'].dt.year == 2024]
df_recent = df[df['date'] >= '2024-01-01']

9. 数据合并

# 纵向合并(追加行)
df_all = pd.concat([df1, df2, df3], ignore_index=True)

# 横向合并(追加列)
df_combined = pd.concat([df1, df2], axis=1)

# 基于键合并(类似SQL JOIN)
df_merged = pd.merge(df1, df2, on='id')                    # 内连接
df_merged = pd.merge(df1, df2, on='id', how='left')        # 左连接
df_merged = pd.merge(df1, df2, on='id', how='right')       # 右连接
df_merged = pd.merge(df1, df2, on='id', how='outer')       # 外连接

# 多键合并
df_merged = pd.merge(df1, df2, on=['id', 'date'])

# 不同列名合并
df_merged = pd.merge(df1, df2, left_on='emp_id', right_on='employee_id')

# 合并多个文件
import glob

files = glob.glob('data/*.xlsx')
dfs = [pd.read_excel(f) for f in files]
df_all = pd.concat(dfs, ignore_index=True)

10. 完整的清洗流程

"""
OA数据清洗完整示例
"""
import pandas as pd
import numpy as np
from pathlib import Path

class OADataCleaner:
    """OA数据清洗器"""
    
    # 列名映射
    COLUMN_MAPPING = {
        '员工姓名': 'name',
        '姓名': 'name',
        '员工编号': 'employee_id',
        '工号': 'employee_id',
        '所属部门': 'department',
        '部门': 'department',
        '入职日期': 'hire_date',
        '联系电话': 'phone',
        '电话': 'phone',
        '邮箱': 'email',
        '职位': 'position',
        '薪资': 'salary',
        '工资': 'salary',
    }
    
    def __init__(self, filepath):
        self.filepath = Path(filepath)
        self.df = None
        self.load_data()
    
    def load_data(self):
        """加载数据"""
        suffix = self.filepath.suffix.lower()
        if suffix in ['.xlsx', '.xls']:
            self.df = pd.read_excel(self.filepath)
        elif suffix == '.csv':
            # 尝试不同编码
            for encoding in ['utf-8', 'gbk', 'gb2312']:
                try:
                    self.df = pd.read_csv(self.filepath, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
        print(f"加载数据:{len(self.df)} 行, {len(self.df.columns)} 列")
    
    def standardize_columns(self):
        """标准化列名"""
        new_columns = {}
        for col in self.df.columns:
            col_clean = str(col).strip()
            if col_clean in self.COLUMN_MAPPING:
                new_columns[col] = self.COLUMN_MAPPING[col_clean]
        self.df = self.df.rename(columns=new_columns)
        print(f"列名标准化完成:{list(self.df.columns)}")
        return self
    
    def clean_strings(self):
        """清洗字符串列"""
        string_cols = self.df.select_dtypes(include=['object']).columns
        for col in string_cols:
            self.df[col] = (self.df[col]
                          .astype(str)
                          .str.strip()
                          .replace(['nan', 'None', '无', 'N/A', '-'], np.nan))
        print(f"字符串清洗完成:{len(string_cols)} 列")
        return self
    
    def handle_missing(self, strategy='drop'):
        """处理缺失值"""
        missing_before = self.df.isnull().sum().sum()
        
        if strategy == 'drop':
            self.df = self.df.dropna(subset=['name', 'employee_id'])
        elif strategy == 'fill':
            # 数值列填充0
            numeric_cols = self.df.select_dtypes(include=[np.number]).columns
            self.df[numeric_cols] = self.df[numeric_cols].fillna(0)
            # 字符串列填充'未知'
            string_cols = self.df.select_dtypes(include=['object']).columns
            self.df[string_cols] = self.df[string_cols].fillna('未知')
        
        missing_after = self.df.isnull().sum().sum()
        print(f"缺失值处理:{missing_before} -> {missing_after}")
        return self
    
    def remove_duplicates(self):
        """删除重复数据"""
        rows_before = len(self.df)
        self.df = self.df.drop_duplicates(subset=['employee_id'], keep='first')
        rows_after = len(self.df)
        print(f"去重:{rows_before} -> {rows_after} 行")
        return self
    
    def convert_types(self):
        """转换数据类型"""
        # 日期列
        if 'hire_date' in self.df.columns:
            self.df['hire_date'] = pd.to_datetime(
                self.df['hire_date'], errors='coerce'
            )
        
        # 数值列
        if 'salary' in self.df.columns:
            self.df['salary'] = pd.to_numeric(
                self.df['salary'].astype(str).str.replace(',', ''),
                errors='coerce'
            )
        
        # 字符串列
        if 'employee_id' in self.df.columns:
            self.df['employee_id'] = self.df['employee_id'].astype(str)
        
        print("数据类型转换完成")
        return self
    
    def clean(self):
        """执行完整清洗流程"""
        return (self
                .standardize_columns()
                .clean_strings()
                .handle_missing()
                .remove_duplicates()
                .convert_types())
    
    def save(self, output_path):
        """保存清洗后的数据"""
        output_path = Path(output_path)
        suffix = output_path.suffix.lower()
        
        if suffix in ['.xlsx', '.xls']:
            self.df.to_excel(output_path, index=False)
        elif suffix == '.csv':
            self.df.to_csv(output_path, index=False, encoding='utf-8-sig')
        
        print(f"数据已保存到:{output_path}")
    
    def report(self):
        """生成清洗报告"""
        print("\n" + "=" * 50)
        print("数据清洗报告")
        print("=" * 50)
        print(f"总行数:{len(self.df)}")
        print(f"总列数:{len(self.df.columns)}")
        print(f"\n列信息:")
        print(self.df.dtypes)
        print(f"\n缺失值:")
        print(self.df.isnull().sum())
        print("=" * 50)

# 使用示例
if __name__ == "__main__":
    cleaner = OADataCleaner('员工数据.xlsx')
    cleaner.clean()
    cleaner.report()
    cleaner.save('员工数据_清洗后.xlsx')

11. 实战练习

练习:清洗考勤数据

"""
练习:清洗OA导出的考勤数据
"""
import pandas as pd
import numpy as np

def clean_attendance_data(filepath):
    """清洗考勤数据"""
    # 读取数据
    df = pd.read_excel(filepath)
    
    # 1. 标准化列名
    column_mapping = {
        '员工姓名': 'name',
        '员工工号': 'employee_id',
        '考勤日期': 'date',
        '上班打卡': 'check_in',
        '下班打卡': 'check_out',
        '工作时长': 'work_hours',
        '考勤状态': 'status',
    }
    df = df.rename(columns=column_mapping)
    
    # 2. 清洗字符串
    df['name'] = df['name'].str.strip()
    df['employee_id'] = df['employee_id'].astype(str).str.strip()
    
    # 3. 处理日期时间
    df['date'] = pd.to_datetime(df['date'], errors='coerce')
    df['check_in'] = pd.to_datetime(df['check_in'], errors='coerce')
    df['check_out'] = pd.to_datetime(df['check_out'], errors='coerce')
    
    # 4. 计算工作时长
    df['work_hours_calc'] = (
        df['check_out'] - df['check_in']
    ).dt.total_seconds() / 3600
    
    # 5. 判断考勤状态
    def determine_status(row):
        if pd.isna(row['check_in']) or pd.isna(row['check_out']):
            return '缺卡'
        check_in_time = row['check_in'].time()
        check_out_time = row['check_out'].time()
        
        from datetime import time
        if check_in_time > time(9, 0):
            return '迟到'
        if check_out_time < time(18, 0):
            return '早退'
        return '正常'
    
    df['status_calc'] = df.apply(determine_status, axis=1)
    
    # 6. 删除无效数据
    df = df.dropna(subset=['name', 'employee_id', 'date'])
    
    return df

# 使用
# df_clean = clean_attendance_data('考勤数据.xlsx')
# df_clean.to_excel('考勤数据_清洗后.xlsx', index=False)

12. 总结

🔑 核心要点

知识点要点
读取数据pd.read_excel(), pd.read_csv()
列名规范化rename(), 映射表
类型转换astype(), to_numeric(), to_datetime()
缺失值dropna(), fillna()
去重drop_duplicates()
字符串清洗.str访问器系列方法
日期处理.dt访问器系列方法

✅ 学习检查清单

  • 能读取Excel和CSV文件
  • 能规范化列名
  • 能处理缺失值和重复数据
  • 能进行字符串清洗
  • 能处理日期时间数据
  • 能编写完整的数据清洗流程

📖 下一步学习

掌握了数据清洗后,让我们学习Excel自动化操作:


常见问题 FAQ

💬 pandas和openpyxl怎么选?

pandas适合数据分析和批量处理,openpyxl适合精确控制Excel格式。数据清洗用pandas,生成带格式的报表用openpyxl,两者经常配合使用。

💬 处理大Excel文件很慢怎么办?

pandas默认用openpyxl引擎,大文件可以用engine='xlrd'(.xls)或分块读取chunksize参数。超大文件考虑先转CSV再处理。


系列导航

End of file.