案例引入
某银行 A 与某互联网公司 B 达成了企业级的单干。互联网公司 A 与银行 B 有着一大部分重合的用户,A 有着客户上网行为等特色信息。B 有着客户的存贷状况等特色信息以及客户的标签信息——客户的还贷状况(Y)。B 心愿可能将他所独有的特色信息与 A 所独有的特色信息相结合,训练出一个更弱小的辨认客户信用风险的模型,但因为不同行业之间的行政手续,用户数据隐衷平安等因素,企业 A,B 无奈间接互通数据,联邦学习应运而生。
联邦学习概述
联邦学习的定义
联邦学习旨在建设一个基于散布数据集的联邦学习模型。在模型训练的过程中,模型相干的信息可能在各方之间替换(或者是以加密模式替换),但原始数据不能。这一替换不会裸露每个站点上数据的任何受爱护的隐衷局部。已训练好的联邦学习模型能够置于联邦学习零碎的各参与方,也能够在多方之间共享。
设有 N 位参与方 合作应用各自的训练数据集 来训练机器学习模型。传统的办法是将所有的数据 收集起来并且存储在同一个中央,例如存储在某一台云端数据服务器上,从而在该服务器上应用集中后的数据集训练失去一个机器学习模型 。在传统办法的训练过程中,任何一位参与方会将本人的数据裸露给服务器甚至其余参与方。联邦学习是一种不须要收集各参与方所有的数据便能合作训练一个模型 的机器学习过程。
设 和别离为集中型模型 和联邦型模型 的性能度量。在应用平安的联邦学习在分布式数据源上构建机器学习模型时,咱们容许在爱护用户隐衷的状况下,联邦学习模型的性能略低于集中型模型的性能。
其中 即为容许的性能损失。
联邦学习的分类
依据联邦学习所应用数据在各参与方的不同散布状况,咱们能够将联邦学习划分为三类:横向联邦学习(Horizontal Federated Learning, HFL)、纵向联邦学习(Vertical Federated Learning, VFL)和联邦迁徙学习(Federated Transfer Learning, FTL)。上面是这三种类型联邦学习所针对的不同数据分布状况:
- 横向联邦学习:不同参与方的数据有较大的特色的重叠(横向),但数据样本(纵向),即特色所属的样本的重叠度不高。例如,联邦学习的参与方是两家服务于不同区域市场的银行,他们所服务的客户群体差异较大,但客户的特色可能会因为类似的商业模式而重叠度较高。
- 纵向联邦学习:不同参与方的数据样本有较大的重叠,但样本特色的重叠度不高。例如,两家公司(银行和电子商务公司)向客户提供不同的服务,领有客户不同方面的数据,但他们所服务的客户群体有较大的重叠。
- 联邦迁徙学习:不同参与方的数据在特色和样本维度重叠度都不是十分高。
纵向联邦学习算法
纵向联邦学习算法有利于各企业之间建设单干,应用各自的特有数据,独特建设更加弱小的模型。本篇将着重介绍一种基于加法同态加密的纵向联邦学习算法。
利用情景
细化结尾的案例,企业 B 有特色 X3 和 Y(标签),可独立建模,企业 A 有特色 X1、X2,不足 Y,无奈独立建模,当初企业 A,B 单干,建设联结模型,显然成果会超过企业 B 单边数据建模。
但两方之间如何单干来独特训练一个模型呢?以逻辑回归为例,一个经典的逻辑回归的损失函数和梯度公式如下所示:
能够看到,梯度的计算离不开特色数据(x)和标签数据(y)。因而,一种最间接的数据交互方向就是其中一方将本人独有的数据间接以明文的形式发送给对方,由对方计算出梯度后再返回。但这样的交互方式会产生信息的泄露,其中一方会取得全副的信息,这显然是不符合规范的。
既然明文的传输不行,一种解决思路就是将须要的数据以密文的模式发送,但这又会产生另一个问题,其中一方取得另一方的密文数据后无奈解密,又如何进行计算呢?这时就须要引入同态加密算法。
同态加密算法简介
因为篇幅所限,这里将只介绍同态加密算法的作用,而不介绍其具体细节。
同态加密(Homomorphic Encryption)是一种非凡的加密办法,容许对密文进行解决失去依然是加密的后果,即对密文间接进行解决,跟对明文进行解决后再对处理结果加密,失去的后果雷同。从抽象代数的角度讲,放弃了同态性。
假如存在两个数 x、y,OP(x,y)示意 x 与 y 之间的一种操作运算(加、减、乘、除、指数……)。E(x)示意对 x 的加密操作,D(x)示意对 x 的解密操作,则当某种加密算法对某个操作 OP 满足同态性时,表达式如下:
或
依据算法所能反对的操作运算的范畴和次数的大小,能够将同态加密算法分为局部同态加密算法 (PHE)、些许同态加密算法(SHE) 和全同态加密算法(FHE),其反对的运算范畴与次数顺次扩充。本文之后的纵向联邦学习算法将基于 Paillier 算法实现,它是一种局部同态加密算法,反对加法以及与常数的乘法运算。上面我将基于 Python 的 phe 库演示 Paillier 算法的作用。
#phe 库须要装置
from phe import paillier
#生成公钥与私钥
public_key, private_key = paillier.generate_paillier_keypair()
#须要加密的数据
secret_number_list = [3.141592653, 300, -4.6e-12]
#公钥加密
encrypted_number_list = [public_key.encrypt(x) for x in secret_number_list]
#私钥解密
[private_key.decrypt(x) for x in encrypted_number_list]
反对加减法以及与常数的乘除法
a, b, c = encrypted_number_list
a_plus_5 = a + 5 #= a + 5
print("a + 5 =",private_key.decrypt(a_plus_5))
a_plus_b = a + b #= a + b
print("a + b =",private_key.decrypt(a_plus_b))
a_times_3_5 = a * 3.5 #= a * 3.5
print("a * 3.5 =",private_key.decrypt(a_times_3_5))
a_minus_1 = a - 1 #= a + (-1)
print("a - 1=",private_key.decrypt(a_minus_1))
a_div_minus_3_1 = a / -3.1 #= a * (-1/3.1)
print("a / -3.1 =",private_key.decrypt(a_div_minus_3_1))
a_minus_b = a - b #= a + (b*-1)
print("a - b =",private_key.decrypt(a_minus_b))
若一些函数外部的逻辑是加法或者是与常数的乘法,同样反对。
import numpy as np
enc_mean = np.mean(encrypted_number_list)
enc_dot = np.dot(encrypted_number_list, [2, -400.1, 5318008])
print("enc_mean:", private_key.decrypt(enc_mean))
print("enc_dot:", private_key.decrypt(enc_dot))
算法流程
逻辑回归的损失和梯度的公式中蕴含着指数运算,因而,如果要用 Paillier 算法进行加密,须要对原公式进行肯定的革新,使其仅用加法和乘法来示意。将指数运算革新为加法与乘法运算的一个罕用办法就是用泰勒开展来进行近似。
最终失去的转化后的梯度矩阵的上半局部就是参与方 A 更新其参数须要的梯度(其中蕴含了正则项),下半局部对应 B。咱们的指标是心愿参与方 A、B 可能尽量地进行独自的计算,再通过加密信息的交互取得各自的梯度计算结果,因而咱们须要对计算的工作进行肯定的划分,能够采纳以下的一种设计流程。
在每一轮参数更新中,各参与方须要按序进行如下的计算和交互:
- 参与方 A 和 B 各自初始化本人的参数,参与方 C 生成秘钥对并散发公钥给 A 和 B。
- 参与方 A 计算,应用公钥加密后发送给 B。参与方 B 计算,应用公钥加密后发送给 A。
- 此时 A 和 B 能各自计算 以及 ([[x]] 示意 x 的同态加密模式)。
- A 和 B 须要加密的梯度发送给 C 来进行解密,但为了防止 C 间接取得梯度信息,A 和 B 能够将梯度加上一个随机数 与再发送给 C。C 取得加密梯度进行后进行解密再返还 A 和 B。
- A 和 B 只须要再减去之间加的随机数就能取得实在的梯度,更新其参数。
代码实现
上面咱们将基于 Python 代码来实现这整个算法流程。为了更清晰地展示算法的流程,将极度简化交互流程的实现。
导入所需模块
import math
import numpy as np
from phe import paillier
import pandas as pd
from sklearn import datasets
from sklearn.datasets import load_diabetes
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
各参与方的定义
设置参与方的父类,各参与方都须要保留模型的参数、一些两头计算结果以及与其余参与方的连贯情况。
class Client:
def __init__(self, config):
## 模型参数
self.config = config
## 两头计算结果
self.data = {}
## 与其余节点的连贯情况
self.other_client = {}
## 与其余参与方建设连贯
def connect(self, client_name, target_client):
self.other_client[client_name] = target_client
## 向特定参与方发送数据
def send_data(self, data, target_client):
target_client.data.update(data)
参与方 A 在训练过程中仅提供特色数据。
class ClientA(Client):
def __init__(self, X, config):
super().__init__(config)
self.X = X
self.weights = np.zeros(X.shape[1])
def compute_z_a(self):
z_a = np.dot(self.X, self.weights)
return z_a
## 加密梯度的计算,对应 step4
def compute_encrypted_dJ_a(self, encrypted_u):
encrypted_dJ_a = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights
return encrypted_dJ_a
## 参数的更新
def update_weight(self, dJ_a):
self.weights = self.weights - self.config["lr"] * dJ_a / len(self.X)
return
## A: step2
def task_1(self, client_B_name):
dt = self.data
assert "public_key" in dt.keys(), "Error:'public_key'from C in step 1 not successfully received."
public_key = dt['public_key']
z_a = self.compute_z_a()
u_a = 0.25 * z_a
z_a_square = z_a ** 2
encrypted_u_a = np.asarray([public_key.encrypt(x) for x in u_a])
encrypted_z_a_square = np.asarray([public_key.encrypt(x) for x in z_a_square])
dt.update({"encrypted_u_a": encrypted_u_a})
data_to_B = {"encrypted_u_a": encrypted_u_a, "encrypted_z_a_square": encrypted_z_a_square}
self.send_data(data_to_B, self.other_client[client_B_name])
## A: step3、4
def task_2(self, client_C_name):
dt = self.data
assert "encrypted_u_b" in dt.keys(), "Error:'encrypted_u_b'from B in step 1 not successfully received."
encrypted_u_b = dt['encrypted_u_b']
encrypted_u = encrypted_u_b + dt['encrypted_u_a']
encrypted_dJ_a = self.compute_encrypted_dJ_a(encrypted_u)
mask = np.random.rand(len(encrypted_dJ_a))
encrypted_masked_dJ_a = encrypted_dJ_a + mask
dt.update({"mask": mask})
data_to_C = {'encrypted_masked_dJ_a': encrypted_masked_dJ_a}
self.send_data(data_to_C, self.other_client[client_C_name])
## A: step6
def task_3(self):
dt = self.data
assert "masked_dJ_a" in dt.keys(), "Error:'masked_dJ_a'from C in step 2 not successfully received."
masked_dJ_a = dt['masked_dJ_a']
dJ_a = masked_dJ_a - dt['mask']
self.update_weight(dJ_a)
print(f"A weight: {self.weights}")
return
参与方 B 在训练过程中既提供特色数据,又提供标签数据。
class ClientB(Client):
def __init__(self, X, y, config):
super().__init__(config)
self.X = X
self.y = y
self.weights = np.zeros(X.shape[1])
self.data = {}
def compute_u_b(self):
z_b = np.dot(self.X, self.weights)
u_b = 0.25 * z_b - self.y + 0.5
return z_b, u_b
def compute_encrypted_dJ_b(self, encrypted_u):
encrypted_dJ_b = self.X.T.dot(encrypted_u) + self.config['lambda'] * self.weights
return encrypted_dJ_b
def update_weight(self, dJ_b):
self.weights = self.weights - self.config["lr"] * dJ_b / len(self.X)
## B: step2
def task_1(self, client_A_name):
try:
dt = self.data
assert "public_key" in dt.keys(), "Error:'public_key'from C in step 1 not successfully received."
public_key = dt['public_key']
except Exception as e:
print("B step 1 exception: %s" % e)
try:
z_b, u_b = self.compute_u_b()
encrypted_u_b = np.asarray([public_key.encrypt(x) for x in u_b])
dt.update({"encrypted_u_b": encrypted_u_b})
dt.update({"z_b": z_b})
except Exception as e:
print("Wrong 1 in B: %s" % e)
data_to_A= {"encrypted_u_b": encrypted_u_b}
self.send_data(data_to_A, self.other_client[client_A_name])
## B: step3、4
def task_2(self,client_C_name):
try:
dt = self.data
assert "encrypted_u_a" in dt.keys(), "Error:'encrypt_u_a'from A in step 1 not successfully received."
encrypted_u_a = dt['encrypted_u_a']
encrypted_u = encrypted_u_a + dt['encrypted_u_b']
encrypted_dJ_b = self.compute_encrypted_dJ_b(encrypted_u)
mask = np.random.rand(len(encrypted_dJ_b))
encrypted_masked_dJ_b = encrypted_dJ_b + mask
dt.update({"mask": mask})
except Exception as e:
print("B step 2 exception: %s" % e)
try:
assert "encrypted_z_a_square" in dt.keys(), "Error:'encrypted_z_a_square'from A in step 1 not successfully received."
encrypted_z = 4*encrypted_u_a + dt['z_b']
encrypted_loss = np.sum((0.5-self.y)*encrypted_z + 0.125*dt["encrypted_z_a_square"] + 0.125*dt["z_b"] * (encrypted_z+4*encrypted_u_a))
except Exception as e:
print("B step 2 exception: %s" % e)
data_to_C = {"encrypted_masked_dJ_b": encrypted_masked_dJ_b, "encrypted_loss": encrypted_loss}
self.send_data(data_to_C, self.other_client[client_C_name])
## B: step6
def task_3(self):
try:
dt = self.data
assert "masked_dJ_b" in dt.keys(), "Error:'masked_dJ_b'from C in step 2 not successfully received."
masked_dJ_b = dt['masked_dJ_b']
dJ_b = masked_dJ_b - dt['mask']
self.update_weight(dJ_b)
except Exception as e:
print("A step 3 exception: %s" % e)
print(f"B weight: {self.weights}")
return
参与方 C 在整个训练过程中次要的作用就是散发秘钥,以及最初的对 A 和 B 加密梯度的解密。
class ClientC(Client):
"""Client C as trusted dealer."""
def __init__(self, A_d_shape, B_d_shape, config):
super().__init__(config)
self.A_data_shape = A_d_shape
self.B_data_shape = B_d_shape
self.public_key = None
self.private_key = None
## 保留训练中的损失值(泰开展近似)self.loss = []
## C: step1
def task_1(self, client_A_name, client_B_name):
try:
public_key, private_key = paillier.generate_paillier_keypair()
self.public_key = public_key
self.private_key = private_key
except Exception as e:
print("C step 1 error 1: %s" % e)
data_to_AB = {"public_key": public_key}
self.send_data(data_to_AB, self.other_client[client_A_name])
self.send_data(data_to_AB, self.other_client[client_B_name])
return
## C: step5
def task_2(self, client_A_name, client_B_name):
try:
dt = self.data
assert "encrypted_masked_dJ_a" in dt.keys() and "encrypted_masked_dJ_b" in dt.keys(), "Error:'masked_dJ_a'from A or'masked_dJ_b'from B in step 2 not successfully received."
encrypted_masked_dJ_a = dt['encrypted_masked_dJ_a']
encrypted_masked_dJ_b = dt['encrypted_masked_dJ_b']
masked_dJ_a = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_a])
masked_dJ_b = np.asarray([self.private_key.decrypt(x) for x in encrypted_masked_dJ_b])
except Exception as e:
print("C step 2 exception: %s" % e)
try:
assert "encrypted_loss" in dt.keys(), "Error:'encrypted_loss'from B in step 2 not successfully received."
encrypted_loss = dt['encrypted_loss']
loss = self.private_key.decrypt(encrypted_loss) / self.A_data_shape[0] + math.log(2)
print("******loss:", loss, "******")
self.loss.append(loss)
except Exception as e:
print("C step 2 exception: %s" % e)
data_to_A = {"masked_dJ_a": masked_dJ_a}
data_to_B = {"masked_dJ_b": masked_dJ_b}
self.send_data(data_to_A, self.other_client[client_A_name])
self.send_data(data_to_B, self.other_client[client_B_name])
return
模仿数据的生成
这里将基于 sklearn 中的乳腺癌数据集生成一组模仿数据,参与方 A 取得局部特色数据,参与方 B 取得局部特色数据与标签数据。
def load_data():
# 加载数据
breast = load_breast_cancer()
# 数据拆分
X_train, X_test, y_train, y_test = train_test_split(breast.data, breast.target, random_state=1)
# 数据标准化
std = StandardScaler()
X_train = std.fit_transform(X_train)
X_test = std.transform(X_test)
return X_train, y_train, X_test, y_test
## 将特色调配给 A 和 B
def vertically_partition_data(X, X_test, A_idx, B_idx):
"""
Vertically partition feature for party A and B
:param X: train feature
:param X_test: test feature
:param A_idx: feature index of party A
:param B_idx: feature index of party B
:return: train data for A, B; test data for A, B
"""
XA = X[:, A_idx]
XB = X[:, B_idx]
XB = np.c_[np.ones(X.shape[0]), XB]
XA_test = X_test[:, A_idx]
XB_test = X_test[:, B_idx]
XB_test = np.c_[np.ones(XB_test.shape[0]), XB_test]
return XA, XB, XA_test, XB_test
训练流程的实现
def vertical_logistic_regression(X, y, X_test, y_test, config):
"""
Start the processes of the three clients: A, B and C.
:param X: features of the training dataset
:param y: labels of the training dataset
:param X_test: features of the test dataset
:param y_test: labels of the test dataset
:param config: the config dict
:return: True
"""
## 获取数据
XA, XB, XA_test, XB_test = vertically_partition_data(X, X_test, config['A_idx'], config['B_idx'])
print('XA:',XA.shape, 'XB:',XB.shape)
## 各参与方的初始化
client_A = ClientA(XA, config)
print("Client_A successfully initialized.")
client_B = ClientB(XB, y, config)
print("Client_B successfully initialized.")
client_C = ClientC(XA.shape, XB.shape, config)
print("Client_C successfully initialized.")
## 各参与方之间连贯的建设
client_A.connect("B", client_B)
client_A.connect("C", client_C)
client_B.connect("A", client_A)
client_B.connect("C", client_C)
client_C.connect("A", client_A)
client_C.connect("B", client_B)
## 训练
for i in range(config['n_iter']):
client_C.task_1("A", "B")
client_A.task_1("B")
client_B.task_1("A")
client_A.task_2("C")
client_B.task_2("C")
client_C.task_2("A", "B")
client_A.task_3()
client_B.task_3()
print("All process done.")
return True
config = {
'n_iter': 100,
'lambda': 10,
'lr': 0.05,
'A_idx': [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29],
'B_idx': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
}
X, y, X_test, y_test = load_data()
vertical_logistic_regression(X, y, X_test, y_test, config)
训练成果
为测试该纵向联邦学习算法的训练成果。能够设置一般的集中式训练的逻辑回归算法作为对照组,基于乳腺癌数据集,应用雷同的训练集数据及雷同的逻辑回归模型来进行训练,察看其损失值的降落曲线以及在雷同测试集上的预测准确率。
以下是两种状况下,训练的损失值的降落状况:
各曲线代表的情景:
Logistic: 一般逻辑回归的损失值变动曲线,应用的是失常的损失函数
Taylor_Logistic: 一般逻辑回归的损失值变动曲线,应用的是泰勒开展拟合的损失函数
Taylor_Taylor:纵向逻辑回归的损失值变动曲线,应用的是泰勒开展拟合的损失函数
以下是在 sklearn 中不同数据集上,一般逻辑回归与纵向逻辑回归的训练后果的正确率及 AUC 的差别,其中 rows 代表样本数量,feat 代表特色数量,logistic 代表集中式逻辑回归的训练后果,Vertical 代表纵向联邦学习算法的训练成果。
由训练后果的比拟能够看到,与一般的逻辑回归相比,该纵向逻辑回归算法在保障各方数据隐衷性的同时,在试验数据集上可能达到不错的训练成果。
参考文献
[1] Yang Q , Liu Y , Chen T , et al. Federated Machine Learning: Concept and Applications[J]. ACM Transactions on Intelligent Systems and Technology, 2019, 10(2):1-19.
[2] Hardy S , Henecka W , Ivey-Law H , et al. Private federated learning on vertically partitioned data via entity resolution and additively homomorphic encryption[J]. 2017.
[3] https://zhuanlan.zhihu.com/p/94105330