这是一个使用Delta框架编写的横向联邦学习的任务示例。
数据是分布在多个节点上的MNIST数据集,每个节点上只有其中的一部分样本。任务是训练一个卷积神经网络的模型,进行手写数字的识别。
本示例可以在Deltaboard中直接运行,完整的Jupyter Notebook文件也已经包含在Deltaboard中,进入Deltaboard的Playground,可以在examples文件下看到本示例文件。
在Deltaboard的在线版本中可以直接查看和运行这个示例
https://board.deltampc.com
1. 引入需要的包
我们的计算逻辑是用torch写的。所以首先引入numpy
和torch
,以及一些辅助的工具,然后从delta-task
的包中,引入Delta框架的内容,包括DeltaNode
节点,用于调用API发送任务,用于横向联邦学习任务HorizontalLearning
,以及用于配置安全聚合策略的FaultTolerantFedAvg
等:
from typing import Any, Dict, Iterable, List, Tuple
import logging
import numpy as np
import torch
from PIL.Image import Image
from torch.utils.data import DataLoader, Dataset
from delta.delta_node import DeltaNode
from delta.task.learning import HorizontalLearning, FaultTolerantFedAvg
import delta.dataset
2. 定义神经网络模型
接下来我们来定义神经网络模型,这里和传统的神经网络模型定义完全一样:
class LeNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.conv1 = torch.nn.Conv2d(1, 16, 5, padding=2)
self.pool1 = torch.nn.AvgPool2d(2, stride=2)
self.conv2 = torch.nn.Conv2d(16, 16, 5)
self.pool2 = torch.nn.AvgPool2d(2, stride=2)
self.dense1 = torch.nn.Linear(400, 100)
self.dense2 = torch.nn.Linear(100, 10)
def forward(self, x: torch.Tensor):
x = self.conv1(x)
x = torch.relu(x)
x = self.pool1(x)
x = self.conv2(x)
x = torch.relu(x)
x = self.pool2(x)
x = x.view(-1, 400)
x = self.dense1(x)
x = torch.relu(x)
x = self.dense2(x)
return x
3. 定义隐私计算任务
然后可以开始定义我们的横向联邦任务了,用横向联邦学习的方式,在多节点上训练上面定义的神经网络模型
在定义横向联邦学习任务时,有几部分内容是需要用户自己定义的:
任务配置: 我们需要在 super().__init__()
方法中对任务进行配置。
数据集: 我们需要在dataset
方法中定义任务所需要的数据集。
训练集的Dataloader: 我们需要在make_train_dataloader
方法中定义训练集的Dataloader。
验证集的Dataloader: 我们需要在make_validate_dataloader
方法中定义验证集的Dataloader。
模型训练: 在该方法中,定义整个模型的训练过程,包含整个前向传播和后向传播的。该方法的输入是训练集的Dataloader。
模型验证: 在该方法中,定义整个模型的验证过程。该方法的输入是验证集的Dataloader,输出是一个字典,字典的键是计算出的指标名称,值是对应的指标值。
模型参数: 我们需要在state_dict
方法中定义所有需要训练和更新的模型参数。
def transform_data(data: List[Tuple[Image, str]]):
"""
作为dataloader的collate_fn,用于预处理函数。
将输入的mnist图片调整大小、归一化后,变为torch.Tensor返回。
"""
xs, ys = [], []
for x, y in data:
xs.append(np.array(x).reshape((1, 28, 28)))
ys.append(int(y))
imgs = torch.tensor(xs)
label = torch.tensor(ys)
imgs = imgs / 255 - 0.5
return imgs, label
class Example(HorizontalLearning):
def __init__(self) -> None:
super().__init__(
name="example", # 任务名称,用于在Deltaboard中的展示
max_rounds=2, # 任务训练的总轮次,每聚合更新一次权重,代表一轮
validate_interval=1, # 验证的轮次间隔,1表示每完成一轮,进行一次验证
validate_frac=0.1, # 验证集的比例,范围(0,1)
strategy=FaultTolerantFedAvg( # 安全聚合的策略,可选策略目前包含 FedAvg和FaultTolerantFedAvg,都位于delta.task.learning包下
min_clients=2, # 算法所需的最少客户端数,至少为2
max_clients=3, # 算法所支持的最大客户端数,必须大雨等于min_clients
merge_epoch=1, # 聚合更新的间隔,merge_interval_epoch表示每多少个epoch聚合更新一次权重
wait_timeout=30, # 等待超时时间,用来控制一轮计算的超时时间
connection_timeout=10 # 连接超时时间,用来控制流程中每个阶段的超时时间
)
)
self.model = LeNet()
self.loss_func = torch.nn.CrossEntropyLoss()
self.optimizer = torch.optim.SGD(
self.model.parameters(),
lr=0.1,
momentum=0.9,
weight_decay=1e-3,
nesterov=True,
)
def dataset(self) -> delta.dataset.Dataset:
"""
定义任务所需要的数据集。
return: 一个delta.dataset.Dataset
"""
return delta.dataset.Dataset(dataset="mnist")
def make_train_dataloader(self, dataset: Dataset) -> DataLoader:
"""
定义训练集Dataloader,可以对dataset进行各种变换、预处理等操作。
dataset: 训练集的Dataset
return: 训练集的Dataloader
"""
return DataLoader(dataset, batch_size=64, shuffle=True, drop_last=True, collate_fn=transform_data) # type: ignore
def make_validate_dataloader(self, dataset: Dataset) -> DataLoader:
"""
定义验证集Dataloader,可以对dataset进行各种变换、预处理等操作。
dataset: 验证集的Dataset
return: 验证集的Dataloader
"""
return DataLoader(dataset, batch_size=64, shuffle=False, drop_last=False, collate_fn=transform_data) # type: ignore
def train(self, dataloader: Iterable):
"""
训练步骤
dataloader: 训练数据集对应的dataloader
return: None
"""
for batch in dataloader:
x, y = batch
y_pred = self.model(x)
loss = self.loss_func(y_pred, y)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def validate(self, dataloader: Iterable) -> Dict[str, Any]:
"""
验证步骤,输出验证的指标值
dataloader: 验证集对应的dataloader
return: Dict[str, float],一个字典,键为指标的名称(str),值为对应的指标值(float)
"""
total_loss = 0
count = 0
ys = []
y_s = []
for batch in dataloader:
x, y = batch
y_pred = self.model(x)
loss = self.loss_func(y_pred, y)
total_loss += loss.item()
count += 1
y_ = torch.argmax(y_pred, dim=1)
y_s.extend(y_.tolist())
ys.extend(y.tolist())
avg_loss = total_loss / count
tp = len([1 for i in range(len(ys)) if ys[i] == y_s[i]])
precision = tp / len(ys)
return {"loss": avg_loss, "precision": precision}
def state_dict(self) -> Dict[str, torch.Tensor]:
"""
需要训练、更新的模型参数
在聚合更新、保存结果时,只会更新、保存get_params返回的参数
return: List[torch.Tensor], 模型参数列表
"""
return self.model.state_dict()
具体来讲,当定义横向联邦学习任务时,我们需要定义一个继承自HorizontalLearning
的类。HorizontalLearning
类是一个虚基类,针对横向联邦学习任务的流程,定义了一些虚函数, 需要我们来实现。
首先是构造函数__init__
。构造函数在这里,主要是对任务任务进行一些基础的配置。这些配置项包括任务名称(name
),任务训练的总轮数(max_rounds
),执行验证的频率(每 validate_interval
轮执行一次验证),验证集的比例(validate_frac
),以及安全聚合的策略(strategy
)。在自己实现构造函数时,必须调用基类的构造函数,即super().__init__()
。
之后是dataset
方法。dataset
方法定义任务所需要的数据集。该方法返回一个delta.dataset.Dataset
实例, 其参数dataset
代表所需数据集的名称。关于数据集格式的具体细节,请参考这篇文章。目前横向联邦学习任务,只支持实用一个数据集,所以dataset
方法只能返回一个delta.dataset.Dataset
实例
然后是两个定义DataLoader的方法,分别是定义训练集DataLoader的make_train_dataloader
和定义验证集DataLoader的make_validate_dataloader
。这两个方法需要实现的逻辑类似,所以放在一起说。这两个方法的输入都是一个torch.utils.data.Dataset
实例,分别为训练集和验证集;在方法中,可以按照需要对数据集进行变换、预处理等操作。最后,我们需要返回一个torch.utils.data.Dataloader
实例,它会作为模型训练方法的输入。
然后是训练模型的方法train
。该方法的输入是在make_train_dataloader
中定义的DataLoader
;在该方法中,我们需要实现整个模型的训练过程,包括前向传播,后向传播和参数更新操作。
还有验证模型的方法validate
。该方法的输入是在make_validate_dataloader
中定义的Dataloader;在该方法中,可以根据需要,在验证集上,计算模型的各种性能指标;该方法最后返回一个字典,字典的键是模型的指标的名称,对应的值是指标的值。
最后,我们还需要在state_dict
方法中定义所有需要训练和更新的模型参数,方法的返回值就是这些模型参数的列表。
4. 指定执行任务用的Delta Node的API
定义好了任务,我们就可以开始准备在Delta Node上执行任务了。
Delta Task框架可以直接调用Delta Node API发送任务到Delta Node开始执行,只要在任务执行时指定Delta Node的API地址即可。
Deltaboard提供了对于Delta Node的API的封装,为每个用户提供了一个独立的API地址,支持多人同时使用同一个Delta Node,并且能够在Deltaboard中管理自己提交的任务。 在这里,我们使用Deltaboard提供的API来执行任务。如果用户自己搭建了Delta Node,也可以直接使用Delta Node的API。
在Deltaboard导航栏中进入“个人中心”,在Deltaboard API中,复制自己的API地址,并粘贴到下面的代码中:
DELTA_NODE_API = "http://127.0.0.1:6704"
5. 执行隐私计算任务
接下来我们可以开始运行这个模型了:
task = Example().build()
delta_node = DeltaNode(DELTA_NODE_API)
delta_node.create_task(task)
6. 查看执行状态
点击执行后,可以从输出的日志看出,任务已经提交到了Delta Node的节点上。
接下来,可以从左侧的导航栏中,前往“任务列表”,找到刚刚提交的任务,点击进去查看具体的执行日志了。
在系统详细设计的章节中,有隐私计算任务从发送到Delta Node开始,到执行完毕的详细流程说明,在上文中执行的横向联邦学习任务,可以在下面的文章中详细了解其执行过程:
横向联邦学习