Delta
中文
搜索
K
Links
Comment on page

使用Delta Node API管理任务

Delta支持直接通过Delta Node的API来提交任务、监控任务执行状态以及获取任务执行结果。可以使用delta-task包中封装的方法来直接调用API。使用这组API,开发者可以绕过Deltaboard,直接从代码中连接Delta Node。理论上支持将Delta Task的提交和执行做为更大的运算流程中的一步来自动化运行。
在这篇文档中,我们以一个计算多个企业的平均工资的联邦统计任务为例子,描述一个完整的计算任务从编写到执行,再到获取结果的过程。
下面的示例代码可以运行在任何Python环境中,比如开发者本地的PyCharm开发环境中。只要搭建有可用的Delta Node,可以远程访问API,就可以直接执行。
在任务开发之前,需要在本地的Python环境中安装delta-task包。可以使用pip进行安装:
pip install delta-task
假设这样一个场景:有三家企业,每家都有一份员工工资表。这个工资表绝对不能对外暴露。现在这三家企业都在企业内部署了Delta节点,并接入了工资表的数据,我们现在要写一个数据统计任务,计算出这三家企业的全部员工的工资平均值。
相关的API在delta-task中对应的方法主要有如下几个:
  • create_task:向Delta Node提交计算任务。
  • trace:持续获取并打印Delta Node上正在执行的某个任务的执行日志,直到任务结束。
  • wait:阻塞当前代码执行,直到Delta Node上某个任务执行完成。
  • get_result:获取Delta Node上某个任务的执行结果。
同样的例子,使用Deltaboard来编写和执行的例子可以参考这篇文档:
接下来,我们在本地的Python开发环境中编写如下的代码,首先是计算任务的定义:
from typing import Dict
import delta.dataset
from delta import pandas as pd
from delta.delta_node import DeltaNode
from delta.task import HorizontalAnalytics
class WageAvg(HorizontalAnalytics):
def __init__(self) -> None:
super().__init__(
name="wage_avg", # 任务名称,用于在Deltaboard中的展示
min_clients=3, # 算法所需的最少客户端数,至少为2
max_clients=3, # 算法所支持的最大客户端数,必须大雨等于min_clients
wait_timeout=5, # 等待超时时间,用来控制一轮计算的超时时间
connection_timeout=5, # 连接超时时间,用来控制流程中每个阶段的超时时间
)
def dataset(self) -> Dict[str, delta.dataset.DataFrame]:
"""
定义任务所需的数据。
return: 字典,键是数据的名字,需要与execute方法中的参数名称对应;值是一个delta.dataset.DataFrame实例。
"""
return {
"wages": delta.dataset.DataFrame("wages.csv")
}
def execute(self, wages: pd.DataFrame):
"""
实现具体的统计逻辑。
输入与dataset方法的返回值对应
"""
return wages.mean()
这个任务定义分了三个部分:对数据节点的要求、数据集的选取,以及计算的逻辑。
数据节点的要求主要是任务所要求的最少和最多的节点数。因为网络中的节点不是一直在线的,另外也对想要参与的任务有一些挑选,所以这里定义了任务所需要的节点数量。任务发布后,节点自行选择是否加入任务,当选择加入的节点数满足了任务的要求,任务就会开始执行。
平均工资计算任务有3个节点参与,所以我们把最小和最大节点数都设置为3,要求他们全部参与。
数据集的定义主要是说明本次计算需要哪些数据。这些数据分散在不同的节点上,需要以同样的命名和格式保存,供Delta节点来读取使用。在未来Delta数据协议发布后,会支持更多、更灵活的数据源接入。
在这里我们读入wages.csv。每个节点上的wages.csv文件里,放置的是这个企业自己员工的工资数据。
数据集的定义中还指定了数据读入后的格式。这里使用了delta.dataset.DataFrame,告诉Delta读入数据后,转化成Pandas的DataFrame供后续的使用。
然后就是计算逻辑的编写了,这里就是传统的Pandas代码,全部复制进来就好了。计算逻辑的输入,就是上一步定义的转化后的数据集,需要几个就定义几个,全部做为execute方法的参数传递给计算逻辑。
因为隐私保护的缘故,Delta对计算逻辑有一些要求,比如必须包括聚合操作,禁止直接返回原始数据。如果遇到了不符合要求的计算逻辑,Delta就会拒绝任务执行。
平均工资的计算逻辑比较简单,直接使用Pandas的mean函数求平均数就可以了。
然后是将这个Task发送到Delta Node进行执行,并获取计算结果的代码:
task = WageAvg().build()
DELTA_NODE_API = "http://127.0.0.1:6700"
delta_node = DeltaNode(DELTA_NODE_API)
task_id = delta_node.create_task(task) # 创建任务
if delta_node.trace(task_id): # 跟踪任务日志
# 如果不需要跟踪任务日志,也可以这么写:
# if delta_node.wait(task_id): # 等待任务结束
res = delta_node.get_result(task_id) # 获取任务结果
print(res)
else:
print("Task error")
在指定了Delta Node API的连接地址后,通过create_task创建任务,就可以获得一个任务ID(task_id)。 后续,我们可以通过这个任务ID来对任务进行管理。
如果我们需要获取任务日志,那么可以使用trace这个API。trace会持续地在stdout中 打印任务日志,直至任务正常结束或发生异常。trace会返回一个bool值,如果任务正常结束, 则返回值为True,否则为False
如果我们不需要任务日志,只需要等待任务结束,那么可以使用wait这个API。wait的用法 和返回值与trace一致,区别只是在于,它不会打印任务日志。
如果任务正常结束,那么我们可以通过get_result这个API来获取任务的结果。任务结果的类型由任务类型来决定。横向联邦学习任务,结果的类型是Dict[str, torch.Tensor], 即delta.task.HorizontalLearning.state_dict()的返回类型; 横向联邦统计任务,结果的类型是delta.task.HorizontalAnalytics.execute()的返回类型(delta.pandas对应pandas)。
如果在任务没有结束或异常退出的情况下,调用get_result,会抛出异常。所以调用get_result前,一定要使用 tracewait,等待任务正常结束。
如果任务发生异常,那么使用trace,可以在日志中看到发生的异常。
执行上述代码,在命令行有了如下的任务日志:
可以看到任务日志中包括了任务执行过程中的关键步骤的记录,并在最后输出了最终的平均工资计算结果。
下面是这个例子中涉及到的API方法的详细说明:

