On AI: Horizontal and Vertical Training in FATE Federated Learning with a Custom Dataset and Custom Neural Network Model


Preface

Most of the code comes from:

  • https://fate.readthedocs.io/en/latest/tutorial/pipeline/nn_tu…
  • https://fate.readthedocs.io/en/latest/tutorial/pipeline/nn_tu…

However, the official documentation is incomplete, so I'm writing down a complete record here myself.

I used the MNIST dataset: https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/examples/data/mnist.zip.
The directory structure is sketched below. For horizontal training, loading mnist twice is enough; for vertical training, one party loads mnist_guest (with labels) and the other loads mnist_host (without labels). The mnist1 and mnist2 folders are unused; ignore them.
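Roughly, the layout looks like this (one subfolder per digit class, as ImageFolder expects; the file names here are illustrative):

mnist/
├── 0/
│   ├── xxx.jpg
│   └── ...
├── 1/
│   └── ...
└── ... (folders 2 through 9)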

The official demo requires Jupyter and isn't suited to an ordinary Python script, so this article gives a plain-script example. For the Python interpreter, note that you must add the interpreter path from bin/init_env.sh in the FATE installation to your environment variables; otherwise the federatedml package won't be found.
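A quick way to confirm the interpreter is set up correctly (a minimal sanity check; the message texts are mine):

import sys
print(sys.executable)  # should point at FATE's bundled Python, not the system one

try:
    import federatedml  # noqa: F401
    print('federatedml found')
except ImportError:
    print('federatedml not found -- source bin/init_env.sh first')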

Horizontal (Homo)

Custom Dataset

Define the custom dataset, then test it locally first.

import os
from torchvision.datasets import ImageFolder
from torchvision import transforms
from federatedml.nn.dataset.base import Dataset

class MNISTDataset(Dataset):
    
    def __init__(self, flatten_feature=False): # flatten feature or not 
        super(MNISTDataset, self).__init__()
        self.image_folder = None
        self.ids = None
        self.flatten_feature = flatten_feature
        
    def load(self, path):  # read data from path, and set sample ids
        # read using ImageFolder
        self.image_folder = ImageFolder(root=path, transform=transforms.Compose([transforms.ToTensor()]))
        # filename as the image id
        ids = []
        for image_name in self.image_folder.imgs:
            ids.append(image_name[0].split('/')[-1].replace('.jpg', ''))
        self.ids = ids
        return self

    def get_sample_ids(self):  # implement the get sample id interface, simply return ids
        return self.ids
    
    def __len__(self,):  # return the length of the dataset
        return len(self.image_folder)
    
    def __getitem__(self, idx): # get item
        ret = self.image_folder[idx]
        if self.flatten_feature:
            img = ret[0][0].flatten() # return flatten tensor 784-dim
            return img, ret[1] # return tensor and label
        else:
            return ret



ds = MNISTDataset(flatten_feature=True)
ds.load('mnist/')
# print(len(ds))
# print(ds[0])
# print(ds.get_sample_ids()[0])

Once that runs and prints successfully, manually create a new dataset file under federatedml/nn/dataset in the FATE installation, expanding the code above into component-class form, as follows:

import torch
from federatedml.nn.dataset.base import Dataset
from torchvision.datasets import ImageFolder
from torchvision import transforms
import numpy as np
# add whatever other imports are missing here

class MNISTDataset(Dataset):
    
    def __init__(self, flatten_feature=False): # flatten feature or not 
        super(MNISTDataset, self).__init__()
        self.image_folder = None
        self.ids = None
        self.flatten_feature = flatten_feature
        
    def load(self, path):  # read data from path, and set sample ids
        # read using ImageFolder
        self.image_folder = ImageFolder(root=path, transform=transforms.Compose([transforms.ToTensor()]))
        # filename as the image id
        ids = []
        for image_name in self.image_folder.imgs:
            ids.append(image_name[0].split('/')[-1].replace('.jpg', ''))
        self.ids = ids
        return self

    def get_sample_ids(self):  # implement the get sample id interface, simply return ids
        return self.ids
    
    def __len__(self,):  # return the length of the dataset
        return len(self.image_folder)
    
    def __getitem__(self, idx): # get item
        ret = self.image_folder[idx]
        if self.flatten_feature:
            img = ret[0][0].flatten() # return flatten tensor 784-dim
            return img, ret[1] # return tensor and label
        else:
            return ret


if __name__ == '__main__':
    pass

This completes what the official documentation calls "manual addition". After adding the file, the federatedml dataset directory should look like the sketch below; the file name must match the dataset_name in the DatasetParam used later.
Once added, FATE "recognizes" our custom dataset.
The local test (commented out in the training script below) does not require this manual-addition step, but local mode is only for testing; it's of little use in production.
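After the addition, the dataset directory should look roughly like this (a sketch; the built-in modules listed are illustrative and vary by FATE version):

python/federatedml/nn/dataset/
├── base.py             # the Dataset base class imported above
├── image.py            # built-in datasets (illustrative)
├── table.py
└── mnist_dataset.py    # our new file; matches dataset_name='mnist_dataset' below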

Horizontal (Homo) Training

import os
from torchvision.datasets import ImageFolder
from torchvision import transforms
from federatedml.nn.dataset.base import Dataset

# test local process
# from federatedml.nn.homo.trainer.fedavg_trainer import FedAVGTrainer
# trainer = FedAVGTrainer(epochs=3, batch_size=256, shuffle=True, data_loader_worker=8, pin_memory=False) # set parameter

# trainer.local_mode() 

# import torch as t
# from pipeline import fate_torch_hook
# fate_torch_hook(t)
# # our simple classification model:
# model = t.nn.Sequential(
#     t.nn.Linear(784, 32),
#     t.nn.ReLU(),
#     t.nn.Linear(32, 10),
#     t.nn.Softmax(dim=1)
# )

# trainer.set_model(model) # set model

# optimizer = t.optim.Adam(model.parameters(), lr=0.01)  # optimizer
# loss = t.nn.CrossEntropyLoss()  # loss function
# trainer.train(train_set=ds, optimizer=optimizer, loss=loss)  # use dataset we just developed



# You must manually add the new dataset file under the federatedml/nn/dataset directory! https://blog.csdn.net/Yonggie/article/details/129404212
# real training
import torch as t
from torch import nn
from pipeline import fate_torch_hook
from pipeline.component import HomoNN
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, Evaluation, DataTransform
from pipeline.interface import Data, Model

t = fate_torch_hook(t)  # hook torch so its layers, losses and optimizers can be used directly in the pipeline

import os
# bind data path to name & namespace
fate_project_path = os.path.abspath('./')
host = 1
guest = 2
arbiter = 3
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host,
                                                                            arbiter=arbiter)

data_0 = {"name": "mnist_guest", "namespace": "experiment"}
data_1 = {"name": "mnist_host", "namespace": "experiment"}

data_path_0 = fate_project_path + '/mnist'
data_path_1 = fate_project_path + '/mnist'
pipeline.bind_table(name=data_0['name'], namespace=data_0['namespace'], path=data_path_0)
pipeline.bind_table(name=data_1['name'], namespace=data_1['namespace'], path=data_path_1)



reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=data_0)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=data_1)



from pipeline.component.nn import DatasetParam

dataset_param = DatasetParam(dataset_name='mnist_dataset', flatten_feature=True)  # dataset_name must match the module file added under federatedml/nn/dataset; remaining kwargs are its __init__ parameters


from pipeline.component.homo_nn import TrainerParam  # Interface

# our simple classification model:
model = t.nn.Sequential(t.nn.Linear(784, 32),
    t.nn.ReLU(),
    t.nn.Linear(32, 10),
    t.nn.Softmax(dim=1)
)

nn_component = HomoNN(name='nn_0',
                      model=model, # model
                      loss=t.nn.CrossEntropyLoss(),  # loss
                      optimizer=t.optim.Adam(model.parameters(), lr=0.01), # optimizer
                      dataset=dataset_param,  # dataset
                      trainer=TrainerParam(trainer_name='fedavg_trainer', epochs=2, batch_size=1024, validation_freqs=1),
                      torch_seed=100 # random seed
                      )


pipeline.add_component(reader_0)
pipeline.add_component(nn_component, data=Data(train_data=reader_0.output.data))
pipeline.add_component(Evaluation(name='eval_0', eval_type='multi'), data=Data(data=nn_component.output.data))


pipeline.compile()
pipeline.fit()


# print result and summary
pipeline.get_component('nn_0').get_output_data()
pipeline.get_component('nn_0').get_summary()

Vertical (Hetero)

This uses mnist_guest and mnist_host; download them:

guest data: https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fa…

host data: https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fa…

Take a look at the dataset format. Vertical federation in FATE always has one party with labels and one without, which differs from the merge-two-datasets scenario I had assumed.
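Before the dataset and training code, it helps to see the shape of a hetero NN: each party runs a bottom model on its own features, an interactive layer merges the two outputs, and a top model on the guest (the labeled side) produces predictions. Below is a simplified single-process sketch with plain tensors; the linear-projections-plus-sum merge is my stand-in for FATE's interactive layer, which does this across parties under encryption:

import torch as t

# bottom models: one per party, each seeing only its own 784 features
guest_bottom = t.nn.Sequential(t.nn.Linear(784, 8), t.nn.ReLU())
host_bottom = t.nn.Sequential(t.nn.Linear(784, 8), t.nn.ReLU())

# stand-in for the interactive layer: project each side into a shared
# 8-dim space and sum
merge_guest = t.nn.Linear(8, 8)
merge_host = t.nn.Linear(8, 8)

# top model lives on the guest, which holds the labels
guest_top = t.nn.Sequential(t.nn.Linear(8, 10), t.nn.Softmax(dim=1))

x_guest = t.rand(4, 784)  # guest's features for 4 aligned samples
x_host = t.rand(4, 784)   # host's features for the same samples

z = merge_guest(guest_bottom(x_guest)) + merge_host(host_bottom(x_host))
pred = guest_top(z)
print(pred.shape)  # torch.Size([4, 10])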

Vertical Dataset

The procedure mirrors the horizontal case, so I only give the code of the new class here; it differs slightly from the horizontal version.

import torch
from federatedml.nn.dataset.base import Dataset
from torchvision.datasets import ImageFolder
from torchvision import transforms
import numpy as np

class MNISTDataset(Dataset):
    
    def __init__(self, return_label=True):  
        super(MNISTDataset, self).__init__() 
        self.return_label = return_label
        self.image_folder = None
        self.ids = None
        
    def load(self, path):  
        
        self.image_folder = ImageFolder(root=path, transform=transforms.Compose([transforms.ToTensor()]))
        ids = []
        for image_name in self.image_folder.imgs:
            ids.append(image_name[0].split('/')[-1].replace('.jpg', ''))
        self.ids = ids

        return self

    def get_sample_ids(self,):
        return self.ids
        
    def get_classes(self,):
        return np.unique(self.image_folder.targets).tolist()
    
    def __len__(self,):  
        return len(self.image_folder)
    
    def __getitem__(self, idx): # get item 
        ret = self.image_folder[idx]
        img = ret[0][0].flatten() # flatten tensor 784 dims
        if self.return_label:
            return img, ret[1] # img & label
        else:
            return img # no label, for host

if __name__ == '__main__':
    pass
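As with the horizontal case, you can sanity-check the class locally before registering it (this assumes the downloaded mnist_guest/ and mnist_host/ directories sit next to the script):

ds_guest = MNISTDataset(return_label=True).load('mnist_guest/')
print(len(ds_guest), ds_guest.get_classes())  # sample count and class list

ds_host = MNISTDataset(return_label=False).load('mnist_host/')
print(ds_host[0].shape)  # torch.Size([784]) -- a flat image, no label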

Vertical (Hetero) Training

The detailed comments are all in the code.

import numpy as np
from federatedml.nn.dataset.base import Dataset
from torchvision.datasets import ImageFolder
from torchvision import transforms



# defined locally for testing
# class MNISTHetero(Dataset):
    
#     def __init__(self, return_label=True):  
#         super(MNISTHetero, self).__init__() 
#         self.return_label = return_label
#         self.image_folder = None
#         self.ids = None
        
#     def load(self, path):  
        
#         self.image_folder = ImageFolder(root=path, transform=transforms.Compose([transforms.ToTensor()]))
#         ids = []
#         for image_name in self.image_folder.imgs:
#             ids.append(image_name[0].split('/')[-1].replace('.jpg', ''))
#         self.ids = ids

#         return self

#     def get_sample_ids(self,):
#         return self.ids
        
#     def get_classes(self,):
#         return np.unique(self.image_folder.targets).tolist()
    
#     def __len__(self,):  
#         return len(self.image_folder)
    
#     def __getitem__(self, idx): # get item 
#         ret = self.image_folder[idx]
#         img = ret[0][0].flatten() # flatten tensor 784 dims
#         if self.return_label:
#             return img, ret[1] # img & label
#         else:
#             return img # no label, for host


# test guest dataset
# ds = MNISTHetero().load('mnist_guest/')
# print(len(ds))
# print(ds[0][0]) 
# print(ds.get_classes())
# print(ds.get_sample_ids()[0: 10])

# test host dataset
# ds = MNISTHetero(return_label=False).load('mnist_host')
# print(len(ds))
# print(ds[0]) # no label


import os
import torch as t
from torch import nn
from pipeline import fate_torch_hook
from pipeline.component import HeteroNN
from pipeline.component.hetero_nn import DatasetParam
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, Evaluation, DataTransform
from pipeline.interface import Data, Model

fate_torch_hook(t)

# bind path to fate name&namespace
fate_project_path = os.path.abspath('./')
guest = 4
host = 3

pipeline_img = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host)

guest_data = {"name": "mnist_guest", "namespace": "experiment"}
host_data = {"name": "mnist_host", "namespace": "experiment"}

guest_data_path = fate_project_path + '/mnist_guest'
host_data_path = fate_project_path + '/mnist_host'
pipeline_img.bind_table(name='mnist_guest', namespace='experiment', path=guest_data_path)
pipeline_img.bind_table(name='mnist_host', namespace='experiment', path=host_data_path)



reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_data)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_data)


# For why it's defined this way, see the docs, which include a model diagram: https://fate.readthedocs.io/en/latest/federatedml_component/hetero_nn/
hetero_nn_0 = HeteroNN(name="hetero_nn_0", epochs=3,
                       interactive_layer_lr=0.01, batch_size=512, task_type='classification', seed=100
                       )
guest_nn_0 = hetero_nn_0.get_party_instance(role='guest', party_id=guest)
host_nn_0 = hetero_nn_0.get_party_instance(role='host', party_id=host)

# define model
# image features 784, guest bottom model
# our simple classification model:
guest_bottom = t.nn.Sequential(t.nn.Linear(784, 8),
    t.nn.ReLU())

# image features 784, host bottom model
host_bottom = t.nn.Sequential(t.nn.Linear(784, 8),
    t.nn.ReLU())

# Top Model, a classifier
guest_top = t.nn.Sequential(nn.Linear(8, 10),
    nn.Softmax(dim=1)
)

# define the interactive layer, which merges the guest and host bottom-model outputs
interactive_layer = t.nn.InteractiveLayer(out_dim=8, guest_dim=8, host_dim=8)

# add models; per the docs, the guest adds two (top and bottom), the host only one (bottom)
guest_nn_0.add_top_model(guest_top)
guest_nn_0.add_bottom_model(guest_bottom)
host_nn_0.add_bottom_model(host_bottom)

# opt, loss
optimizer = t.optim.Adam(lr=0.01)  # with fate_torch_hook, the optimizer can be created without model parameters
loss = t.nn.CrossEntropyLoss()

# use DatasetParam to specify dataset and pass parameters
# note: dataset_name must match the file name you manually added under federatedml/nn/dataset
guest_nn_0.add_dataset(DatasetParam(dataset_name='mnist_hetero', return_label=True))
host_nn_0.add_dataset(DatasetParam(dataset_name='mnist_hetero', return_label=False))

hetero_nn_0.set_interactive_layer(interactive_layer)
hetero_nn_0.compile(optimizer=optimizer, loss=loss)




pipeline_img.add_component(reader_0)
pipeline_img.add_component(hetero_nn_0, data=Data(train_data=reader_0.output.data))
pipeline_img.add_component(Evaluation(name='eval_0', eval_type='multi'), data=Data(data=hetero_nn_0.output.data))
pipeline_img.compile()

pipeline_img.fit()


pipeline_img.get_component('hetero_nn_0').get_output_data()  # get result
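As in the horizontal case, the training summary can be pulled the same way (same component API as used for nn_0 above):

pipeline_img.get_component('hetero_nn_0').get_summary()  # training metrics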