横向联邦学习任务

这是一个使用Delta框架编写的横向联邦学习的任务示例。

数据是分布在多个节点上的MNIST数据集,每个节点上只有其中的一部分样本。任务是训练一个卷积神经网络的模型,进行手写数字的识别。

本示例可以在Deltaboard中直接运行,完整的Jupyter Notebook文件也已经包含在Deltaboard中,进入Deltaboard的Playground,可以在examples文件下看到本示例文件。

在Deltaboard的在线版本中可以直接查看和运行这个示例

https://board.deltampc.com

1. 引入需要的包

我们的计算逻辑是用torch写的。所以首先引入numpytorch,以及一些辅助的工具,然后从delta-task的包中,引入Delta框架的内容,包括DeltaNode节点,用于调用API发送任务,用于横向联邦学习任务HorizontalLearning,以及用于配置安全聚合策略的FaultTolerantFedAvg等:

from typing import Any, Dict, Iterable, List, Tuple

import logging
import numpy as np
import torch
from PIL.Image import Image
from torch.utils.data import DataLoader, Dataset

from delta.delta_node import DeltaNode
from delta.task.learning import HorizontalLearning, FaultTolerantFedAvg
import delta.dataset

2. 定义神经网络模型

接下来我们来定义神经网络模型,这里和传统的神经网络模型定义完全一样:

class LeNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = torch.nn.Conv2d(1, 16, 5, padding=2)
        self.pool1 = torch.nn.AvgPool2d(2, stride=2)
        self.conv2 = torch.nn.Conv2d(16, 16, 5)
        self.pool2 = torch.nn.AvgPool2d(2, stride=2)
        self.dense1 = torch.nn.Linear(400, 100)
        self.dense2 = torch.nn.Linear(100, 10)

    def forward(self, x: torch.Tensor):
        x = self.conv1(x)
        x = torch.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = torch.relu(x)
        x = self.pool2(x)

        x = x.view(-1, 400)
        x = self.dense1(x)
        x = torch.relu(x)
        x = self.dense2(x)
        return x

3. 定义隐私计算任务

然后可以开始定义我们的横向联邦任务了,用横向联邦学习的方式,在多节点上训练上面定义的神经网络模型

在定义横向联邦学习任务时,有几部分内容是需要用户自己定义的:

  • 任务配置: 我们需要在 super().__init__() 方法中对任务进行配置。

  • 数据集: 我们需要在dataset方法中定义任务所需要的数据集。

  • 训练集的Dataloader: 我们需要在make_train_dataloader方法中定义训练集的Dataloader。

  • 验证集的Dataloader: 我们需要在make_validate_dataloader方法中定义验证集的Dataloader。

  • 模型训练: 在该方法中,定义整个模型的训练过程,包含整个前向传播和后向传播的。该方法的输入是训练集的Dataloader。

  • 模型验证: 在该方法中,定义整个模型的验证过程。该方法的输入是验证集的Dataloader,输出是一个字典,字典的键是计算出的指标名称,值是对应的指标值。

  • 模型参数: 我们需要在state_dict方法中定义所有需要训练和更新的模型参数。

def transform_data(data: List[Tuple[Image, str]]):
    """
    作为dataloader的collate_fn,用于预处理函数。
    将输入的mnist图片调整大小、归一化后,变为torch.Tensor返回。
    """
    xs, ys = [], []
    for x, y in data:
        xs.append(np.array(x).reshape((1, 28, 28)))
        ys.append(int(y))

    imgs = torch.tensor(xs)
    label = torch.tensor(ys)
    imgs = imgs / 255 - 0.5
    return imgs, label


