关于人工智能:去中心化联邦学习Python实现的2个案例

4次阅读

共计 8755 个字符,预计需要花费 22 分钟才能阅读完成。

@TOC

介绍

随着人工智能技术的倒退,越来越多的数据被收集和应用。然而,这些数据通常扩散在不同的设施上,例如挪动设施、传感器等。联邦学习(Federated Learning)是一种解决这个问题的办法,它容许在不将数据传输到地方服务器的状况下进行模型训练。

然而,联邦学习也存在一些问题,例如数据隐衷爱护、通信效率等,并且有些利用场景须要更高的安全性和去中心化的个性。因而,呈现了去中心化的联邦学习(Decentralized Federated Learning)。

本文将简要介绍去中心化联邦学习的原理和实现形式,并提供 2 个 Python 代码案例。

原理

去中心化联邦学习的次要思维是将一个大的联邦学习零碎分解成多个小的联邦学习零碎,每个小零碎只蕴含一部分设施或节点。这样,每个设施或节点只须要与四周的几个设施或节点通信,就能够实现模型的训练和更新,从而保障了安全性和通信效率。

具体地,去中心化联邦学习能够分为两个阶段:选举和训练。在选举阶段,每个设施或节点都会向四周的几个设施或节点发送选票,选出一个首领设施或节点,作为本次训练的核心节点。在训练阶段,首领节点将模型参数播送给四周的设施或节点,这些设施或节点依据本地数据计算梯度并返回给首领节点,首领节点依据所有梯度更新模型参数。这个过程会循环迭代,直到模型收敛为止。


实现案例 1

上面咱们将演示如何应用 Python 实现一个简略的去中心化联邦学习零碎,该零碎包含三个设施或节点。每个设施或节点都有本人的数据集,咱们将应用 MNIST 手写数字数据集作为例子。

首先,咱们须要导入必要的库和模块:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
import random
import copy

而后,咱们定义一个设施或节点类,其中蕴含数据集、模型、优化器等信息:

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
            nn.LogSoftmax(dim=1)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

接下来,咱们定义一个简略的选举算法,选出一个首领设施或节点:

def elect_leader(devices):
    leaders = []
    for device in devices:
        if random.random() < 0.5:
            leaders.append(device)
    if len(leaders) == 0:
        return None
    else:
        return max(leaders, key=lambda x: x.id)

而后,咱们定义一个用于计算梯度的函数:

def calculate_gradients(device, model):
    loss_fn = nn.NLLLoss()
    data_loader = DataLoader(device.train_data, batch_size=32, shuffle=True)
    device_gradients = []
    for inputs, labels in data_loader:
        outputs = model(inputs.view(-1, 784))
        loss = loss_fn(outputs, labels)
        device.optimizer.zero_grad()
        loss.backward()
        device_gradients.append(copy.deepcopy(device.model.state_dict()))
    return device_gradients

最初,咱们定义一个训练函数,用于每个节点或设施的本地训练和参数更新:

def train_device(device, leader, num_rounds):
    for i in range(num_rounds):
        if leader is None or leader.id == device.id:
            leader = elect_leader(devices)
        device.model.load_state_dict(leader.model.state_dict())
        gradients = calculate_gradients(device, device.model)
        if leader.id != device.id:
            leader_gradients = leader.receive_gradients(gradients)
            device.model.load_state_dict(leader.model.state_dict())
        else:
            leader_gradients = gradients
        average_gradients = {}
        for key in leader_gradients[0].keys():
            average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0)
        device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()})
    return device

在整个零碎中,每个节点或设施之间都须要通信,因而咱们还须要为设施类增加一个接管梯度的办法:

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(nn.Linear(784, 128),
            nn.ReLU(),
            nn.Linear(128, 10),
            nn.LogSoftmax(dim=1)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

    def receive_gradients(self, gradients):
        return gradients

当初,咱们能够创立三个设施或节点,并应用 MNIST 数据集进行训练:

train_set = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())
test_set = datasets.MNIST(root='./data', train=False, download=True, transform=transforms.ToTensor())
train_size = len(train_set) // 3
test_size = len(test_set) // 3
device1 = Device(1, train_set[:train_size], test_set[:test_size])
device2 = Device(2, train_set[train_size:2*train_size], test_set[test_size:2*test_size])
device3 = Device(3, train_set[2*train_size:], test_set[2*test_size:])
devices = [device1, device2, device3]

for device in devices:
    train_device(device, None, 10)

test_loader = DataLoader(test_set, batch_size=32, shuffle=True)
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = device1.model(inputs.view(-1, 784))
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the test set: %d %%' % (100 * correct / total))

这个代码会创立三个设施或节点,并将 MNIST 数据集分成三份调配给它们。而后,每个设施或节点将执行 10 轮的本地训练和参数更新,最初计算所有设施或节点的均匀准确率。

留神,在这个实现中,咱们应用了简略的选举算法和梯度均匀算法,理论中可能须要更简单和更平安的算法,以保证系统的可靠性和安全性。


实现案例 2

CIFAR-10 数据集进行去中心化联邦学习训练

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

class Device:
    def __init__(self, device_id, train_data, test_data):
        self.id = device_id
        self.train_data = train_data
        self.test_data = test_data
        self.model = nn.Sequential(nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10)
        )
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)

    def receive_gradients(self, gradients):
        return gradients

