# 横向联邦统计

Delta中的横向联邦统计在横向联邦任务框架下执行：

{% content-ref url="/pages/-MeZMmOeN3g7Mj5rTG7S" %}
[横向联邦任务框架](/system-design/horizontal-task-framework.md)
{% endcontent-ref %}

横向联邦任务框架对横向的隐私计算任务进行了抽象，所有的横向任务都可被拆分为多个计算单元，每个计算单元包括`select`, `map`, `aggregate`, `reduce`四个步骤。

用户编写的不同的横向计算任务，在真正执行前，都会先由Delta做一次转化，将整个计算过程转化为包含这四个步骤的多个单元，然后再发送到Delta Node进行执行。

本文详细讲解横向联邦统计的任务从用户编写的计算代码，到可被Delta Node执行的多个计算单元的转化过程。

### 用户如何定义横向联邦统计任务

首先，我们要解决的问题，就是用户如何编写代码，来实现他的横向联邦统计任务。关于这一点，当然是希望能最大限度地方便用户的编写。

这一点，我们从数据的角度来入手。也就是说，用户是在什么数据类型上进行统计计算的。对于统计任务来说，最常见的数据类型，就是各种表格了。 如果是一个单机的任务，用户要在表格数据上进行统计，那么他一般会使用`pandas`这个包来将数据读取成`DataFrame`，并通过`DataFrame`上的 各种算子，来对数据进行计算，最终得到他想要的结果。

那么，Delta的任务就很明确了，实现一套类似`pandas.DataFrame`的API，让用户可以像写`pandas`一样地来编写横向联邦统计任务。

### 提取任务的输入与输出

{% content-ref url="/pages/yNWSyYy7eeb0YtqIcIYP" %}
[横向联邦统计任务](/delta-task-development/hfa-task-example.md)
{% endcontent-ref %}

首先，我们就需要提取出用户编写任务的输入与输出。

对于任务的外部输入，也就是指客户端上的数据集，我们采用声明式的方式来让用户定义（参考Example里的`dataset`方法）。对于 其他内部的输入，比如说用户写在代码里的字面量，Delta可以自动地收集这些值，将他们作为任务的内部输入。

对于任务的输出，由于我们限定了，用户需要将所有的计算逻辑都写在一个方法里（参考Example里的`execute`方法），那么这个方法的 输出，就是整个任务的输出。任务执行时，我们只需要记录下execute方法输出的变量，作为任务的输出即可。

### 将单个算子转化为map与reduce

接下来，我们看如何将单个算子，转化为map与reduce。在这里，我们说的算子，指的就是`pandas.DataFrame`上的一些方法， 比如说`sum`，`mean`，`std`等等。

首先我们要分析，一个算子，需要转化为单纯的map操作，还是map+reduce操作，还是单纯的reduce操作。是的，你没看错， 根据算子参数的不同，同一个算子，可能转化为三种不同的操作。简单来说，当一个算子，需要联合多个不同客户端上的数据进行计算时， 就需要转化为map+reduce；否则的话，如果数据在客户端上，就转化为map，如果数据在服务端上，就转化为reduce。 举一个具体的例子，比如说`mean`这个算子，我们都知道它有一个`axis`参数，用来控制是沿着表格的行求和，还是沿着列求和。 在横向联邦的场景下，每个客户端持有数据的不同行，也就是说，每个客户端数据的行不相同，但是列相同。所以，在这种情况下， 当`axis=0`时（沿着行的方向，即对DataFrame的列求均值），`mean`算子需要转化为map+reduce；当`axis=1`时（沿着列的方向，即对DataFrame的行求均值）， 如果这个DataFrame在客户端上，那么`mean`就需要转化为一个map操作，如果在服务端上，就需要转化为一个reduce操作。 这里提一句，什么时候一个DataFrame，或者说数据会在服务端上，简单来说，一个map+reduce操作之后产生的结果，就位于服务端上， 当我们还需要对这个结果进行进一步的操作时，就会需要单纯的reduce操作。

接下来我们看如何将一个算子转化为map+reduce的操作。我们还是以`mean`为例。 对于单纯的map操作和单纯的reduce操作，它们都很简单，都是转化为`pandas.DataFrame.mean`就行了，比较复杂的是map+reduce操作。 我们先不考虑横向联邦的场景，就当做是一个传统的Map Reduce场景，看看要如何实现`mean`。 这点大家应该都能想到，只需要每个map操作输出本地数据的和`sum`以及数量`count`，reduce操作汇集所有map操作的输出之后求和， 得到所有数据的和`sum'`，以及所有数据的数量`count'`，最后计算`sum' / count'`即可。 带入到横向联邦的场景下来，在横向联邦中，安全聚合会自动将所有客户端map操作的输出求和，也就是说，reduce操作可以直接获得 `sum'`以及`count'`，不需要自己进行求和操作了。那么在横向联邦统计中，`mean`算子的map操作，依然输出本地数据的和`sum`以及数量`count`， 而reduce操作的输入，就直接是所有数据的和`sum'`以及所有数据的数量`count'`了，直接计算`sum' / count'`输出即可。

