@TOC
介绍
随着人工智能技术的倒退,越来越多的数据被收集和应用。然而,这些数据通常扩散在不同的设施上,例如挪动设施、传感器等。联邦学习(Federated Learning)是一种解决这个问题的办法,它容许在不将数据传输到地方服务器的状况下进行模型训练。
然而,联邦学习也存在一些问题,例如数据隐衷爱护、通信效率等,并且有些利用场景须要更高的安全性和去中心化的个性。因而,呈现了去中心化的联邦学习(Decentralized Federated Learning)。
本文将简要介绍去中心化联邦学习的原理和实现形式,并提供2个Python代码案例。
原理
去中心化联邦学习的次要思维是将一个大的联邦学习零碎分解成多个小的联邦学习零碎,每个小零碎只蕴含一部分设施或节点。这样,每个设施或节点只须要与四周的几个设施或节点通信,就能够实现模型的训练和更新,从而保障了安全性和通信效率。
具体地,去中心化联邦学习能够分为两个阶段:选举和训练。在选举阶段,每个设施或节点都会向四周的几个设施或节点发送选票,选出一个首领设施或节点,作为本次训练的核心节点。在训练阶段,首领节点将模型参数播送给四周的设施或节点,这些设施或节点依据本地数据计算梯度并返回给首领节点,首领节点依据所有梯度更新模型参数。这个过程会循环迭代,直到模型收敛为止。
实现案例1
上面咱们将演示如何应用Python实现一个简略的去中心化联邦学习零碎,该零碎包含三个设施或节点。每个设施或节点都有本人的数据集,咱们将应用MNIST手写数字数据集作为例子。
首先,咱们须要导入必要的库和模块:
import torchimport torch.nn as nnimport torch.optim as optimimport torchvision.datasets as datasetsimport torchvision.transforms as transformsfrom torch.utils.data import DataLoader, Datasetimport numpy as npimport randomimport copy
而后,咱们定义一个设施或节点类,其中蕴含数据集、模型、优化器等信息:
class Device: def __init__(self, device_id, train_data, test_data): self.id = device_id self.train_data = train_data self.test_data = test_data self.model = nn.Sequential( nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10), nn.LogSoftmax(dim=1) ) self.optimizer = optim.SGD(self.model.parameters(), lr=0.01)
接下来,咱们定义一个简略的选举算法,选出一个首领设施或节点:
def elect_leader(devices): leaders = [] for device in devices: if random.random() < 0.5: leaders.append(device) if len(leaders) == 0: return None else: return max(leaders, key=lambda x: x.id)
而后,咱们定义一个用于计算梯度的函数:
def calculate_gradients(device, model): loss_fn = nn.NLLLoss() data_loader = DataLoader(device.train_data, batch_size=32, shuffle=True) device_gradients = [] for inputs, labels in data_loader: outputs = model(inputs.view(-1, 784)) loss = loss_fn(outputs, labels) device.optimizer.zero_grad() loss.backward() device_gradients.append(copy.deepcopy(device.model.state_dict())) return device_gradients
最初,咱们定义一个训练函数,用于每个节点或设施的本地训练和参数更新:
def train_device(device, leader, num_rounds): for i in range(num_rounds): if leader is None or leader.id == device.id: leader = elect_leader(devices) device.model.load_state_dict(leader.model.state_dict()) gradients = calculate_gradients(device, device.model) if leader.id != device.id: leader_gradients = leader.receive_gradients(gradients) device.model.load_state_dict(leader.model.state_dict()) else: leader_gradients = gradients average_gradients = {} for key in leader_gradients[0].keys(): average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0) device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()}) return device
在整个零碎中,每个节点或设施之间都须要通信,因而咱们还须要为设施类增加一个接管梯度的办法:
class Device: def __init__(self, device_id, train_data, test_data): self.id = device_id self.train_data = train_data self.test_data = test_data self.model = nn.Sequential( nn.Linear(784, 128), nn.ReLU(), nn.Linear(128, 10), nn.LogSoftmax(dim=1) ) self.optimizer = optim.SGD(self.model.parameters(), lr=0.01) def receive_gradients(self, gradients): return gradients
当初,咱们能够创立三个设施或节点,并应用MNIST数据集进行训练:
train_set = datasets.MNIST(root='./data', train=True, download=True, transform=transforms.ToTensor())test_set = datasets.MNIST(root='./data', train=False, download=True, transform=transforms.ToTensor())train_size = len(train_set) // 3test_size = len(test_set) // 3device1 = Device(1, train_set[:train_size], test_set[:test_size])device2 = Device(2, train_set[train_size:2*train_size], test_set[test_size:2*test_size])device3 = Device(3, train_set[2*train_size:], test_set[2*test_size:])devices = [device1, device2, device3]for device in devices: train_device(device, None, 10)test_loader = DataLoader(test_set, batch_size=32, shuffle=True)correct = 0total = 0with torch.no_grad(): for inputs, labels in test_loader: outputs = device1.model(inputs.view(-1, 784)) _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item()print('Accuracy of the network on the test set: %d %%' % (100 * correct / total))
这个代码会创立三个设施或节点,并将MNIST数据集分成三份调配给它们。而后,每个设施或节点将执行10轮的本地训练和参数更新,最初计算所有设施或节点的均匀准确率。
留神,在这个实现中,咱们应用了简略的选举算法和梯度均匀算法,理论中可能须要更简单和更平安的算法,以保证系统的可靠性和安全性。
实现案例2
CIFAR-10数据集进行去中心化联邦学习训练
import torchimport torch.nn as nnimport torch.optim as optimimport torchvisionimport torchvision.transforms as transformsfrom torch.utils.data import Dataset, DataLoaderclass Device: def __init__(self, device_id, train_data, test_data): self.id = device_id self.train_data = train_data self.test_data = test_data self.model = nn.Sequential( nn.Conv2d(3, 6, 5), nn.ReLU(), nn.MaxPool2d(2, 2), nn.Conv2d(6, 16, 5), nn.ReLU(), nn.MaxPool2d(2, 2), nn.Flatten(), nn.Linear(16 * 5 * 5, 120), nn.ReLU(), nn.Linear(120, 84), nn.ReLU(), nn.Linear(84, 10) ) self.optimizer = optim.SGD(self.model.parameters(), lr=0.01) def receive_gradients(self, gradients): return gradientsdef calculate_gradients(device, model): device.model.load_state_dict(model.state_dict()) device.optimizer.zero_grad() for inputs, labels in device.train_data: outputs = device.model(inputs) loss = nn.CrossEntropyLoss()(outputs, labels) loss.backward() gradients = copy.deepcopy(device.model.state_dict()) device.optimizer.step() return gradientsdef elect_leader(devices): return devices[torch.randint(0, len(devices), (1,)).item()]def train_device(device, leader, num_rounds): for i in range(num_rounds): if leader is None or leader.id == device.id: leader = elect_leader(devices) device.model.load_state_dict(leader.model.state_dict()) gradients = calculate_gradients(device, device.model) if leader.id != device.id: leader_gradients = leader.receive_gradients(gradients) device.model.load_state_dict(leader.model.state_dict()) else: leader_gradients = gradients average_gradients = {} for key in leader_gradients.keys(): average_gradients[key] = torch.mean(torch.stack([gradients[key] for gradients in leader_gradients]), dim=0) device.model.load_state_dict({key: value - 0.01 * average_gradients[key] for key, value in device.model.state_dict().items()}) return deviceclass CIFAR10Dataset(Dataset): def __init__(self, data, labels): super().__init__() self.data = data self.labels = labels def __len__(self): return len(self.data) def __getitem__(self, index): img, label = self.data[index], self.labels[index] img = transforms.ToTensor()(img) return img, labelif __name__ == "__main__": transform = transforms.Compose([ transforms.RandomHorizontalFlip(), transforms.RandomCrop(32, padding=4), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), ]) train_set = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform) test_set = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform) train_size = len(train_set) // 3 test_size = len(test_set) // 3 device1 = Device(1, DataLoader(CIFAR10Dataset(train_set.data[:train_size], train_set.targets[:train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[:test_size], test_set.targets[:test_size]), batch_size=16)) device2 = Device(2, DataLoader(CIFAR10Dataset(train_set.data[train_size:2*train_size], train_set.targets[train_size:2*train_size]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[test_size:2*test_size], test_set.targets[test_size:2*test_size]), batch_size=16)) device3 = Device(3, DataLoader(CIFAR10Dataset(train_set.data[2*train_size:], train_set.targets[2*train_size:]), batch_size=16), DataLoader(CIFAR10Dataset(test_set.data[2*test_size:], test_set.targets[2*test_size:]), batch_size=16)) devices = [device1, device2, device3] for device in devices: train_device(device, None, 10) # 测试模型并输入评估后果 for device in devices: device.model.eval() correct = 0 total = 0 with torch.no_grad(): for data in device.test_data: images, labels = data outputs = device.model(images) _, predicted = torch.max(outputs.data, 1) total += labels.size(0) correct += (predicted == labels).sum().item() print(f"Device {device.id}: Accuracy on test set: {100 * correct / total}%") # 汇总所有设施的评估后果并输入均匀准确率 total_correct = sum(d.correct for d in devices) total_samples = sum(d.total for d in devices) print(f"Overall Accuracy on test set: {100 * total_correct / total_samples}%") # 将模型保留到文件中 torch.save(devices[0].model.state_dict(), 'mnist_model.pt')
以上代码实现了一个应用CIFAR-10数据集进行去中心化联邦学习的案例。具体步骤如下:
定义了一个Device类,用于示意参加训练的设施,在初始化时须要指定设施id、训练数据和测试数据,并创立了一个蕴含卷积层、全连贯层和激活函数的神经网络模型以及优化器。
实现了一个calculate_gradients函数,用于通过在本地设施上训练模型并计算梯度。
实现了一个elect_leader函数,用于从参加训练的设施中随机抉择一个设施作为leader。
实现了一个train_device函数,用于训练指定设施的模型,并依据leader的抉择来执行去中心化联邦学习的过程,最终返回训练后的设施对象。
定义了一个CIFAR10Dataset类,用于将CIFAR-10数据集转换为PyTorch Dataset对象。
在main函数中,定义了三个设施对象,并应用CIFAR10Dataset将数据集划分成三份别离调配给这三个设施。而后,针对每个设施,调用train_device函数进行训练,最终失去训练好的模型。
须要留神的是,因为CIFAR-10数据集较大,上述代码仅应用了数据集中的1/3作为样本进行训练和测试。如果须要应用残缺的数据集进行训练,须要思考分布式训练等技术来提高效率。
该局部定义了一个transforms.Compose对象,其中蕴含两个转换操作:
transforms.ToTensor()将输出的PIL图像或ndarray转换为张量,并进行标准化到范畴[0, 1]。
transforms.Normalize(mean, std)用于对张量进行标准化。在这里,咱们将每个通道的平均值设置为0.5,标准差设置为0.5。
这些操作的目标是确保模型可能承受雷同模式的输出数据并对其进行正确的预测。在这里,咱们应用标准化来使得数据的取值范畴更加稳固,并且有助于进步模型的训练成果。
- 定义数据预处理的转换。
- 加载MNIST数据集。
- 将MNIST数据集调配给每个设施。
- 在每个设施上进行本地训练。
- 测试模型并输入评估后果。
- 汇总所有设施的评估后果并输入均匀准确率。
- 将模型保留到文件中。
须要留神的是,在这里咱们假如所有设施都具备雷同的计算能力和存储容量,并且在每个设施上训练的数据集大小雷同。在理论状况下,不同设施的硬件配置和网络速度可能不同,因而须要依据理论状况对训练数据进行划分,以保障训练成果和工夫开销的均衡。
论断
去中心化联邦学习是一种解决数据隐衷爱护和通信效率等问题的无效办法。在本文中,咱们介绍了去中心化联邦学习的原理和实现形式,并提供了2个Python代码案例,演示了如何应用数据集训练简略的联邦学习零碎。
本文由mdnice多平台公布