查看原文
其他

案例分享:使用 Python 批量处理统计年鉴数据(下)

分享技巧的 数据Seminar 2023-04-10

目录

一、引言

二、处理思路

  (1)提取所有子表

  (2)清洗所有子表

  (3)分情况合并所有子表

三、代码

  (1)定义提取所有子表的函数

  (2)定义清洗所有子表的函数

  (3)定义匹配/合并所有子表的函数

  (4)调用上述函数处理单个年鉴表

四、在不同结构的年鉴表上测试

  (1)测试样例一

  (2)测试样例二

  (3)测试样例三

五、总结


本文共9445个字,阅读大约需要24分钟,欢迎指正!

一、引言

上期文章我们向大家分享了一个使用 Python 批量处理统计年鉴表的案例,文章中我们以一个实际案例为背景向大家展示了使用 Python 处理(知网提供的)统计年鉴表的流程,并提供了一套处理思路。整个流程中,处理与清洗表格内容是最难实现的一步,这是因为统计年鉴表的内容形式非常多样,有的年鉴表中存放了多个续表,这些续表的分布没有绝对的规律,有的则存放了多个 Sheet 表。上期文章的案例中,我们在已知全部年鉴表的表结构的情况下对案例中的年鉴表做了针对性处理,也就是说,案例中的代码只能处理案例中的年鉴表。如果处理其他统计年鉴表,还要重新编写代码,这样就太麻烦了。

那有没有这样一种可能,无论当一份年鉴中是否包含续表,包含多少个续表,无论表头多么散乱,我们都可以用同一份代码将这些表格提取、清洗出来,并且根据表格内容合并为一张大表?当然可以!下面我们就将代码和处理思路分享给大家。


💡 后台回关键词“20230324”获取本期文章的所有代码和测试用的统计年鉴表(代码需要在 Jupyter Notebook 环境中使用)

注:本文代码由企研数据原创,仅作交流使用,未经同意不可用于商业目的,违者必究!

二、处理思路

(1)提取所有子表

处理年鉴表较为关键的一步就是从包含续表的年鉴表中提取出所有的子表。以笔者处理过几千份统计年鉴表的经验来看,含有续表的年鉴表都有一个特点,即续表的左上方都含有关键词“续表”,如下图所示。

这个特点就是我们使用 Python 提取所有子表的的突破口,我们可以根据这些关键词出现的位置将每一个子表都找出来。提取子表之前我们还要判断待处理的年鉴表是否含有多个 Sheet 表,如果有,则要对每个 Sheet 表都要做提取子表操作。当然,如果一个年鉴表或者年鉴表中的某个 Sheet 表只有一个表格,不含有续表,那只读取这张表即可,最后再将一份年鉴表中的所有子表提取出来并存放在一个列表中。

(2)清洗所有子表

提取出所有子表后,需要对每一个子表做清洗,主要是删除不需要的行、列;对表格的表头(字段名)做处理。由于年鉴表中表头处常常使用合并单元格,导致使用 Python 等工具读取表格后表头极其不规范,如下图所示。

这一步需要将散乱的字段名规范化,去除字段名中的换行符等空字符,不过为了不影响字段名中的英文语句,处理字段名时还得保留英文单词之间的空格。

同时,一些年鉴表的下方还含有一些注释信息,这些注释同样也需要删除,如下图所示。

(3)分情况合并所有子表

清洗所有子表后,已经搞清楚字段名,接下来需要根据字段名判断这些子表是需要纵向拼接为一张表,还是需要横向匹配为一张表,又或者不需要拼接匹配。最后根据子表的情况做合并处理并返回合并结果。如果没有要合并的表,那么返回清洗后的所有子表即可(这种情况下处理的结果是包含所有已清洗子表的列表)。

三、代码


💡 温馨提示:代码太宽看不全?微信公众号内代码框可以左右滑动哦~

(1)定义提取所有子表的函数

# 导入使用到的库
import re
import pandas as pd
import numpy as np