从`mean`算子这个例子出发，我们可以看出，map操作可能需要输出多个值，而其对应的reduce操作的输入，就是多个客户端上map操作的输出值之和。 所以，我们可以将map操作的输出组织成一个字典，字典的里的每个值，都需要是可加的；reduce操作的输入也是个字典，这个字典与map操作输出的字典 结构完全一致，只是字典的值变成了多个客户端输出的和。

### 将多个算子串联起来

在将单个算子转化为map与reduce之后，还不算完，我们还需要考虑如何将多个算子串联起来，毕竟用户编写的代码，不可能只包含一个算子。

在这里，Delta选择使用静态计算图的方式，来解决这个问题。如果用户用过Tenserflow 1.0，那应该很容易理解这个概念。 简单来说，用户编写的代码，并不是直接在Delta上执行的代码，用户编写的代码，只相当于定义了整个计算图，执行用户代码的结果， 是得到一张计算图。Delta会根据这张计算图，对它进行分析，将其转化为一系列的map与reduce操作，再插入相应的select和aggregate操作， 组成一个个Round，最后构建出整个横向联邦统计任务。

从实现上来说，用户在Delta中编写横向联邦统计的任务代码时，输入的数据结构`delta.pandas.DataFrame`，调用的是`delta.pandas.DataFrame` 上的各种算子。但实际上，`delta.pandas.DataFrame`并不包含数据，只是代表计算图上的一个数据节点。调用`delta.pandas.DataFrame`上的算子， 也只是在计算图上，添加了对应的map、reduce或者map+reduce节点。运行完用户编写的代码后，输出的数据节点，就是整个任务的输出了。 从输出节点出发进行遍历，就能得到整张计算图了。计算图中，那些没有入度的数据节点，自然就是任务的输入了。 得到计算图之后，我们需要根据计算图，来整理出任务中的每个Round。在每个Round中，都包含一个map和一个reduce操作，我们要做的，就是 将计算图上的map操作和reduce操作排好序，安排到每个Round的map和reduce中。这里需要指出的是，Round中的一个map和reduce，可以对应于计算图 中的多个map和reduce，只要这些计算图上的map或者reduce操作的输入数据在执行前已经准备完成，就可以把他们合并为一个map或者reduce操作。 构建好这些Round之后，整个任务的构建就完成了。之后我们需要做的，就是提交任务，开始执行了。

### 任务的执行

任务提交后，最后是在Delta Node的客户端与服务端上执行的。Delta Node在任务执行中，主要起两个作用： 一是与链上安全聚合系统交互，实现任务的创建、结算、以及安全聚合；二是为任务的执行提供数据。

首先看Delta Node的客户端。客户端除了监听任务创建、适时加入任务、下载任务外，在任务执行的过程中，主要执行 每个Round中的map操作。为了保证map操作能够顺利运行，需要根据map操作的输入，选择从本地加载相应的数据集、 从服务端下载所需的输入、从本地缓存中读取之前map操作的输出。map操作执行结束后，将属于map+reduce操作的输出通过链上安全聚合提交， 至于其他的输出，则会缓存在本地，供之后的map操作使用。

其次是Delta Node的服务端。服务端除了接受用户提交的任务，将任务提交到链上安全聚合系统外，在任务的执行过程中， 主要执行每个Round的reduce操作。reduce操作的输入，除了从链上安全聚合系统中获得外，还有可能有一些本地缓存的数据。 reduce操作执行结束后，得到的输出会缓存在本地，以供之后的reduce操作，或者其他客户端的map操作使用。

在每个Round开始执行时，选择加入这个Round的客户端，都会通过链上安全聚合系统，尝试去加入这个Round。 服务端也会通过链上安全聚合系统，对客户端进行选择。没有被选中的客户端，会直接退出这个Round的执行。

任务所有Round都结束后，客户端与服务端都会清理本地的缓存。同时，服务端还会根据任务的输出，将缓存中对应的值保存起来， 以供用户进行下载查看。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.deltampc.com/system-design/horizontal-federated-analytics.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