def calculate_gradients(device, model):
    device.model.load_state_dict(model.state_dict())
    device.optimizer.zero_grad()
    for inputs, labels in device.train_data:
        outputs = device.model(inputs)
        loss = nn.CrossEntropyLoss()(outputs, labels)
        loss.backward()
    gradients = copy.deepcopy(device.model.state_dict())
    device.optimizer.step()
    return gradients

def elect_leader(devices):
    return devices[torch.randint(0, len(devices), (1,)).item()]

def train_device(device, leader, num_rounds):
    for i in range(num_rounds):
        if leader is None or leader.id == device.id:
            leader = elect_leader(devices)
        device.model.load_state_dict(leader.model.state_dict())
        gradients = calculate_gradients(device, device.model)
        if leader.id != device.id:
            leader_gradients = leader.receive_gradients(gradients)
            device.model.load_state_dict(leader.model.state_dict())
        else:
            leader_gradients = gradients
        average_gradients = {}
        for key in leader_gradients.keys():
            average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0)
        device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()})
    return device

class CIFAR10Dataset(Dataset):
    def __init__(self, data, labels):
        super().__init__()
        self.data = data
        self.labels = labels
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        img, label = self.data[index], self.labels[index]
        img = transforms.ToTensor()(img)
        return img, label

if __name__ == "__main__":
    transform = transforms.Compose([transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
    test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

    train_size = len(train_set) // 3
    test_size = len(test_set) // 3

    device1 = Device(1, DataLoader(CIFAR10Dataset(train_set.data[:train_size], train_set.targets[:train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[:test_size], test_set.targets[:test_size]), batch_size=16))
    device2 = Device(2, DataLoader(CIFAR10Dataset(train_set.data[train_size:2*train_size], train_set.targets[train_size:2*train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[test_size:2*test_size], test_set.targets[test_size:2*test_size]), batch_size=16))
    device3 = Device(3, DataLoader(CIFAR10Dataset(train_set.data[2*train_size:], train_set.targets[2*train_size:]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[2*test_size:], test_set.targets[2*test_size:]), batch_size=16))

    devices = [device1, device2, device3]

    for device in devices:
        train_device(device, None, 10)
    
    # 测试模型并输入评估后果
    for device in devices:
        device.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for data in device.test_data:
                images, labels = data
                outputs = device.model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print(f"Device {device.id}: Accuracy on test set: {100 * correct / total}%")
    
    # 汇总所有设施的评估后果并输入均匀准确率
    total_correct = sum(d.correct for d in devices)
    total_samples = sum(d.total for d in devices)
    print(f"Overall Accuracy on test set: {100 * total_correct / total_samples}%")

    # 将模型保留到文件中
    torch.save(devices[0].model.state_dict(), 'mnist_model.pt')

以上代码实现了一个应用 CIFAR-10 数据集进行去中心化联邦学习的案例。具体步骤如下:

定义了一个 Device 类,用于示意参加训练的设施,在初始化时须要指定设施 id、训练数据和测试数据,并创立了一个蕴含卷积层、全连贯层和激活函数的神经网络模型以及优化器。

实现了一个 calculate_gradients 函数,用于通过在本地设施上训练模型并计算梯度。

实现了一个 elect_leader 函数,用于从参加训练的设施中随机抉择一个设施作为 leader。

实现了一个 train_device 函数,用于训练指定设施的模型,并依据 leader 的抉择来执行去中心化联邦学习的过程,最终返回训练后的设施对象。

定义了一个 CIFAR10Dataset 类,用于将 CIFAR-10 数据集转换为 PyTorch Dataset 对象。

在 main 函数中,定义了三个设施对象,并应用 CIFAR10Dataset 将数据集划分成三份别离调配给这三个设施。而后,针对每个设施,调用 train_device 函数进行训练,最终失去训练好的模型。

须要留神的是,因为 CIFAR-10 数据集较大,上述代码仅应用了数据集中的 1 / 3 作为样本进行训练和测试。如果须要应用残缺的数据集进行训练,须要思考分布式训练等技术来提高效率。

该局部定义了一个 transforms.Compose 对象,其中蕴含两个转换操作:

transforms.ToTensor() 将输出的 PIL 图像或 ndarray 转换为张量,并进行标准化到范畴 [0, 1]。

transforms.Normalize(mean, std) 用于对张量进行标准化。在这里,咱们将每个通道的平均值设置为 0.5,标准差设置为 0.5。

这些操作的目标是确保模型可能承受雷同模式的输出数据并对其进行正确的预测。在这里,咱们应用标准化来使得数据的取值范畴更加稳固,并且有助于进步模型的训练成果。

  • 定义数据预处理的转换。
  • 加载 MNIST 数据集。
  • 将 MNIST 数据集调配给每个设施。
  • 在每个设施上进行本地训练。
  • 测试模型并输入评估后果。
  • 汇总所有设施的评估后果并输入均匀准确率。
  • 将模型保留到文件中。

须要留神的是,在这里咱们假如所有设施都具备雷同的计算能力和存储容量,并且在每个设施上训练的数据集大小雷同。在理论状况下,不同设施的硬件配置和网络速度可能不同,因而须要依据理论状况对训练数据进行划分,以保障训练成果和工夫开销的均衡。


论断

去中心化联邦学习是一种解决数据隐衷爱护和通信效率等问题的无效办法。在本文中,咱们介绍了去中心化联邦学习的原理和实现形式,并提供了 2 个 Python 代码案例,演示了如何应用数据集训练简略的联邦学习零碎。

本文由 mdnice 多平台公布

正文完
 0