# 下面是用于提取所有子表的函数
def Find_XuBiao_index(df):
    '''找到年鉴中所有包含关键词【“续表”】的单元格的位置'''
    XB_index = []
    # 输入的表的表头
    df = df.fillna('')
    for row_ind, Row in df.iterrows():
        row_xb_ind = []
        # row_ind 是行索引, Row 第 row_ind 行的内容
        for col_ind, Val in zip(Row.index, Row.values):
            if bool(re.search('续\s*?表', Val)):
                row_xb_ind.append((row_ind, col_ind))
        if row_xb_ind:
            # 如果在某一行中找到“续表”,那么将续表出现的位置列表收集起来
            XB_index.append(row_xb_ind)
    if not XB_index or (len(XB_index)==1 and XB_index[0][0] in [(0,0), (1,0), (2,0)]):
        # 如果整表都没有出现“续表”,或者“续表”仅出现一次,且是在前三行首列出现的,
        # 那么说明该年鉴只有一个表, 那么处理结果等于输入的表
        return df
    if XB_index[0][0] in [(0,0), (1,0), (2,0)]:
        # 第一张表也有出现“续表”,直接返回即可
        ALL_XB_IND = pd.DataFrame(XB_index)
        return [ALL_XB_IND]
    df_ind = pd.DataFrame(XB_index)
    if df_ind.shape[1] > 1:
        # 横向、纵向都有续表
        R1 = XB_index[0]
        R1.insert(0, (0,0))
        XB_index[0] = R1
    else:
        if XB_index[0][0][1] > 1:
            # 横向有续表
            R1 = XB_index[0]
            R1.insert(0, (0,0))
            XB_index[0] = R1
        else:
            # 纵向有续表
            XB_index.insert(0, [(0,0)])
    ALL_XB_IND = pd.DataFrame(XB_index)
    return [ALL_XB_IND]    # 放在列表中,在返回值的类型与上一步区别开来
        
def Split_table(DF, indf):
    '''
    根据“续表”出现的位置,提取出所有子表,存放在一个列表中返回
    DF :Python 读取后的原始年鉴表
    indf: 续表出现的位置,从自定义函数 Find_XuBiao_index 中得到
    '
''
    Datas_list = []
    # 处理纵向单层续表
    if indf.shape[1] == 1:
        for row_ind, INDS in zip(indf[0].index, indf[0].values):
            if indf.shape[0]-1 > row_ind:
                # 当前续表下方还有续表
                Datas_list.append(DF.loc[INDS[0]:indf[0].values[row_ind+1][0]-1, :])
            else:
                # 当前续表下方就是表底
                Datas_list.append(DF.loc[INDS[0]:, :])
        return Datas_list
    # 处理横向单层续表
    elif indf.shape[0] == 1:
        first_row = indf.T[0]
        for col_ind, INDS in zip(first_row.index, first_row.values):
            if indf.shape[1]-1 > col_ind:
                # 当前续表右侧还有续表
                Datas_list.append(DF.loc[:, INDS[1]:first_row[col_ind+1][1]-1])
            else:
                # 当前续表右侧就是表底
                Datas_list.append(DF.loc[:, INDS[1]:])
        return Datas_list
    # 处理横向、纵向都有续表的情况
    else:
        # 一行一行处理
        for row_ind, row in indf.iterrows():
            if indf.shape[0]-1 > row_ind:
                # 当前续表下方还有续表
                for col_ind, INDS in zip(row.index, row.values):
                    if indf.shape[1]-1 > col_ind:
                        # 当前续表右侧还有续表
                        downINDS = indf[col_ind][row_ind+1]
                        rightINDS = indf[col_ind+1][row_ind]
                        Datas_list.append(DF.loc[INDS[0]:downINDS[0]-1, INDS[1]:rightINDS[1]-1])
                    else:
                        # 当前续表右侧就是表底
                        downINDS = indf[col_ind][row_ind+1]
                        if downINDS:
                            # 当前续表正下方还有续表
                            Datas_list.append(DF.loc[INDS[0]:downINDS[0]-1, INDS[1]:])
                        else:
                            # 当前续表左下方还有续表,但正下方没有续表
                            leftdown = indf[col_ind-1][row_ind+1]
                            Datas_list.append(DF.loc[INDS[0]:leftdown[0]-1, INDS[1]:])
            else:
                # 当前续表下方就是表底
                for col_ind, INDS in zip(row.index, row.values):
                    if indf.shape[1]-1 > col_ind:
                        # 当前续表右侧可能还有续表
                        rightINDS = indf[col_ind+1][row_ind]
                        if rightINDS:
                            # 最后一行有多个续表
                            Datas_list.append(DF.loc[INDS[0]:, INDS[1]:rightINDS[1]-1])
                        else:
                            # 最后一行仅有一个续表
                            Datas_list.append(DF.loc[INDS[0]:, INDS[1]:])
                    else:
                        # 当前续表右侧就是表底
                        if INDS:
                            # 当前存在续表
                            Datas_list.append(DF.loc[INDS[0]:, INDS[1]:])
                        else:
                            # 当前已经没有续表
                            pass
    return Datas_list
    # 返回的结果是一个列表

(2)定义清洗所有子表的函数

def Clean_single_table(table):
    '''
    对拆分后的子表做清洗 
    table : 提取后,待清洗的单个子表
    '
