使用Delta Node API管理任务
最后更新于
最后更新于
Delta支持直接通过Delta Node的API来提交任务、监控任务执行状态以及获取任务执行结果。可以使用delta-task包中封装的方法来直接调用API。使用这组API,开发者可以绕过Deltaboard,直接从代码中连接Delta Node。理论上支持将Delta Task的提交和执行做为更大的运算流程中的一步来自动化运行。
在这篇文档中,我们以一个计算多个企业的平均工资的联邦统计任务为例子,描述一个完整的计算任务从编写到执行,再到获取结果的过程。
下面的示例代码可以运行在任何Python环境中,比如开发者本地的PyCharm开发环境中。只要搭建有可用的Delta Node,可以远程访问API,就可以直接执行。
在任务开发之前,需要在本地的Python环境中安装delta-task
包。可以使用pip
进行安装:
假设这样一个场景:有三家企业,每家都有一份员工工资表。这个工资表绝对不能对外暴露。现在这三家企业都在企业内部署了Delta节点,并接入了工资表的数据,我们现在要写一个数据统计任务,计算出这三家企业的全部员工的工资平均值。
相关的API在delta-task中对应的方法主要有如下几个:
create_task
:向Delta Node提交计算任务。
trace
:持续获取并打印Delta Node上正在执行的某个任务的执行日志,直到任务结束。
wait
:阻塞当前代码执行,直到Delta Node上某个任务执行完成。
get_result
:获取Delta Node上某个任务的执行结果。
同样的例子,使用Deltaboard来编写和执行的例子可以参考这篇文档:
接下来,我们在本地的Python开发环境中编写如下的代码,首先是计算任务的定义:
这个任务定义分了三个部分:对数据节点的要求、数据集的选取,以及计算的逻辑。
数据节点的要求主要是任务所要求的最少和最多的节点数。因为网络中的节点不是一直在线的,另外也对想要参与的任务有一些挑选,所以这里定义了任务所需要的节点数量。任务发布后,节点自行选择是否加入任务,当选择加入的节点数满足了任务的要求,任务就会开始执行。
平均工资计算任务有3个节点参与,所以我们把最小和最大节点数都设置为3,要求他们全部参与。
数据集的定义主要是说明本次计算需要哪些数据。这些数据分散在不同的节点上,需要以同样的命名和格式保存,供Delta节点来读取使用。在未来Delta数据协议发布后,会支持更多、更灵活的数据源接入。
在这里我们读入wages.csv
。每个节点上的wages.csv
文件里,放置的是这个企业自己员工的工资数据。
数据集的定义中还指定了数据读入后的格式。这里使用了delta.dataset.DataFrame
,告诉Delta读入数据后,转化成Pandas的DataFrame供后续的使用。
然后就是计算逻辑的编写了,这里就是传统的Pandas代码,全部复制进来就好了。计算逻辑的输入,就是上一步定义的转化后的数据集,需要几个就定义几个,全部做为execute方法的参数传递给计算逻辑。
因为隐私保护的缘故,Delta对计算逻辑有一些要求,比如必须包括聚合操作,禁止直接返回原始数据。如果遇到了不符合要求的计算逻辑,Delta就会拒绝任务执行。
平均工资的计算逻辑比较简单,直接使用Pandas的mean
函数求平均数就可以了。
然后是将这个Task发送到Delta Node进行执行,并获取计算结果的代码:
在指定了Delta Node API的连接地址后,通过create_task
创建任务,就可以获得一个任务ID(task_id
)。 后续,我们可以通过这个任务ID来对任务进行管理。
如果我们需要获取任务日志,那么可以使用trace
这个API。trace
会持续地在stdout
中 打印任务日志,直至任务正常结束或发生异常。trace
会返回一个bool
值,如果任务正常结束, 则返回值为True
,否则为False
。
如果我们不需要任务日志,只需要等待任务结束,那么可以使用wait
这个API。wait
的用法 和返回值与trace
一致,区别只是在于,它不会打印任务日志。
如果任务正常结束,那么我们可以通过get_result
这个API来获取任务的结果。任务结果的类型由任务类型来决定。横向联邦学习任务,结果的类型是Dict[str, torch.Tensor], 即delta.task.HorizontalLearning.state_dict()的返回类型; 横向联邦统计任务,结果的类型是delta.task.HorizontalAnalytics.execute()的返回类型(delta.pandas对应pandas)。
如果在任务没有结束或异常退出的情况下,调用get_result
,会抛出异常。所以调用get_result
前,一定要使用 trace
或wait
,等待任务正常结束。
如果任务发生异常,那么使用trace
,可以在日志中看到发生的异常。
执行上述代码,在命令行有了如下的任务日志:
可以看到任务日志中包括了任务执行过程中的关键步骤的记录,并在最后输出了最终的平均工资计算结果。
下面是这个例子中涉及到的API方法的详细说明:
用户可以通过该方法将任务提交到Delta Node上,创建任务。
参数:
task: delta.core.task, 需要提交的任务,由用户编写的任务经过build生成
返回值:
任务ID,int类型
例子:
用户可以通过该方法,跟踪已经创建的任务的日志。该方法是阻塞方法,会持续打印任务的日志,直至任务正常结束或异常退出。 如果任务开启了零知识证明阶段,该方法会一直阻塞,直至任务完成零知识证明验证或异常退出。
参数:
task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
任务状态,bool类型,True表示任务执行成功,False表示任务执行过程中出现异常
例子:
命令行输出:
用户可以通过该方法,等待任务结束。该方法是阻塞方法,会阻塞直至任务正常结束或异常退出。 如果任务开启了零知识证明阶段,该方法会一直阻塞,直至任务完成零知识证明验证或异常退出。
参数:
task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
任务状态,bool类型,True表示任务执行成功,False表示任务执行过程中出现异常
例子:
用户可以通过该方法,在任务正常结束后,获取任务结果。如果任务没有结束或执行过程中出现异常,调用此方法会抛出异常。
参数:
task_id: 已经提交的任务ID。可以通过create_task方法得到
返回值:
任务结果。横向联邦学习任务,结果的类型是Dict[str, torch.Tensor],即delta.task.HorizontalLearning.state_dict()的返回类型;横向联邦统计任务,结果的类型是delta.task.HorizontalAnalytics.execute()的返回类型(delta.pandas对应pandas)。
例子: