Create tasks for Datashare¶
In this section, you'll learn how to update your app to run together with Datashare.
Communicating with Datashare via AMQP¶
Datashare features its own task manager used to execute many built-in tasks. This task manager can be configured in many different ways and in particular it can be configured to use AMQP via RabbitMQ to manage tasks.
Having Datashare's task manager powered by AMQP allows us to integrate our custom tasks into Datashare's backend:
- we'll use Datashare's task manager to create custom tasks
- the task manager will publish custom tasks on the AMQP broker
- our Python worker will receive the tasks and execute them
- the workers will return their results to the task manager
This workflow, however, requires the worker to be properly configured to communicate with Datashare's task manager.
Aligning worker's config with Datashare¶
If Datashare uses a non-default RabbitMQ config, make sure to update your worker's config to match it.
Here is how to do so via the config file:
```json
{
  "type": "amqp",
  "rabbitmq_host": "your-rabbitmq-host",
  "rabbitmq_management_port": 16666,
  "rabbitmq_password": "******",
  "rabbitmq_port": 6666,
  "rabbitmq_user": "your-rabbitmq-user",
  "rabbitmq_vhost": "somevhost"
}
```
If you prefer using env vars, you can set the above values through `ICIJ_WORKER_<KEY>` variables, where `<KEY>` is one of the above JSON keys.
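For example, assuming the same values as the JSON config above and that keys are uppercased in the usual env-var convention (an assumption; check the worker config guide for the exact naming), the equivalent environment variables would look like:

```shell
# Equivalent of the JSON config above, expressed as ICIJ_WORKER_<KEY> env vars
export ICIJ_WORKER_TYPE="amqp"
export ICIJ_WORKER_RABBITMQ_HOST="your-rabbitmq-host"
export ICIJ_WORKER_RABBITMQ_MANAGEMENT_PORT=16666
export ICIJ_WORKER_RABBITMQ_PASSWORD="******"
export ICIJ_WORKER_RABBITMQ_PORT=6666
export ICIJ_WORKER_RABBITMQ_USER="your-rabbitmq-user"
export ICIJ_WORKER_RABBITMQ_VHOST="somevhost"
```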
Read the full guide to worker config to learn more.
Grouping our tasks in the `PYTHON` task group¶
Inside Datashare, tasks can be grouped together in order to be executed by specific workers.
In particular, Datashare defines the `JAVA` task group, reserved for built-in tasks (such as indexing, batch searches/downloads...), and the `PYTHON` task group, reserved for your custom tasks.
When the task manager creates and publishes tasks inside Datashare, it can assign them a group, allowing each task to be routed to specific workers.
Creating and publishing tasks with the `PYTHON` group ensures that only your workers will receive them (and not the built-in Java workers).
If you don't use the `PYTHON` task group, your worker will also receive the built-in tasks reserved for Java workers; since it can't execute them, those tasks will fail.
To assign a task to a specific group, use the `group: str | TaskGroup | None` argument of the `task` decorator.
Read the full guide to task routing to learn more.
Adding the mandatory `user: dict | None` argument to all tasks¶
Datashare systematically adds a `user: dict | None` entry to task arguments.
Since task arguments are forwarded directly to your function, it must accept the `user` argument even when unused; otherwise the task will fail, complaining that the `user` argument was provided as input but is not used by the task.
Our first Datashare app¶
The app can hence be updated as follows:
```python
from icij_worker import AsyncApp
from icij_worker.app import TaskGroup
from icij_worker.typing_ import RateProgress

app = AsyncApp("some-app")

PYTHON_TASK_GROUP = TaskGroup(name="PYTHON")


@app.task(name="hello_world", group=PYTHON_TASK_GROUP)
def hello_world(user: dict | None) -> str:  # pylint: disable=unused-argument
    return "Hello world"


@app.task(name="hello_user", group=PYTHON_TASK_GROUP)
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    return greeting + user


@app.task(name="hello_user_progress", group=PYTHON_TASK_GROUP)
async def hello_user_progress(user: dict | None, progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    res = greeting + user
    await progress(1)
    return res
```
Running group-specific workers¶
As detailed in the task routing guide, worker pools can be restricted to execute tasks of a given group.
We can hence start our `PYTHON` worker pool by adding the `-g PYTHON` argument:
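The exact invocation depends on how you start your workers; assuming the `icij-worker` CLI with a config file and two workers (the entry point, app path, and flags other than `-g PYTHON` are assumptions for illustration), the command might look like:

```shell
# Start 2 workers restricted to the PYTHON task group (sketch; adapt paths/flags)
icij-worker workers start -c config.json -n 2 -g PYTHON my_app.app
```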
```
[INFO][icij_worker.backend.backend]: worker config: {
...
}
[INFO][icij_worker.backend.mp]: starting 2 worker for app my_app.app
...
```