''
    # 先重置索引
    table = table.reset_index(drop=True)
    table.columns = list(range(table.shape[1]))
    # 去除所有单元格中的空格和换行符
    table = table.applymap(lambda x: re.sub('\n|\t|\r'' ', x))
    # 去除非英文字母前/后面的空格换行符,这样可以保留英文字母之间的空格
    table = table.applymap(lambda x: re.sub('\s+'' ', x))  # 多个空格变成一个空格,为了下一步服务
    table = table.applymap(lambda x: re.sub('(?<=[^A-Za-z])\s+|\s+(?=[^A-Za-z])''', x))     
    # 表格的前几行内容只有第一列有内容,基本上是介绍类文字或【续表】关键字,确认后删除这一行
    for i in table.index:
        first_row = list(table.loc[i, :])
        while '' in first_row: first_row.remove('')
        if len(first_row) <= 1:
            # 删除首行
            table = table.drop(i)
        else:
            break
    # 删除了一些行,这里需要重置索引
    table = table.reset_index(drop=True)
    # 删除所有的全空的行,列
    table = table.replace('', np.nan)
    table = table.dropna(how='all')
    table = table.dropna(how='all', axis=1).fillna('')
    # 再次重置索引
    table = table.reset_index(drop=True)

    ind_num = 0
    # 找到数值行第一行的行索引,该索引(含本身)下方都是指标值,上方就是散乱的表头
    table = table.applymap(str)
    for ind, row in table.iterrows():
        row_info = ' '.join(row)       # 将一行数据拼接在一起
        if bool(re.search('\d+\.\d+|\d{2,}', row_info))\
            and not bool(re.search('[\u4e00-\u9fa5]\d|\d[\u4e00-\u9fa5]|\d+\-\d+|[A-Za-z\((]\d+|\d+[A-Za-z\))]', row_info))\
            and row[0]:
            # 如果在这一行中找到了数字,且没有数字和汉字、字母、括号相邻的情况出现,且第一列内容不为空。那么这一行就是数值指标的第一行
            ind_num = ind
            break
    if ind_num:
        # 此外还需注意,一部分年鉴表的数值行第一行除了第一列之外,其他列是全空的,那么这一行也算作数值行
        # 所以找到数字行之后,再确认数字行的上一行是否也是我们需要的数字行
        if table.loc[ind_num-1, 0] and not ''.join(table.loc[ind_num-1, 1:]):
            if ind_num > 1:
                ind_num = ind_num - 1
    # 选取需要的数据行
    if ind_num:
        table_value = table.iloc[ind_num:, :].reset_index(drop=True)
    else:
        # 找不到数值所在行,这种情况一般不会发生
        return table
    # 处理表头,由于表头可能纵跨多个单元格,所以这里纵向拼接上方的文字作为表头
    table_head =  table.iloc[:ind_num, :]
    COLS = []
    for i in table_head.columns:
        Col = table_head[i]
        head = ''.join(Col)
        COLS.append(head)

    # 将得到的表头用在数值表上
    table_value.columns = COLS
    # 删除表格后几行的注释(可能存在)
    annotation_row = 0
    for ind, row in table_value.iterrows():
        if bool(re.search('^注:|^注:', row[0])) and not ''.join(list(row.values)[1:]):
            # 第一列出现“注:”或“注:”,且后面的值都为空,则认定是注释
            annotation_row = ind
            break
    if annotation_row:
        # 找到注释
        table_value = table_value.iloc[:annotation_row, :]
        # 再次删除全空的行
        table = table.replace('', np.nan)
        table = table.dropna(how='all').reset_index(drop=True)
    else:
        # 没有注释
        pass
    # 返回清洗结果
    return table_value
    

