Task Submission

The 2PM Node Framework supports direct task submission, monitoring, and result retrieval through the 2PM Node's API, using methods encapsulated in the 2pm-task package. This document walks through a complete computation task, from writing it to executing it and retrieving its results, using an example that runs in any Python environment, such as a developer's local PyCharm. As long as a 2PM Node is accessible, the task can be executed remotely. Before starting task development, install the 2pm-task package in the local Python environment using pip:

pip install 2pm-task

Key methods in the 2pm-task package include:

  • create_task: Submits a computation task to the 2PM Node.

  • trace: Continuously retrieves and prints the execution log of a task running on the 2PM Node until the task completes.

  • wait: Blocks the current code execution until a task on the 2PM Node is completed.

  • get_result: Retrieves the result of a task executed on the 2PM Node.
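The four methods above can be pictured as a small interface. The following is a minimal sketch, not the 2pm-task implementation: NodeClient, FakeNode, and run are hypothetical names introduced here, and FakeNode treats a "task" as a plain Python callable so that code written against the four methods can be exercised without a running 2PM Node.

```python
from typing import Any, Dict, List, Protocol


class NodeClient(Protocol):
    """Hypothetical structural type covering the four documented methods."""

    def create_task(self, task: Any) -> int: ...
    def trace(self, task_id: int) -> bool: ...
    def wait(self, task_id: int) -> bool: ...
    def get_result(self, task_id: int) -> Any: ...


class FakeNode:
    """In-memory stand-in (not part of 2pm-task) that runs tasks immediately."""

    def __init__(self) -> None:
        self._results: Dict[int, Any] = {}
        self._ok: Dict[int, bool] = {}
        self._logs: Dict[int, List[str]] = {}

    def create_task(self, task: Any) -> int:
        task_id = len(self._ok) + 1
        try:
            self._results[task_id] = task()  # assumption: a task is a callable
            self._ok[task_id] = True
            self._logs[task_id] = ["task started", "task finished"]
        except Exception as exc:
            self._ok[task_id] = False
            self._logs[task_id] = ["task started", f"task failed: {exc}"]
        return task_id

    def trace(self, task_id: int) -> bool:
        for line in self._logs[task_id]:
            print(line)
        return self._ok[task_id]

    def wait(self, task_id: int) -> bool:
        return self._ok[task_id]

    def get_result(self, task_id: int) -> Any:
        if not self._ok[task_id]:
            raise RuntimeError(f"task {task_id} did not finish normally")
        return self._results[task_id]


def run(node: NodeClient, task: Any) -> Any:
    """Submit a task, wait for it, and return its result (None on failure)."""
    task_id = node.create_task(task)
    return node.get_result(task_id) if node.wait(task_id) else None
```

Because run depends only on the four method names, the same function should work with a real node object, provided it exposes these methods as described in this document.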

Next, we define a task by following the FL or FHE tutorials, then execute it on the 2PM Node and retrieve the computation results with the following code:

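# Local names must be valid Python identifiers, which cannot start with a digit,
# so this example uses NODE_API and node.
# 2PMNode is the node class as named in this document; import it from the
# 2pm-task package before running this code.
task = YOUR_TASK_INSTANCE().build()

NODE_API = "http://127.0.0.1:6700"

node = 2PMNode(NODE_API)

task_id = node.create_task(task)  # Create task

if node.trace(task_id):  # Track task logs
    # If you do not need to track task logs, you can also write:
    # if node.wait(task_id):  # Wait for the task to end
    res = node.get_result(task_id)  # Get task result
    print(res)
else:
    print("Task error")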

After you specify the 2PM Node API address and create a task with create_task, you obtain a task ID (task_id), which is then used to manage the task.

If you need task logs, use the trace API. trace continuously prints task logs to stdout until the task ends normally or an exception occurs. It returns a boolean: True if the task ends normally, and False otherwise.
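The contract described here (print logs until the task ends, then return a boolean) can be illustrated with a polling loop. This is only a sketch of the behavior, not how the 2pm-task client is implemented: the poll callback, the status strings, and the name trace_sketch are all assumptions made for illustration.

```python
import time
from typing import Callable, List, Tuple

# Hypothetical status values; the real node's task states are not documented here.
RUNNING, FINISHED, ERROR = "running", "finished", "error"


def trace_sketch(
    poll: Callable[[int], Tuple[List[str], str]],
    interval: float = 0.0,
) -> bool:
    """Print new log lines until the task leaves the RUNNING state.

    `poll(offset)` is an assumed callback that returns the log lines
    produced after `offset`, together with the current task status.
    """
    offset = 0
    while True:
        lines, status = poll(offset)
        for line in lines:
            print(line)
        offset += len(lines)
        if status != RUNNING:
            # True only for a normal finish, mirroring trace's return value.
            return status == FINISHED
        time.sleep(interval)
```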

If you do not need task logs and only need to wait for the task to end, use the wait API. wait is used in the same way as trace and returns the same values; the only difference is that it does not print task logs.

If the task ends normally, you can retrieve the results of the task using the get_result API. The type of the task result depends on the type of task. For horizontal federated learning tasks, the result type is Dict[str, torch.Tensor], which is the return type of 2pm.task.HorizontalLearning.state_dict(). For FHE Machine Learning tasks, the result type is the return type of 2pm.task.FHEMachineLearning.execute().
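For a horizontal federated learning result, a quick way to inspect the returned dictionary is to list each parameter name with its shape. The helper below is a hypothetical sketch (summarize_state_dict is not part of 2pm-task); it assumes only that every value exposes a shape attribute, as torch.Tensor does.

```python
from typing import Any, Dict, Tuple


def summarize_state_dict(result: Dict[str, Any]) -> Dict[str, Tuple[int, ...]]:
    """Map each parameter name to its shape as a plain tuple."""
    return {name: tuple(value.shape) for name, value in result.items()}
```

Calling summarize_state_dict(res) on the value returned by get_result gives a compact overview of the trained parameters without printing the full tensors.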

If get_result is called before the task ends, or if the task exited abnormally, an exception is raised. Always use trace or wait to confirm that the task has ended normally before calling get_result.
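One way to enforce this ordering is to wrap it in a helper so that get_result is never reached for a failed task. The sketch below is an illustration under assumptions: result_or_raise and TaskFailed are hypothetical names, and node is any object that exposes trace, wait, and get_result as described in this document.

```python
from typing import Any


class TaskFailed(RuntimeError):
    """Raised by this sketch when a task does not end normally."""


def result_or_raise(node: Any, task_id: int, show_logs: bool = False) -> Any:
    """Block until the task ends, then return its result or raise TaskFailed."""
    # trace and wait both block and return True only on a normal finish.
    finished = node.trace(task_id) if show_logs else node.wait(task_id)
    if not finished:
        raise TaskFailed(f"task {task_id} ended abnormally")
    return node.get_result(task_id)
```

Raising a dedicated exception instead of printing "Task error" lets the calling code decide how to handle the failure.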

If the task encounters an exception, using trace will display the exception in the logs.

API Specification

create_task - Create Task

2pm.2pm_node.2PMNode.create_task(self, task)

Users can use this method to submit tasks to the 2PM Node and create a task.

Parameters:

  • task: 2pm.core.task, the task to be submitted, generated by the user through the build method.

Return Value:

  • Task ID, int type.

Example:

# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
# Local names must be valid Python identifiers (they cannot start with a digit).
NODE_API = "http://127.0.0.1:6700"
node = 2PMNode(NODE_API)
task_id = node.create_task(task)

trace - Trace Task Logs

2pm.2pm_node.2PMNode.trace(self, task_id)

Users can use this method to trace the logs of already created tasks. This method is blocking and will continuously print the task logs until the task completes successfully or exits abnormally. If the task has enabled the zero-knowledge proof phase, this method will remain blocked until the task completes zero-knowledge proof verification or exits abnormally.

Parameters:

  • task_id: The ID of the already submitted task. Can be obtained through the create_task method.

Return Value:

  • Task status (bool): True if the task executed successfully, False if an exception occurred during execution.

Example:

# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
# Local names must be valid Python identifiers (they cannot start with a digit).
NODE_API = "http://127.0.0.1:6700"
node = 2PMNode(NODE_API)
task_id = node.create_task(task)
node.trace(task_id)

wait - Wait for Task Completion

2pm.2pm_node.2PMNode.wait(self, task_id)

Users can use this method to wait for the task to complete. This method blocks until the task completes successfully or exits abnormally. If the task has enabled the zero-knowledge proof phase, this method remains blocked until the task completes zero-knowledge proof verification or exits abnormally.

Parameters:

  • task_id: The ID of the already submitted task. Can be obtained through the create_task method.

Return Value:

  • Task status (bool): True if the task executed successfully, False if an exception occurred during execution.

Example:

# Example is a horizontal learning task (subclass of HorizontalLearning) or a horizontal analytics task (subclass of HorizontalAnalytics)
task = Example().build()
# Local names must be valid Python identifiers (they cannot start with a digit).
NODE_API = "http://127.0.0.1:6700"
node = 2PMNode(NODE_API)
task_id = node.create_task(task)
node.wait(task_id)
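Because wait blocks until the task ends, a caller that needs an upper bound on waiting time has to add one itself. The sketch below shows one possible approach using Python's standard concurrent.futures module; wait_with_timeout is a hypothetical helper, and the signature documented above does not include a timeout parameter.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Any, Optional


def wait_with_timeout(node: Any, task_id: int, timeout: float) -> Optional[bool]:
    """Return wait()'s result, or None if it has not returned within `timeout` seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(node.wait, task_id)
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return None
    finally:
        # Do not block here; a still-running wait() continues in its worker thread.
        executor.shutdown(wait=False)
```

A None return means only that the caller stopped waiting. The task itself keeps running on the node, and the worker thread stays alive until the underlying wait call returns.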

get_result - Get Task Result

2pm.2pm_node.2PMNode.get_result(self, task_id)

Users can use this method to obtain the task result after the task completes successfully. If the task has not ended or an exception occurred during execution, calling this method will raise an exception.

Parameters:

  • task_id: The ID of the already submitted task. Can be obtained through the create_task method.

Return Value:

  • Task result. For horizontal federated learning tasks, the result type is Dict[str, torch.Tensor], the return type of 2pm.task.HorizontalLearning.state_dict(). For FHE Machine Learning tasks, the result type is the return type of 2pm.task.FHEMachineLearning.execute().

Example:

task = Example().build()
# Local names must be valid Python identifiers (they cannot start with a digit).
NODE_API = "http://127.0.0.1:6700"
node = 2PMNode(NODE_API)
task_id = node.create_task(task)
if node.wait(task_id):
    res = node.get_result(task_id)
    print(res)
else:
    print("Task error")