create_task - 创建任务

delta.delta_node.DeltaNode.create_task(self, task)
用户可以通过该方法将任务提交到Delta Node上,创建任务。
参数:
  • task: delta.core.task, 需要提交的任务,由用户编写的任务经过build生成
返回值:
  • 任务ID,int类型
例子:
# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
DELTA_NODE_API = "http://127.0.0.1:6700"
delta_node = DeltaNode(DELTA_NODE_API)
task_id = delta_node.create_task(task)

trace - 跟踪任务日志

delta.delta_node.DeltaNode.trace(self, task_id)
用户可以通过该方法,跟踪已经创建的任务的日志。该方法是阻塞方法,会持续打印任务的日志,直至任务正常结束或异常退出。 如果任务开启了零知识证明阶段,该方法会一直阻塞,直至任务完成零知识证明验证或异常退出。
参数:
  • task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
  • 任务状态,bool类型,True表示任务执行成功,False表示任务执行过程中出现异常
例子:
# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
DELTA_NODE_API = "http://127.0.0.1:6700"
delta_node = DeltaNode(DELTA_NODE_API)
task_id = delta_node.create_task(task)
delta_node.trace(task_id)
命令行输出:
2022-05-24 03:43:06 start run task 0xcea03535685face8c12034c585876062915cbf01810c711c3cbb7e424f56d24a
2022-05-24 03:43:06 [Create Task] create task 0xcea03535685face8c12034c585876062915cbf01810c711c3cbb7e424f56d24a
...
...
...
2022-05-24 03:43:27 task 0xcea03535685face8c12034c585876062915cbf01810c711c3cbb7e424f56d24a finish

wait - 等待任务结束

delta.delta_node.DeltaNode.wait(self, task_id)
用户可以通过该方法,等待任务结束。该方法是阻塞方法,会阻塞直至任务正常结束或异常退出。 如果任务开启了零知识证明阶段,该方法会一直阻塞,直至任务完成零知识证明验证或异常退出。
参数:
  • task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
  • 任务状态,bool类型,True表示任务执行成功,False表示任务执行过程中出现异常
例子:
# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
DELTA_NODE_API = "http://127.0.0.1:6700"
delta_node = DeltaNode(DELTA_NODE_API)
task_id = delta_node.create_task(task)
delta_node.wait(task_id)

get_result - 获取任务结果

delta.delta_node.DeltaNode.get_result(self, task_id)
用户可以通过该方法,在任务正常结束后,获取任务结果。如果任务没有结束或执行过程中出现异常,调用此方法会抛出异常。
参数:
  • task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
  • 任务结果。横向联邦学习任务,结果的类型是Dict[str, torch.Tensor],即delta.task.HorizontalLearning.state_dict()的返回类型;横向联邦统计任务,结果的类型是delta.task.HorizontalAnalytics.execute()的返回类型(delta.pandas对应pandas)。
例子:
task = Example().build()
DELTA_NODE_API = "http://127.0.0.1:6700"
delta_node = DeltaNode(DELTA_NODE_API)
task_id = delta_node.create_task(task)
if delta_node.wait(task_id):
res = delta_node.get_result(task_id)
print(res)
else:
print("Task error")