def GET_TABLES(File_path):
    '''
    输入:统计年间的路径
    输出:拆分、处理后的所有续表
    '
''
    # 指定参数 sheet_name=None,将会读取文件中所有的 sheet 表
    # 所得结果是一个字典,键是 sheet 名,值是表格内容
    DATA = pd.read_excel(File_path, header=None, sheet_name=None, dtype=str)
    # 知网的年鉴表一般会隐藏一个名为 “CNKI”的 sheet, 里面存放的是知网的网址,这里选择删除
    if 'CNKI' in DATA: del(DATA['CNKI'])
    # 将所有存放在一个列表中
    DATA_LIST = list(DATA.values())

    if len(DATA_LIST) == 1:
        DATA = DATA_LIST[0].fillna('')
        # 获取续表的分布的情况
        XB_distribution = Find_XuBiao_index(DATA)

        if not isinstance(XB_distribution, list):
            # 返回的是表,说明该统计年鉴不存在续表
            # 只需要清洗该表即可
            Result = Clean_single_table(XB_distribution)
        else:
            # 返回的是列表,说明该统计年鉴存在续表
            XB_distribution = XB_distribution[0]
            # 根据续表的分布,提取出所有续表, 返回值是一个列表
            ALL_XB = Split_table(DATA, XB_distribution)
            # 依次清洗提取出来的续表
            Result = []
            for XB in ALL_XB:
                Result.append(Clean_single_table(XB))
    else:
        # 含有多个 sheet
        DATA = [data.fillna(''for data in DATA_LIST]
        Result = []
        for Data in DATA:
            XB_distribution = Find_XuBiao_index(Data)
            if not isinstance(XB_distribution, list):
                result_sheet = Clean_single_table(XB_distribution)
                # print(1)
                Result.append(result_sheet)
            else:
                XB_distribution = XB_distribution[0]
                ALL_XB = Split_table(Data, XB_distribution)
                result_sheet = []
                for XB in ALL_XB:
                    # print(1)
                    result_sheet.append(Clean_single_table(XB))
                Result.extend(result_sheet)
    return Result

(3)定义匹配/合并所有子表的函数

判断所有子表之间是否有继续进行匹配/合并的可能,如果可以,则匹配/合并为一张表,如果这些子表之间没有直接的关系,则将输入的所有子表原封不动返回(一般情况下这种情况不会发生)。

def Merge_XB(tables):
    """
    tables : 分离、清洗后的所有子表
    "
""
    if not isinstance(tables, list):
        # 如果只有一个表,说明没有续表,无需合并/匹配
        return tables
    else:
        # 含有多个子表
        # 1. 判断所有子表是否需要纵向拼接,所有子表的字段名完全一样时,执行此操作
        tables_cols = [list(df.columns) for df in tables]
        Headers = pd.DataFrame(tables_cols)          # 所有子表的表头组成一张表
        # return Headers
        if Headers.drop_duplicates().shape[0] == 1:
            # 所有子表的表头都一样,说明可以纵向拼接
            table_merge = pd.concat(tables).reset_index(drop=True)
            return table_merge
        # 找到所有可以用于匹配的字段,默认都是最左边的字段
        ppcol = 0
        for i in range(1, Headers.shape[1]):
            Headers_pp = Headers[list(range(i))]
            if Headers_pp.drop_duplicates().shape[0] == 1:
                ppcol = i
            else:
                break
        # 根据上一步的结果进行匹配
        if ppcol > 0:
            # 所有表的前 ppcol 是一样的,可以根据前 ppcol 列进行匹配
            # 匹配之前需要清洗所有表格的第一个字段,去除空格
            tables_new = []
            for table in tables:
                table = table.fillna('').applymap(lambda x: re.sub('\s+''', x))
                tables_new.append(table)
            # 表格清洗后,逐个匹配,合并为一张表
            data = tables_new[0]
            PPcol = list(data.columns)[:ppcol]     # 用于匹配的 字段名列表
            for table in tables_new[1:]:
                data = pd.merge(left=data, right=table, how='outer', on=PPcol)
            return data.fillna('')
        else:
            # 如果以上条件都不满足,那么返回输入的值
            return tables