class Example(HorizontalLearning):
    def __init__(self) -> None:
        super().__init__(
            name="example",  # 任务名称,用于在Deltaboard中的展示
            max_rounds=2,  # 任务训练的总轮次,每聚合更新一次权重,代表一轮
            validate_interval=1,  # 验证的轮次间隔,1表示每完成一轮,进行一次验证
            validate_frac=0.1,  # 验证集的比例,范围(0,1)
            strategy=FaultTolerantFedAvg(  # 安全聚合的策略,可选策略目前包含 FedAvg和FaultTolerantFedAvg,都位于delta.task.learning包下
                min_clients=2,  # 算法所需的最少客户端数,至少为2
                max_clients=3,  # 算法所支持的最大客户端数,必须大雨等于min_clients
                merge_epoch=1,  # 聚合更新的间隔,merge_interval_epoch表示每多少个epoch聚合更新一次权重
                wait_timeout=30,  # 等待超时时间,用来控制一轮计算的超时时间
                connection_timeout=10  # 连接超时时间,用来控制流程中每个阶段的超时时间
            )
        )
        self.model = LeNet()
        self.loss_func = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.SGD(
            self.model.parameters(),
            lr=0.1,
            momentum=0.9,
            weight_decay=1e-3,
            nesterov=True,
        )

    def dataset(self) -> delta.dataset.Dataset:
        """
        定义任务所需要的数据集。
        return: 一个delta.dataset.Dataset
        """
        return delta.dataset.Dataset(dataset="mnist")

    def make_train_dataloader(self, dataset: Dataset) -> DataLoader:
        """
        定义训练集Dataloader,可以对dataset进行各种变换、预处理等操作。
        dataset: 训练集的Dataset
        return: 训练集的Dataloader
        """
        return DataLoader(dataset, batch_size=64, shuffle=True, drop_last=True, collate_fn=transform_data)  # type: ignore

    def make_validate_dataloader(self, dataset: Dataset) -> DataLoader:
        """
        定义验证集Dataloader,可以对dataset进行各种变换、预处理等操作。
        dataset: 验证集的Dataset
        return: 验证集的Dataloader
        """
        return DataLoader(dataset, batch_size=64, shuffle=False, drop_last=False, collate_fn=transform_data)  # type: ignore

    def train(self, dataloader: Iterable):
        """
        训练步骤
        dataloader: 训练数据集对应的dataloader
        return: None
        """
        for batch in dataloader:
            x, y = batch
            y_pred = self.model(x)
            loss = self.loss_func(y_pred, y)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

    def validate(self, dataloader: Iterable) -> Dict[str, Any]:
        """
        验证步骤,输出验证的指标值
        dataloader: 验证集对应的dataloader
        return: Dict[str, float],一个字典,键为指标的名称(str),值为对应的指标值(float)
        """
        total_loss = 0
        count = 0
        ys = []
        y_s = []
        for batch in dataloader:
            x, y = batch
            y_pred = self.model(x)
            loss = self.loss_func(y_pred, y)
            total_loss += loss.item()
            count += 1

            y_ = torch.argmax(y_pred, dim=1)
            y_s.extend(y_.tolist())
            ys.extend(y.tolist())
        avg_loss = total_loss / count
        tp = len([1 for i in range(len(ys)) if ys[i] == y_s[i]])
        precision = tp / len(ys)

        return {"loss": avg_loss, "precision": precision}

    def state_dict(self) -> Dict[str, torch.Tensor]:
        """
        需要训练、更新的模型参数
        在聚合更新、保存结果时,只会更新、保存get_params返回的参数
        return: List[torch.Tensor], 模型参数列表
        """
        return self.model.state_dict()

具体来讲,当定义横向联邦学习任务时,我们需要定义一个继承自HorizontalLearning的类。HorizontalLearning类是一个虚基类,针对横向联邦学习任务的流程,定义了一些虚函数, 需要我们来实现。

首先是构造函数__init__。构造函数在这里,主要是对任务任务进行一些基础的配置。这些配置项包括任务名称(name),任务训练的总轮数(max_rounds),执行验证的频率(每 validate_interval 轮执行一次验证),验证集的比例(validate_frac),以及安全聚合的策略(strategy)。在自己实现构造函数时,必须调用基类的构造函数,即super().__init__()

之后是dataset方法。dataset方法定义任务所需要的数据集。该方法返回一个delta.dataset.Dataset实例, 其参数dataset代表所需数据集的名称。关于数据集格式的具体细节,请参考这篇文章。目前横向联邦学习任务,只支持实用一个数据集,所以dataset方法只能返回一个delta.dataset.Dataset实例

然后是两个定义DataLoader的方法,分别是定义训练集DataLoader的make_train_dataloader和定义验证集DataLoader的make_validate_dataloader。这两个方法需要实现的逻辑类似,所以放在一起说。这两个方法的输入都是一个torch.utils.data.Dataset实例,分别为训练集和验证集;在方法中,可以按照需要对数据集进行变换、预处理等操作。最后,我们需要返回一个torch.utils.data.Dataloader实例,它会作为模型训练方法的输入。

然后是训练模型的方法train。该方法的输入是在make_train_dataloader中定义的DataLoader;在该方法中,我们需要实现整个模型的训练过程,包括前向传播,后向传播和参数更新操作。

还有验证模型的方法validate。该方法的输入是在make_validate_dataloader中定义的Dataloader;在该方法中,可以根据需要,在验证集上,计算模型的各种性能指标;该方法最后返回一个字典,字典的键是模型的指标的名称,对应的值是指标的值。

最后,我们还需要在state_dict方法中定义所有需要训练和更新的模型参数,方法的返回值就是这些模型参数的列表。

4. 指定执行任务用的Delta Node的API

定义好了任务,我们就可以开始准备在Delta Node上执行任务了。

Delta Task框架可以直接调用Delta Node API发送任务到Delta Node开始执行,只要在任务执行时指定Delta Node的API地址即可。

Deltaboard提供了对于Delta Node的API的封装,为每个用户提供了一个独立的API地址,支持多人同时使用同一个Delta Node,并且能够在Deltaboard中管理自己提交的任务。 在这里,我们使用Deltaboard提供的API来执行任务。如果用户自己搭建了Delta Node,也可以直接使用Delta Node的API。

在Deltaboard导航栏中进入“个人中心”,在Deltaboard API中,复制自己的API地址,并粘贴到下面的代码中:

DELTA_NODE_API = "http://127.0.0.1:6704"

5. 执行隐私计算任务

接下来我们可以开始运行这个模型了:

task = Example().build()

delta_node = DeltaNode(DELTA_NODE_API)
delta_node.create_task(task)

6. 查看执行状态

点击执行后,可以从输出的日志看出,任务已经提交到了Delta Node的节点上。

接下来,可以从左侧的导航栏中,前往“任务列表”,找到刚刚提交的任务,点击进去查看具体的执行日志了。

在系统详细设计的章节中,有隐私计算任务从发送到Delta Node开始,到执行完毕的详细流程说明,在上文中执行的横向联邦学习任务,可以在下面的文章中详细了解其执行过程:

横向联邦学习

最后更新于