Chapter 6

0. Practice Exercises

Practice

Write out the Cartesian products of the following sets A and B:

  • \(A=\{0\}\), \(B=\{0\}\)

  • \(A=\{1, 2, 3\}\), \(B=\{4, 5, 6\}\)

  • \(A=\{\{1,2\}, 3\}\), \(B=\{4,\{5,6\}\}\)

The corresponding Cartesian products \(A\times B\) are:

  • \(\{(0, 0)\}\)

  • \(\{(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)\}\)

  • \(\{(\{1,2\},4),(\{1,2\},\{5,6\}),(3,4),(3,\{5,6\})\}\)

Practice

Suppose the key column of the left table contains the elements [1,2,2,3,3,3], and the right table's key column is identical. How many rows does the resulting table have under each of the four join types?

All four join types yield \(1^2+2^2+3^2=14\) rows.
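Since every key value appears in both tables, the left, right, inner and outer joins all keep exactly the matched Cartesian products. A quick sketch to verify the count:

import pandas as pd
left = pd.DataFrame({"Key": [1, 2, 2, 3, 3, 3]})
right = pd.DataFrame({"Key": [1, 2, 2, 3, 3, 3]})
for how in ["left", "right", "inner", "outer"]:
    # each key contributes n_left * n_right rows: 1*1 + 2*2 + 3*3 = 14
    print(how, left.merge(right, on="Key", how=how).shape[0])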

Practice

Construct two tables such that merging them with merge() passes the "1:m" validation check but fails the "m:1" check.

df_left = pd.DataFrame({"A":[1,2], "B":["a","b"]})
df_right = pd.DataFrame({"A":[1,1,2], "C":["c","d","e"]})
df_left.merge(df_right, on="A", validate="1:m")
   A  B  C
0  1  a  c
1  1  a  d
2  2  b  e
df_left.merge(df_right, on="A", validate="m:1")
---------------------------------------------------------------------------
MergeError                                Traceback (most recent call last)
Input In [4], in <module>
----> 1 df_left.merge(df_right, on="A", validate="m:1")

File ~\miniconda3\envs\final\lib\site-packages\pandas\core\frame.py:9329, in DataFrame.merge(self, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
   9310 @Substitution("")
   9311 @Appender(_merge_doc, indents=2)
   9312 def merge(
   (...)
   9325     validate: str | None = None,
   9326 ) -> DataFrame:
   9327     from pandas.core.reshape.merge import merge
-> 9329     return merge(
   9330         self,
   9331         right,
   9332         how=how,
   9333         on=on,
   9334         left_on=left_on,
   9335         right_on=right_on,
   9336         left_index=left_index,
   9337         right_index=right_index,
   9338         sort=sort,
   9339         suffixes=suffixes,
   9340         copy=copy,
   9341         indicator=indicator,
   9342         validate=validate,
   9343     )

File ~\miniconda3\envs\final\lib\site-packages\pandas\core\reshape\merge.py:107, in merge(left, right, how, on, left_on, right_on, left_index, right_index, sort, suffixes, copy, indicator, validate)
     90 @Substitution("\nleft : DataFrame or named Series")
     91 @Appender(_merge_doc, indents=0)
     92 def merge(
   (...)
    105     validate: str | None = None,
    106 ) -> DataFrame:
--> 107     op = _MergeOperation(
    108         left,
    109         right,
    110         how=how,
    111         on=on,
    112         left_on=left_on,
    113         right_on=right_on,
    114         left_index=left_index,
    115         right_index=right_index,
    116         sort=sort,
    117         suffixes=suffixes,
    118         copy=copy,
    119         indicator=indicator,
    120         validate=validate,
    121     )
    122     return op.get_result()

File ~\miniconda3\envs\final\lib\site-packages\pandas\core\reshape\merge.py:710, in _MergeOperation.__init__(self, left, right, how, on, left_on, right_on, axis, left_index, right_index, sort, suffixes, copy, indicator, validate)
    706 # If argument passed to validate,
    707 # check if columns specified as unique
    708 # are in fact unique.
    709 if validate is not None:
--> 710     self._validate(validate)

File ~\miniconda3\envs\final\lib\site-packages\pandas\core\reshape\merge.py:1444, in _MergeOperation._validate(self, validate)
   1442 elif validate in ["many_to_one", "m:1"]:
   1443     if not right_unique:
-> 1444         raise MergeError(
   1445             "Merge keys are not unique in right dataset; "
   1446             "not a many-to-one merge"
   1447         )
   1449 elif validate in ["many_to_many", "m:m"]:
   1450     pass

MergeError: Merge keys are not unique in right dataset; not a many-to-one merge

Practice

The join() function does not implement the validate parameter of merge(). Construct a join_with_validate function with parameters df1, df2, on, how and validate that provides functionality analogous to merge()'s. (Calling join() inside join_with_validate is allowed.)

# is_unique is not covered in the book; readers can instead deduplicate
# with a set and compare the number of elements
def join_with_validate(df1, df2, on=None, how="left", validate="m:m"):
    # join() matches df1's "on" column (or df1's index when on is None)
    # against df2's index, so those are the keys to validate
    left_key = df1[on] if on is not None else df1.index
    left_key_not_unique = not left_key.is_unique
    df2_index_not_unique = not df2.index.is_unique
    if validate=="1:1" and (left_key_not_unique or df2_index_not_unique):
        raise ValueError(
            "Join keys are not unique in dataset; "
            "not a one-to-one merge"
        )
    if validate=="1:m" and left_key_not_unique:
        raise ValueError(
            "Join keys are not unique in left dataset; "
            "not a one-to-many merge"
        )
    if validate=="m:1" and df2_index_not_unique:
        raise ValueError(
            "Join keys are not unique in right dataset; "
            "not a many-to-one merge"
        )
    return df1.join(df2, on=on, how=how)
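A hypothetical usage with df_left and df_right from the previous exercise: join() matches against the right table's index, so the key column "A" is first moved there.

df_right_indexed = df_right.set_index("A")
join_with_validate(df_left, df_right_indexed, on="A", validate="1:m")  # passes
join_with_validate(df_left, df_right_indexed, on="A", validate="m:1")  # raises ValueError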

Practice

Given two DataFrames with identical row and column indexes, let s1 and s2 denote the left-table and right-table columns passed to the rule function. Combine the two tables according to each of the following rules:

  • When an element of s1 equals 0, replace it with the element at the corresponding position of s2; otherwise keep it unchanged.

  • When an element of s1 exceeds the mean of s2, replace it with the corresponding element of s1+s2; otherwise replace it with the corresponding element of s1-s2.

  • 1

df1 = pd.DataFrame({"A":[0,2],"B":[4,1]})
df2 = pd.DataFrame({"A":[1,0],"B":[3,-1]})
df1.combine(df2, lambda s1, s2: s1.mask(s1==0, s2))
   A  B
0  1  4
1  2  1
  • 2

df1.combine(df2, lambda s1, s2: (s1-s2).mask(s1>s2.mean(), s1+s2))
   A  B
0 -1  7
1  2  2

1. Merging the Employee Information Table

The folder data/ch6/employee stores the employee information of a company. The salary directory holds each employee's monthly base salary from January 2018 to December 2020, and the award directory holds each month's bonuses; an employee's final pay equals the base salary plus the bonus. (Note: all data in this problem are randomly generated and unrelated to reality.)

  • info_a.csv, info_b.csv and info_c.csv each store different pieces of information about the company's employees. Extract the 邮箱 (email), 性别 (gender), 年龄 (age) and 学历 (education) columns of the employees with IDs from "ID-000001" to "ID-025000", and merge them into a single table indexed by employee ID.

  • For all employees whose IDs fall in the range above, compute the total pay of each quarter from January 2018 to December 2020, and append the 12 quarterly totals as new columns to the table from the previous question. The column for the first quarter of 2018 is named "2018-Q1", and the other quarters are named analogously.

【Solution】
  • 1

res = pd.concat(
    [
        pd.read_csv(
            "data/ch6/employee/info_%s.csv"%i
        ).set_index("ID")
        for i in list("abc")
    ], axis=1
)
# keep exactly the rows ID-000001 through ID-025000, in order
res = res.reindex(["ID-%06d"%i for i in range(1, 25001)])
res = res[["邮箱","性别","年龄","学历"]]
res.head()
邮箱 性别 年龄 学历
ID
ID-000001 oZuSkKV@qq.com 38.0 研究生
ID-000002 IWmJnPR@yahoo.com 37.0 高中
ID-000003 NWIIsdP@hotmail.com 49.0 高中
ID-000004 lypSDjU@sina.com 27.0 研究生
ID-000005 ibvJneI@hotmail.com 24.0 研究生
  • 2

data = pd.concat(
    [
        pd.read_csv(
            "data/ch6/employee/award/%d-%02d.csv"%(y,m), index_col="ID"
        ) + pd.read_csv(
            "data/ch6/employee/salary/%d-%02d.csv"%(y,m), index_col="ID"
        ) for y in range(2018,2021) for m in range(1,13)
    ], axis=1
)
values = data.values.reshape(25000, -1, 3).sum(-1)  # (25000, 36) -> (25000, 12, 3), then sum the 3 months of each quarter
res_values = pd.DataFrame(
    values,
    index=res.index,
    columns=["%d-Q%d"%(y,q) for y in range(2018,2021) for q in range(1,5)]
)
res = pd.concat([res, res_values], axis=1)
res.iloc[:5, :8] # partial display
邮箱 性别 年龄 学历 2018-Q1 2018-Q2 2018-Q3 2018-Q4
ID
ID-000001 oZuSkKV@qq.com 38.0 研究生 37200 40100 50400 43900
ID-000002 IWmJnPR@yahoo.com 37.0 高中 43400 40000 41900 44100
ID-000003 NWIIsdP@hotmail.com 49.0 高中 27800 43700 21100 48300
ID-000004 lypSDjU@sina.com 27.0 研究生 50900 33300 42700 49900
ID-000005 ibvJneI@hotmail.com 24.0 研究生 44100 40300 29600 36800
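As a hypothetical spot check (assuming each monthly file holds one pay column per employee), the 2018-Q1 entry of a single employee can be recomputed directly from the raw files:

check = sum(
    pd.read_csv("data/ch6/employee/%s/2018-%02d.csv" % (d, m),
                index_col="ID").loc["ID-000001"].iloc[0]
    for d in ["salary", "award"] for m in range(1, 4)
)
check == res.loc["ID-000001", "2018-Q1"]  # expected: True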

2. Implementing a join Function

Implement a join() function according to the following requirements:

  • The function is called as join(df1, df2, how, lsuffix, rsuffix)

  • Both df1 and df2 are DataFrames with single-level indexes

  • The how parameter supports left, right, inner, outer and cross

  • Provide test cases and compare the results with those of pandas' join()

  • pd.concat() may be used in the implementation

Note

Because the merge may introduce missing values, the column dtypes of the hand-written join function and of pandas' built-in join can differ, and in that case equals() cannot be used for the comparison. Instead, use pd.testing.assert_frame_equal(my_result, pandas_result, check_dtype=False): the check_dtype parameter of assert_frame_equal() turns off the column-dtype check, and the function raises an error when the two tables differ and does nothing when they are identical.
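A minimal illustration with hypothetical frames: the values below agree, but the first column is float64 (as a join that introduces missing values would produce), so only the dtype-insensitive comparison passes.

import pandas as pd
from pandas.testing import assert_frame_equal
a = pd.DataFrame({"x": [1.0, 2.0]})  # float64, e.g. after a join that filled NaNs
b = pd.DataFrame({"x": [1, 2]})      # int64
assert_frame_equal(a, b, check_dtype=False)  # no error: values match
# assert_frame_equal(a, b) would raise, since int64 != float64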

【Solution】
import numpy as np
import pandas as pd

def join(df1, df2, how, lsuffix=None, rsuffix=None):
    idx1, idx2 = df1.columns, df2.columns
    idx_intersect = idx1.intersection(idx2)
    if len(idx_intersect) > 0:
        if lsuffix is None or rsuffix is None:
            raise ValueError(
                "columns overlap but no suffix specified: "
                + str(idx_intersect))
        df1 = df1.rename(columns={i:i+lsuffix for i in idx_intersect})
        df2 = df2.rename(columns={i:i+rsuffix for i in idx_intersect})
    if how == "cross":
        # pair every row of df1 with every row of df2, ignoring the indexes
        left = df1.iloc[np.repeat(np.arange(df1.shape[0]), df2.shape[0])]
        right = pd.concat([df2] * df1.shape[0])
        return pd.concat(
            [left.reset_index(drop=True), right.reset_index(drop=True)],
            axis=1
        )
    idx1, idx2 = df1.index, df2.index
    idx = idx1.union(idx2).unique().sort_values()
    columns = pd.Index(df1.columns.tolist() + df2.columns.tolist())
    res = pd.DataFrame(columns=columns, index=pd.Index([], name=idx1.name))
    for x in idx:
        _idx1, _idx2 = idx1 == x, idx2 == x
        in1, in2 = bool(_idx1.sum()), bool(_idx2.sum())
        if in1 and in2:
            # the key appears on both sides: emit the Cartesian product
            _df1, _df2 = df1.loc[[x]], df2.loc[[x]]
            if how in ["right"]:
                for i in range(_df2.shape[0]):
                    _res = pd.concat([_df2.iloc[[i]]] * _df1.shape[0])
                    _res = pd.concat([_df1, _res], axis=1)
                    res = pd.concat([res, _res])
            else:
                for i in range(_df1.shape[0]):
                    _res = pd.concat([_df1.iloc[[i]]] * _df2.shape[0])
                    _res = pd.concat([_res, _df2], axis=1)
                    res = pd.concat([res, _res])
        elif not in1 and how in ["right", "outer"]:
            # right-only key: fill the left columns with NaN
            _res = df2.loc[[x]].copy()
            for c in df1.columns:
                _res[c] = np.nan
            res = pd.concat([res, _res.reindex(columns, axis=1)])
        elif not in2 and how in ["left", "outer"]:
            # left-only key: fill the right columns with NaN
            _res = df1.loc[[x]].copy()
            for c in df2.columns:
                _res[c] = np.nan
            res = pd.concat([res, _res])
    return res
# sample tables for testing (hypothetical data): partially overlapping
# indexes and an overlapping column name ("B") to exercise the suffixes
df1 = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]}, index=[1, 2, 4])
df2 = pd.DataFrame({"B": [7, 8, 9], "C": [10, 11, 12]}, index=[1, 3, 4])
my_res_left = join(df1, df2, how="left", lsuffix="_x", rsuffix="_y")
my_res_right = join(df1, df2, how="right", lsuffix="_x", rsuffix="_y")
my_res_inner = join(df1, df2, how="inner", lsuffix="_x", rsuffix="_y")
my_res_outer = join(df1, df2, how="outer", lsuffix="_x", rsuffix="_y")
pd_res_left = df1.join(df2, lsuffix="_x", rsuffix="_y", how="left")
pd_res_right = df1.join(df2, lsuffix="_x", rsuffix="_y", how="right")
pd_res_inner = df1.join(df2, lsuffix="_x", rsuffix="_y", how="inner")
pd_res_outer = df1.join(df2, lsuffix="_x", rsuffix="_y", how="outer")
from pandas.testing import assert_frame_equal
assert_frame_equal(my_res_left, pd_res_left, check_dtype=False)
assert_frame_equal(my_res_right, pd_res_right, check_dtype=False)
assert_frame_equal(my_res_inner, pd_res_inner, check_dtype=False)
assert_frame_equal(my_res_outer, pd_res_outer, check_dtype=False)
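The requirements also list how="cross"; since DataFrame.join() supports "cross" as well (pandas 1.2+), it can be checked the same way:

my_res_cross = join(df1, df2, how="cross", lsuffix="_x", rsuffix="_y")
pd_res_cross = df1.join(df2, how="cross", lsuffix="_x", rsuffix="_y")
assert_frame_equal(my_res_cross, pd_res_cross, check_dtype=False)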

3. Conditional Joins

The relational joins introduced in this chapter, merge(), join() and concat(), are all equi-joins: every label in the left key is matched, via a Cartesian product, only with labels in the right key that are exactly equal to it. We now want a left-table key to match a right-table key whenever the two satisfy some condition; below is an example of matching by order relations.

Suppose df1 and df2 are constructed as follows:

df1 = pd.DataFrame({"Key":[0,1,1,2], "Col1":[10,20,30,40]})
df1
   Key  Col1
0    0    10
1    1    20
2    1    30
3    2    40
df2 = pd.DataFrame({"Key":[1,1,2,3], "Col2":[50,60,70,80]})
df2
   Key  Col2
0    1    50
1    1    60
2    2    70
3    3    80

We want a conditional_merge() function that left-joins df1 and df2 under the rule that the left key's value must not be smaller than the right key's value, i.e. conditional_merge(df1, df2, on="Key", how="left", rule="x>=y") should produce the following expected result:

   Key_x  Col1  Key_y  Col2
0      0    10    NaN   NaN
1      1    20    1.0  50.0
2      1    20    1.0  60.0
3      1    30    1.0  50.0
4      1    30    1.0  60.0
5      2    40    1.0  50.0
6      2    40    1.0  60.0
7      2    40    2.0  70.0
  • Implement the conditional_merge() function that joins by order relations as described above, where the rule parameter can take "x>=y", "x>y", "x==y", "x!=y", "x<=y" and "x<y". Only the left-join version is required, i.e. the how parameter need not be handled.

  • The files data/ch6/left.csv and data/ch6/right.csv store two tables. We want to condition-join them with the longitude and latitude columns (Longitude and Latitude) as keys, under the rule that the spherical distance between the left key and the right key must not exceed \(d\) kilometers; implement this as spherical_merge(df1, df2, distance=d, on=["Longitude", "Latitude"]). Since neither key contains duplicate values, the join type need not be considered here. The spherical distance can be computed with the haversine_distances() function of the sklearn library, which can be installed via conda install scikit-learn and used as shown in the code below.

from sklearn.metrics.pairwise import haversine_distances
df1 = pd.read_csv("data/ch6/left.csv").head()
df2 = pd.read_csv("data/ch6/right.csv").head()
def get_distance(df1, df2):
    rad1 = np.stack([np.radians(df1.Latitude), np.radians(df1.Longitude)], axis=-1)
    rad2 = np.stack([np.radians(df2.Latitude), np.radians(df2.Longitude)], axis=-1)
    result = haversine_distances(rad1, rad2) * 6371000 / 1000 # multiply by the Earth radius (m) and convert to km
    return result
get_distance(df1, df2) # entry (i, j): spherical distance from the i-th point of df1 to the j-th point of df2
array([[ 655.96280506, 1449.73574186,  276.84030423,  802.01737182,
         540.95925895],
       [1272.59889369,  387.16558823, 1358.01183355, 1582.95966523,
        1069.38156885],
       [ 669.35082711,  467.07489446,  742.49209123, 1002.65697447,
         458.35353128],
       [1104.42662432,  146.24538034, 1254.17621209, 1393.16408848,
         973.28985989],
       [ 388.63154244,  754.15164741,  427.75601403,  735.42275194,
         172.40958435]])
【Solution】
  • 1

df1 = pd.DataFrame({"Key":[0,1,1,2], "Col1":[10,20,30,40]})
df2 = pd.DataFrame({"Key":[1,1,2,3], "Col2":[50,60,70,80]})
# turn a rule string such as "x>=y" into a function of (x, y)
rule_helper = lambda rule: lambda x, y: eval(rule)
def conditional_merge(df1, df2, on, rule):
    grouper = df1.groupby(on)
    rule_func = rule_helper(rule)
    def merge_helper(_df1):
        left_key = _df1[on].iloc[0]
        right_key = df2[on]
        _df2 = df2[rule_func(left_key, right_key)]  # right rows satisfying the rule
        if _df2.shape[0] == 0:
            # no match: keep the left rows and fill the right columns with NaN
            _df1 = _df1.rename(columns={on: on+"_x"})
            _df2 = _df2.rename(columns={on: on+"_y"})
            return pd.concat([_df1, _df2], axis=1)
        else:
            return _df1.merge(_df2, how="cross")
    return grouper.apply(merge_helper).reset_index(drop=True)
conditional_merge(df1, df2, "Key", "x>=y")
   Key_x  Col1  Key_y  Col2
0      0    10    NaN   NaN
1      1    20    1.0  50.0
2      1    20    1.0  60.0
3      1    30    1.0  50.0
4      1    30    1.0  60.0
5      2    40    1.0  50.0
6      2    40    1.0  60.0
7      2    40    2.0  70.0
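Because eval() executes arbitrary strings, a slightly safer variant maps the six documented rules onto functions from the operator module; a minimal sketch:

import operator
rule_funcs = {
    "x>=y": operator.ge, "x>y": operator.gt, "x==y": operator.eq,
    "x!=y": operator.ne, "x<=y": operator.le, "x<y": operator.lt,
}
# e.g. use rule_funcs[rule] in place of rule_helper(rule) inside conditional_merge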
  • 2

df1 = pd.read_csv("data/ch6/left.csv")
df2 = pd.read_csv("data/ch6/right.csv")

def spherical_merge(df1, df2, distance=200, on=["Longitude", "Latitude"]):
    if on[0] not in df1.columns or on[0] not in df2.columns:
        raise ValueError("Longitude not in df1's columns or df2's columns.")
    if on[1] not in df1.columns or on[1] not in df2.columns:
        raise ValueError("Latitude not in df1's columns or df2's columns.")
    dist = get_distance(df1, df2)
    res = pd.concat(
        [
            # cross-join each left row with the right rows within range
            df1.iloc[[i]].merge(df2.loc[dist[i] <= distance], how="cross")
            for i in range(df1.shape[0])
        ]
    )
    return res.reset_index(drop=True)

res = spherical_merge(df1, df2, 30)
res.head()
   Longitude_x  Latitude_x  Col1  Longitude_y  Latitude_y  Col2
0   121.102062   30.216142     0   121.243898   30.353029     7
1   115.885544   36.079370     0   115.947204   35.922811     2
2   113.791836   36.724240     9   113.709950   36.608670     4
3   128.633906   39.729151     1   128.536029   39.631795     2
4   128.633906   39.729151     1   128.636761   39.898670     5
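As a hypothetical sanity check reusing get_distance() from above, every matched pair in res should lie within the 30 km threshold:

left_pts = res[["Longitude_x", "Latitude_x"]].rename(columns=lambda c: c[:-2])
right_pts = res[["Longitude_y", "Latitude_y"]].rename(columns=lambda c: c[:-2])
# matched pairs sit on the diagonal of the pairwise distance matrix
(np.diag(get_distance(left_pts, right_pts)) <= 30).all()  # expected: True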