(4)调用上述函数处理单个年鉴表

# 1.先指定统计年鉴表的存放路径
File_path = './样例年鉴/山西省_93209.xlsx'

# 2.调用自定义函数根据年鉴表的路径进行处理
Final_result = Merge_XB(GET_TABLES(File_path))     # 变量 Final_result 就是最终的处理结果

# 3. 将处理后的数据写入 excel 表
Final_result.to_excel('你要存放的路径以及文件名', index=False)



💡注意:这里处理的统计年鉴表指的是从知网下载的,且已经过另存操作的年鉴表文件。否则无法正常读取和处理表格。如果对“另存操作”有疑问,您可能需要移步至上期文章:案例分享|使用 Python 批量处理统计年鉴表的案例

另外,此代码一次只能处理一份统计年鉴表,如果你有许多年鉴表需要处理,请根据实际情况在循环中调用上述函数。

四、在不同结构的年鉴表上测试

笔者使用上述代码对七种表结构不同的年鉴进行处理,均取得了较为理想的效果,下面我们分享几个测试样例。

(1)测试样例一

待处理统计年鉴表为2013年浙江省各市城镇居民平均每人全年收支情况,表中只有一个 Sheet 表,该表内共有两个子表,两子表横向分布,从两表内容上看,可以根据“城市”“city”两个字段匹配为一张表。如下图所示。

使用上文中的代码处理此年鉴表:

File_path = './样例年鉴/5-26 各市城镇居民平均每人全年收支情况(2013年)_30681.xlsx'
Final_result = Merge_XB(GET_TABLES(File_path))

得到如下图所示处理结果。

(2)测试样例二

处理含有 6 个子表的统计年鉴表——2020年按地区分组主要工业产品产量,原始数据表中只有一个 Sheet 表,续表的分布形式为 3行*2列,部分数据如下图所示。

使用本文代码处理后,所得结果如下图所示(没有截全)。

(3)测试样例三

处理统计年鉴表2019年浙江省各市、县居民消费价格指数(上年=100),该年鉴表含有两个 Sheet 表,第一个 Sheet 表中有两个子表,第二个 Sheet 表中有一个子表,如下图所示。

处理结果如下图所示。


💡 后台回关键词“20230324”获取本期文章的所有代码和测试用的统计年鉴表(代码需要在 Jupyter Notebook 环境中使用)

注:本文代码由企研数据原创,仅作交流使用,未经同意不可用于商业目的,违者必究!

总结

最后笔者想说,由于知网提供的统计年鉴表的内容样式不胜枚举,并且许多年鉴表的内容本身就存在问题,例如年鉴表的表头(字段名)具有层级关系,如下图所示。

内容不连续(正常的话应该存放在一个单元格里面):

错别字:

以上问题导致统计年鉴表的批量处理和匹配/合并经常出现问题。由于本文提供的代码并没有对以上问题做针对性处理(实际上也不好处理……),所以处理这类年鉴表时,所得结果会有一些瑕疵。因此笔者不能向大家保证这份代码可以用于所有统计年鉴表的处理。不过,处理 90% 以上的统计年鉴表应该没有问题。




星标⭐我们不迷路!想要文章及时到,文末“在看”少不了!

点击搜索你感兴趣的内容吧





数据Seminar




这里是大数据、分析技术与学术研究的三叉路口


文 | 两米哥


    欢迎扫描👇二维码添加关注    

点击下方“阅读全文”了解更多

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存