Chapter 6
0. Practice Exercises
Practice
Write out the Cartesian product of the following sets A and B:

1. \(A=\{0\}\); \(B=\{0\}\)
2. \(A=\{1, 2, 3\}\); \(B=\{4, 5, 6\}\)
3. \(A=\{\{1,2\}, 3\}\); \(B=\{4,\{5,6\}\}\)
The answers are, respectively:

1. \(\{(0, 0)\}\)
2. \(\{(1,4),(1,5),(1,6),(2,4),(2,5),(2,6),(3,4),(3,5),(3,6)\}\)
3. \(\{(\{1,2\},4),(\{1,2\},\{5,6\}),(3,4),(3,\{5,6\})\}\)
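These can be checked mechanically with itertools.product; a minimal sketch for the second pair (a sorted list is used rather than a set, since the tuples in the third pair contain sets, which are unhashable):

from itertools import product

A, B = {1, 2, 3}, {4, 5, 6}
sorted(product(A, B))
# [(1, 4), (1, 5), (1, 6), (2, 4), (2, 5), (2, 6), (3, 4), (3, 5), (3, 6)]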
Practice
Suppose the elements in the left table's key column are [1,2,2,3,3,3] and the right table's key column is identical. How many rows does the result table have under each of the 4 join types?
Since the two key columns are identical, every key matches in all four join types, and a key appearing \(m\) times in each table contributes \(m^2\) rows to the result. All 4 join types therefore yield \(1^2+2^2+3^2=14\) rows.
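This is easy to verify empirically; a minimal sketch with two throwaway single-column frames (the imports below are assumed by all code in this chapter's solutions):

import numpy as np
import pandas as pd

left = pd.DataFrame({"Key": [1, 2, 2, 3, 3, 3]})
right = pd.DataFrame({"Key": [1, 2, 2, 3, 3, 3]})
[left.merge(right, on="Key", how=h).shape[0]
 for h in ["left", "right", "inner", "outer"]]
# [14, 14, 14, 14]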
Practice
Construct two tables that pass the "1:m" check but fail the "m:1" check when combined with merge().
df_left = pd.DataFrame({"A":[1,2], "B":["a","b"]})
df_right = pd.DataFrame({"A":[1,1,2], "C":["c","d","e"]})
df_left.merge(df_right, on="A", validate="1:m")
|   | A | B | C |
|---|---|---|---|
| 0 | 1 | a | c |
| 1 | 1 | a | d |
| 2 | 2 | b | e |
df_left.merge(df_right, on="A", validate="m:1")
---------------------------------------------------------------------------
MergeError                                Traceback (most recent call last)
Input In [4], in <module>
----> 1 df_left.merge(df_right, on="A", validate="m:1")
    ... (pandas-internal frames omitted) ...
MergeError: Merge keys are not unique in right dataset; not a many-to-one merge
Practice
The join() function does not implement merge()'s validate parameter. Construct a join_with_validate function with parameters df1, df2, on, how, and validate that provides functionality similar to merge()'s. (Calling join() inside join_with_validate is allowed.)
# is_unique is not covered in the book; readers can instead deduplicate
# with a set and compare the number of elements
def join_with_validate(df1, df2, on, how="left", validate="m:m"):
    # join() matches df1's "on" column (or df1's index when on is None)
    # against df2's index, so validate exactly those keys
    left_keys = df1[on] if on is not None else df1.index
    left_not_unique = not left_keys.is_unique
    right_not_unique = not df2.index.is_unique
    if validate == "1:1" and (left_not_unique or right_not_unique):
        raise ValueError(
            "Join keys are not unique in dataset; "
            "not a one-to-one merge"
        )
    if validate == "1:m" and left_not_unique:
        raise ValueError(
            "Join keys are not unique in left dataset; "
            "not a one-to-many merge"
        )
    if validate == "m:1" and right_not_unique:
        raise ValueError(
            "Join keys are not unique in right dataset; "
            "not a many-to-one merge"
        )
    return df1.join(df2, on=on, how=how)
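A quick check (a sketch; the two frames below are made up for illustration): a duplicated index on the right side should pass "1:m" but fail "m:1".

df_l = pd.DataFrame({"B": ["a", "b"]}, index=[1, 2])
df_r = pd.DataFrame({"C": ["c", "d", "e"]}, index=[1, 1, 2])
join_with_validate(df_l, df_r, on=None, how="left", validate="1:m")  # passes
join_with_validate(df_l, df_r, on=None, how="left", validate="m:1")  # raises ValueError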
Practice
Given two DataFrames with identical row and column indexes, let s1 and s2 be the left-table and right-table columns passed to the rule function. Combine the tables according to each of the following rules in turn:

1. Where an element of s1 equals 0, update it with the element at the corresponding position of s2; otherwise keep it unchanged.
2. Where an element of s1 exceeds the mean of s2, update it with the element at the corresponding position of s1+s2; otherwise update it with the corresponding element of s1-s2.
Part 1
df1 = pd.DataFrame({"A":[0,2],"B":[4,1]})
df2 = pd.DataFrame({"A":[1,0],"B":[3,-1]})
df1.combine(df2, lambda s1, s2: s1.mask(s1==0, s2))
|   | A | B |
|---|---|---|
| 0 | 1 | 4 |
| 1 | 2 | 1 |
Part 2
df1.combine(df2, lambda s1, s2: (s1-s2).mask(s1>s2.mean(), s1+s2))
|   | A | B |
|---|---|---|
| 0 | -1 | 7 |
| 1 | 2 | 2 |
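As a cross-check of the second rule, the same update can be written directly with numpy broadcasting (a sketch, reusing df1 and df2 from above):

# column-wise means of df2 broadcast against df1's values
expected = np.where(df1.values > df2.values.mean(axis=0),
                    (df1 + df2).values, (df1 - df2).values)
(df1.combine(df2, lambda s1, s2: (s1-s2).mask(s1>s2.mean(), s1+s2)).values
 == expected).all()
# True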
1. Merging the Employee Information Tables
The data/ch6/employee folder stores the employee information of a company. The salary directory holds each employee's monthly base salary for January 2018 through December 2020, and the award directory holds each month's bonuses; an employee's final pay equals base salary plus bonus. (Note: the data in this problem is randomly generated and has no relation to reality.)

1. info_a.csv, info_b.csv, and info_c.csv store different pieces of information about the company's employees. Extract the email (邮箱), gender (性别), age (年龄), and education (学历) of the employees with IDs "ID-000001" through "ID-025000", and merge them into one table indexed by employee ID.
2. For all employees whose IDs fall in the range above, compute the total pay of each quarter from January 2018 through December 2020, and append the 12 quarterly totals as new columns to the table from the previous part. The column for the first quarter of 2018 should be named "2018-Q1", and similarly for the other quarters.
[Solution]
Part 1
# read the three info tables, index them by ID, and align them column-wise
res = pd.concat(
    [
        pd.read_csv(
            "data/ch6/employee/info_%s.csv" % i
        ).set_index("ID")
        for i in list("abc")
    ], axis=1
)
# restrict to ID-000001 through ID-025000 and keep the requested columns
res = res.reindex(["ID-%06d" % i for i in range(1, 25001)])
res = res[["邮箱", "性别", "年龄", "学历"]]
res.head()
| ID | 邮箱 | 性别 | 年龄 | 学历 |
|---|---|---|---|---|
| ID-000001 | oZuSkKV@qq.com | 男 | 38.0 | 研究生 |
| ID-000002 | IWmJnPR@yahoo.com | 女 | 37.0 | 高中 |
| ID-000003 | NWIIsdP@hotmail.com | 男 | 49.0 | 高中 |
| ID-000004 | lypSDjU@sina.com | 女 | 27.0 | 研究生 |
| ID-000005 | ibvJneI@hotmail.com | 男 | 24.0 | 研究生 |
Part 2
# for each of the 36 months, final pay = award + salary, aligned on ID
data = pd.concat(
    [
        pd.read_csv(
            "data/ch6/employee/award/%d-%02d.csv" % (y, m), index_col="ID"
        ) + pd.read_csv(
            "data/ch6/employee/salary/%d-%02d.csv" % (y, m), index_col="ID"
        ) for y in range(2018, 2021) for m in range(1, 13)
    ], axis=1
)
# fold the 36 monthly columns into 12 quarters of 3 months each, then sum
values = data.values.reshape(25000, -1, 3).sum(-1)
res_values = pd.DataFrame(
    values,
    index=res.index,
    columns=["%d-Q%d" % (y, q) for y in range(2018, 2021) for q in range(1, 5)]
)
res = pd.concat([res, res_values], axis=1)
res.iloc[:5, :8]  # show part of the result
| ID | 邮箱 | 性别 | 年龄 | 学历 | 2018-Q1 | 2018-Q2 | 2018-Q3 | 2018-Q4 |
|---|---|---|---|---|---|---|---|---|
| ID-000001 | oZuSkKV@qq.com | 男 | 38.0 | 研究生 | 37200 | 40100 | 50400 | 43900 |
| ID-000002 | IWmJnPR@yahoo.com | 女 | 37.0 | 高中 | 43400 | 40000 | 41900 | 44100 |
| ID-000003 | NWIIsdP@hotmail.com | 男 | 49.0 | 高中 | 27800 | 43700 | 21100 | 48300 |
| ID-000004 | lypSDjU@sina.com | 女 | 27.0 | 研究生 | 50900 | 33300 | 42700 | 49900 |
| ID-000005 | ibvJneI@hotmail.com | 男 | 24.0 | 研究生 | 44100 | 40300 | 29600 | 36800 |
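An equivalent quarterly aggregation, as a sketch that assumes data keeps its 36 monthly columns in chronological order, groups the columns by a quarter label instead of reshaping the underlying array (groupby with axis=1 works in the pandas 1.x used here, though later versions deprecate it):

# one label per month: 3 consecutive months share a quarter label
quarter_labels = ["%d-Q%d" % (y, q) for y in range(2018, 2021)
                  for q in range(1, 5) for _ in range(3)]
alt = data.groupby(quarter_labels, axis=1).sum()  # axis=1 groups the columns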
2. Implementing a join Function
Implement a join() function according to the following requirements:

- The function is called as join(df1, df2, how, lsuffix, rsuffix)
- Both df1 and df2 are DataFrames with single-level indexes
- The how parameter supports left, right, inner, outer, and cross
- Provide test cases and compare against the results of pandas' own join()
- pd.concat() may be used in the implementation
Note
Because merging can introduce missing values, the custom join function and pandas' built-in join may differ in column dtypes, so equals() cannot be used for the comparison. Use pd.testing.assert_frame_equal(my_result, pandas_result, check_dtype=False) instead: assert_frame_equal() disables column-type checking via the check_dtype parameter, raises an error when the two tables differ, and does nothing when they match.
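For instance (a made-up pair of frames): with the same values but different dtypes, the check passes only when dtype checking is off.

from pandas.testing import assert_frame_equal
a = pd.DataFrame({"A": [1, 2]})      # int64
b = pd.DataFrame({"A": [1.0, 2.0]})  # float64
assert_frame_equal(a, b, check_dtype=False)  # silent: values match
# assert_frame_equal(a, b)  # would raise: dtypes differ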
[Solution]
def join(df1, df2, how, lsuffix=None, rsuffix=None):
    # attach suffixes to overlapping column names, as pandas' join() does
    idx1, idx2 = df1.columns, df2.columns
    idx_intersect = idx1.intersection(idx2)
    if len(idx_intersect) > 0:
        if lsuffix is None or rsuffix is None:
            raise ValueError(
                "columns overlap but no suffix specified: "
                + str(idx_intersect))
        df1 = df1.rename(columns={i: i+lsuffix for i in idx_intersect})
        df2 = df2.rename(columns={i: i+rsuffix for i in idx_intersect})
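    # Addition (not in the original solution): the requirements also list
    # how="cross", which ignores the indexes entirely, so handle it before
    # the index-matching loop below. This mirrors merge(how="cross"):
    # every row of df1 is paired with every row of df2 on a fresh RangeIndex.
    if how == "cross":
        left = df1.iloc[np.repeat(np.arange(df1.shape[0]), df2.shape[0])]
        right = df2.iloc[np.tile(np.arange(df2.shape[0]), df1.shape[0])]
        return pd.concat(
            [left.reset_index(drop=True), right.reset_index(drop=True)],
            axis=1
        )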
    # iterate over the sorted union of the two indexes, emitting the
    # matched blocks key by key
    idx1, idx2 = df1.index, df2.index
    idx = idx1.union(idx2).unique().sort_values()
    columns = pd.Index(df1.columns.tolist() + df2.columns.tolist())
    res = pd.DataFrame(columns=columns, index=pd.Index([], name=idx1.name))
    for x in idx:
        _idx1, _idx2 = idx1 == x, idx2 == x
        in1, in2 = bool(_idx1.sum()), bool(_idx2.sum())
        if in1 and in2:
            # key on both sides: Cartesian product of the two row blocks
            _df1, _df2 = df1.loc[[x]], df2.loc[[x]]
            if how in ["right"]:
                # right join orders by the right table's rows within a key
                for i in range(_df2.shape[0]):
                    _res = pd.concat([_df2.iloc[[i]]] * _df1.shape[0])
                    _res = pd.concat([_df1, _res], axis=1)
                    res = pd.concat([res, _res])
            else:
                for i in range(_df1.shape[0]):
                    _res = pd.concat([_df1.iloc[[i]]] * _df2.shape[0])
                    _res = pd.concat([_res, _df2], axis=1)
                    res = pd.concat([res, _res])
        elif not in1 and how in ["right", "outer"]:
            # key only on the right side: fill the left columns with NaN
            _res = df2.loc[[x]].copy()
            for c in df1.columns:
                _res[c] = np.nan
            res = pd.concat([res, _res.reindex(columns, axis=1)])
        elif not in2 and how in ["left", "outer"]:
            # key only on the left side: fill the right columns with NaN
            _res = df1.loc[[x]].copy()
            for c in df2.columns:
                _res[c] = np.nan
            res = pd.concat([res, _res])
    return res
# df1 and df2 below are the most recently defined frames above
my_res_left = join(df1, df2, how="left", lsuffix="_x", rsuffix="_y")
my_res_right = join(df1, df2, how="right", lsuffix="_x", rsuffix="_y")
my_res_inner = join(df1, df2, how="inner", lsuffix="_x", rsuffix="_y")
my_res_outer = join(df1, df2, how="outer", lsuffix="_x", rsuffix="_y")
pd_res_left = df1.join(df2, lsuffix="_x", rsuffix="_y", how="left")
pd_res_right = df1.join(df2, lsuffix="_x", rsuffix="_y", how="right")
pd_res_inner = df1.join(df2, lsuffix="_x", rsuffix="_y", how="inner")
pd_res_outer = df1.join(df2, lsuffix="_x", rsuffix="_y", how="outer")
from pandas.testing import assert_frame_equal
assert_frame_equal(my_res_left, pd_res_left, check_dtype=False)
assert_frame_equal(my_res_right, pd_res_right, check_dtype=False)
assert_frame_equal(my_res_inner, pd_res_inner, check_dtype=False)
assert_frame_equal(my_res_outer, pd_res_outer, check_dtype=False)
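The cross branch can be checked against merge(how="cross") as well (a sketch; DataFrame.join may not accept how="cross" in every pandas version, and add_suffix matches the suffix logic here only because every column of df1 overlaps with df2):

my_res_cross = join(df1, df2, how="cross", lsuffix="_x", rsuffix="_y")
pd_res_cross = df1.add_suffix("_x").merge(df2.add_suffix("_y"), how="cross")
assert_frame_equal(my_res_cross, pd_res_cross, check_dtype=False)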
3. Conditional Joins
Among the relational joins introduced in this chapter, merge(), join(), and concat() are all equi-joins: each label in the left table's key is matched (via Cartesian product) only with identical labels in the right table's key. Now we want a left key to match a right key whenever some condition holds between them; below is an example of matching by an ordering relation.
Suppose df1 and df2 are constructed as follows:
df1 = pd.DataFrame({"Key":[0,1,1,2], "Col1":[10,20,30,40]})
df1
|   | Key | Col1 |
|---|---|---|
| 0 | 0 | 10 |
| 1 | 1 | 20 |
| 2 | 1 | 30 |
| 3 | 2 | 40 |
df2 = pd.DataFrame({"Key":[1,1,2,3], "Col2":[50,60,70,80]})
df2
|   | Key | Col2 |
|---|---|---|
| 0 | 1 | 50 |
| 1 | 1 | 60 |
| 2 | 2 | 70 |
| 3 | 3 | 80 |
We want a conditional_merge() function that left-joins df1 and df2 with the rule that a left key value must be no less than the right key value; that is, conditional_merge(df1, df2, on="Key", how="left", rule="x>=y") should produce the following result:
|   | Key_x | Col1 | Key_y | Col2 |
|---|---|---|---|---|
| 0 | 0 | 10 | NaN | NaN |
| 1 | 1 | 20 | 1.0 | 50.0 |
| 2 | 1 | 20 | 1.0 | 60.0 |
| 3 | 1 | 30 | 1.0 | 50.0 |
| 4 | 1 | 30 | 1.0 | 60.0 |
| 5 | 2 | 40 | 1.0 | 50.0 |
| 6 | 2 | 40 | 1.0 | 60.0 |
| 7 | 2 | 40 | 2.0 | 70.0 |
1. Implement the conditional_merge() function described above, where the rule parameter can take "x>=y", "x>y", "x==y", "x!=y", "x<=y", and "x<y". Only the left-join version is required, i.e. the how parameter need not be handled.
2. data/ch6/left.csv and data/ch6/right.csv store two tables that we want to join conditionally on longitude and latitude (Longitude and Latitude), with the rule that the spherical distance between the left and right key values must not exceed \(d\) kilometers. Implement this join as spherical_merge(df1, df2, distance=d, on=["Longitude", "Latitude"]). Since neither key contains duplicate values, the join type need not be considered. The spherical distance can be computed with the haversine_distances() function from the sklearn library, which can be installed via conda install scikit-learn and is used as shown below.
from sklearn.metrics.pairwise import haversine_distances
df1 = pd.read_csv("data/ch6/left.csv").head()
df2 = pd.read_csv("data/ch6/right.csv").head()
def get_distance(df1, df2):
    rad1 = np.stack([np.radians(df1.Latitude), np.radians(df1.Longitude)], axis=-1)
    rad2 = np.stack([np.radians(df2.Latitude), np.radians(df2.Longitude)], axis=-1)
    # multiply by Earth's radius (6371000 m) and convert to km
    result = haversine_distances(rad1, rad2) * 6371000 / 1000
    return result
get_distance(df1, df2)  # entry (i, j): spherical distance from df1's point i to df2's point j
array([[ 655.96280506, 1449.73574186, 276.84030423, 802.01737182,
540.95925895],
[1272.59889369, 387.16558823, 1358.01183355, 1582.95966523,
1069.38156885],
[ 669.35082711, 467.07489446, 742.49209123, 1002.65697447,
458.35353128],
[1104.42662432, 146.24538034, 1254.17621209, 1393.16408848,
973.28985989],
[ 388.63154244, 754.15164741, 427.75601403, 735.42275194,
172.40958435]])
[Solution]
Part 1
df1 = pd.DataFrame({"Key":[0,1,1,2], "Col1":[10,20,30,40]})
df2 = pd.DataFrame({"Key":[1,1,2,3], "Col2":[50,60,70,80]})
rule_helper = lambda rule: lambda x, y: eval(rule)
def conditional_merge(df1, df2, on, rule):
grouper = df1.groupby(on)
rule_func = rule_helper(rule)
def merge_helper(_df1):
left_key = _df1[on].iloc[0]
right_key = df2[on]
_df2 = df2[rule_func(left_key, right_key)]
if _df2.shape[0] == 0:
_df1 = _df1.rename(columns={on: on+"_x"})
_df2 = _df2.rename(columns={on: on+"_y"})
return pd.concat([_df1, _df2], axis=1)
else:
return _df1.merge(_df2, how="cross")
return grouper.apply(merge_helper).reset_index(drop=True)
conditional_merge(df1, df2, "Key", "x>=y")
|   | Key_x | Col1 | Key_y | Col2 |
|---|---|---|---|---|
| 0 | 0 | 10 | NaN | NaN |
| 1 | 1 | 20 | 1.0 | 50.0 |
| 2 | 1 | 20 | 1.0 | 60.0 |
| 3 | 1 | 30 | 1.0 | 50.0 |
| 4 | 1 | 30 | 1.0 | 60.0 |
| 5 | 2 | 40 | 1.0 | 50.0 |
| 6 | 2 | 40 | 1.0 | 60.0 |
| 7 | 2 | 40 | 2.0 | 70.0 |
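Since eval() will execute any string it is given, a stricter variant could whitelist the six documented rules with functions from the operator module; a minimal sketch (names hypothetical):

import operator

# map each documented rule string to its comparison function
RULES = {
    "x>=y": operator.ge, "x>y": operator.gt, "x==y": operator.eq,
    "x!=y": operator.ne, "x<=y": operator.le, "x<y": operator.lt,
}
def rule_helper_strict(rule):
    if rule not in RULES:
        raise ValueError("unsupported rule: %r" % rule)
    return RULES[rule]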
Part 2
df1 = pd.read_csv("data/ch6/left.csv")
df2 = pd.read_csv("data/ch6/right.csv")
def spherical_merge(df1, df2, distance=200, on=["Longitude", "Latitude"]):
    if on[0] not in df1.columns or on[0] not in df2.columns:
        raise ValueError("%s not in df1's columns or df2's columns." % on[0])
    if on[1] not in df1.columns or on[1] not in df2.columns:
        raise ValueError("%s not in df1's columns or df2's columns." % on[1])
    # pairwise matrix: row i holds the distances from df1's point i
    # to every point of df2
    dist = get_distance(df1, df2)
    res = pd.concat(
        [
            df1.iloc[[i]].merge(df2.loc[dist[i] <= distance], how="cross")
            for i in range(df1.shape[0])
        ]
    )
    return res.reset_index(drop=True)
res = spherical_merge(df1, df2, 30)
res.head()
|   | Longitude_x | Latitude_x | Col1 | Longitude_y | Latitude_y | Col2 |
|---|---|---|---|---|---|---|
| 0 | 121.102062 | 30.216142 | 0 | 121.243898 | 30.353029 | 7 |
| 1 | 115.885544 | 36.079370 | 0 | 115.947204 | 35.922811 | 2 |
| 2 | 113.791836 | 36.724240 | 9 | 113.709950 | 36.608670 | 4 |
| 3 | 128.633906 | 39.729151 | 1 | 128.536029 | 39.631795 | 2 |
| 4 | 128.633906 | 39.729151 | 1 | 128.636761 | 39.898670 | 5 |
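As a sanity check (a sketch reusing get_distance from above), every matched pair in res should indeed lie within 30 km; the diagonal of the pairwise matrix holds the row-wise distances:

left_pts = res[["Longitude_x", "Latitude_x"]].rename(
    columns={"Longitude_x": "Longitude", "Latitude_x": "Latitude"})
right_pts = res[["Longitude_y", "Latitude_y"]].rename(
    columns={"Longitude_y": "Longitude", "Latitude_y": "Latitude"})
(np.diag(get_distance(left_pts, right_pts)) <= 30).all()
# True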