Create tasks for Datashare

In this section, you'll learn how to update our app to run together with Datashare.

Communicating with Datashare via AMQP

Datashare features its own task manager, used to execute its many built-in tasks. This task manager can be configured in several ways; in particular, it can use AMQP via RabbitMQ to manage tasks.

Having Datashare's task manager powered by AMQP allows us to integrate our custom tasks in Datashare's backend:

  • we'll use Datashare's task manager to create custom tasks
  • the task manager will publish custom tasks on the AMQP broker
  • our Python worker will receive the tasks and execute them
  • the workers will return their results to the task manager

This workflow, however, requires the worker to be properly configured to communicate with Datashare's task manager.
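As an illustration, the round trip above can be sketched with in-memory queues standing in for the AMQP broker (all names here are illustrative, not icij_worker's API):

```python
import queue

# In-memory stand-ins for the broker's task and result queues
task_queue: queue.Queue = queue.Queue()
result_queue: queue.Queue = queue.Queue()


def task_manager_publish(name: str, args: dict) -> None:
    # The task manager creates a task and publishes it on the broker
    task_queue.put({"name": name, "args": args})


def worker_consume() -> None:
    # The worker receives the task and executes it...
    task = task_queue.get()
    result = "Hello world" if task["name"] == "hello_world" else None
    # ...then returns its result to the task manager
    result_queue.put({"name": task["name"], "result": result})


task_manager_publish("hello_world", {})
worker_consume()
outcome = result_queue.get()
print(outcome["result"])  # Hello world
```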

Aligning the worker's config with Datashare

If Datashare uses a non-default RabbitMQ config, make sure to update your worker's config to match it.

Here is how to do so via the config file:

app_config.json
{
  "type": "amqp",
  "rabbitmq_host":  "your-rabbitmq-host",
  "rabbitmq_management_port": 16666,
  "rabbitmq_password": "******",
  "rabbitmq_port": 6666,
  "rabbitmq_user":  "your-rabbitmq-user",
  "rabbitmq_vhost": "somevhost"
}

If you prefer using env vars, you can override the above variables by setting the ICIJ_WORKER_<KEY> variables, where <KEY> is one of the above JSON's keys.
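For instance, the JSON config above could equivalently be set through the environment, assuming the usual upper-casing of the JSON keys (values are the same placeholders as in the file):

```shell
export ICIJ_WORKER_TYPE=amqp
export ICIJ_WORKER_RABBITMQ_HOST=your-rabbitmq-host
export ICIJ_WORKER_RABBITMQ_MANAGEMENT_PORT=16666
export ICIJ_WORKER_RABBITMQ_PASSWORD='******'
export ICIJ_WORKER_RABBITMQ_PORT=6666
export ICIJ_WORKER_RABBITMQ_USER=your-rabbitmq-user
export ICIJ_WORKER_RABBITMQ_VHOST=somevhost
```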

Read the full guide to worker config to learn more.

Grouping our tasks in the PYTHON task group

Inside Datashare, tasks can be grouped together in order to be executed by specific workers. In particular, Datashare defines the JAVA task group reserved for built-in tasks (such as indexing, batch searches/downloads...) and the PYTHON task group reserved for your custom tasks.

When tasks are created and published inside Datashare by the task manager, it's possible to assign them a group, allowing each task to be routed to specific workers.

Creating and publishing tasks with the PYTHON group ensures that only your workers receive them (and not built-in Java workers).

If you don't use the PYTHON task group, your worker will receive the built-in tasks reserved for Java workers; since it can't execute these tasks, they will fail.
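The effect of group-based routing can be sketched with one in-memory queue per group, standing in for the broker's routing (illustrative names, not icij_worker's API):

```python
import queue

# One queue per task group, standing in for the broker's routing keys
queues = {"JAVA": queue.Queue(), "PYTHON": queue.Queue()}


def publish(task_name: str, group: str) -> None:
    # The task manager publishes each task on its group's queue
    queues[group].put(task_name)


def consume(group: str) -> list:
    # A worker pool only consumes from its own group's queue
    received = []
    while not queues[group].empty():
        received.append(queues[group].get())
    return received


publish("index", "JAVA")           # built-in task, reserved for Java workers
publish("hello_world", "PYTHON")   # our custom task

python_tasks = consume("PYTHON")
print(python_tasks)  # ['hello_world'] — Java tasks are never delivered here
```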

To assign a task to a specific group, use the group: str | TaskGroup | None argument of the task decorator.

Read the full guide to task routing to learn more.

Adding the mandatory user: dict | None argument to all tasks

Datashare systematically adds a user: dict | None argument to task arguments. Since task arguments are forwarded directly to your function, your function must accept the user argument even when unused; otherwise the task will fail, complaining that the user argument was provided as input but is not supported by the task.

Our first Datashare app

The app can hence be updated as follows:

my_app.py
from icij_worker import AsyncApp
from icij_worker.app import TaskGroup
from icij_worker.typing_ import RateProgress

app = AsyncApp("some-app")

PYTHON_TASK_GROUP = TaskGroup(name="PYTHON")


@app.task(name="hello_world", group=PYTHON_TASK_GROUP)
def hello_world(user: dict | None) -> str:  # pylint: disable=unused-argument
    return "Hello world"


@app.task(name="hello_user", group=PYTHON_TASK_GROUP)
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    return greeting + user


@app.task(name="hello_user_progress", group=PYTHON_TASK_GROUP)
async def hello_user_progress(user: dict | None, progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    else:
        user = user["id"]
    res = greeting + user
    await progress(1)
    return res

Running group-specific workers

As detailed in the task routing guide, worker pools can be restricted to execute tasks of a given group.

We can hence start our PYTHON worker pool by adding the -g PYTHON argument:

python -m icij_worker workers start -c app_config.json -g PYTHON -n 2 my_app.app
[INFO][icij_worker.backend.backend]: Loading worker configuration from env...
[INFO][icij_worker.backend.backend]: worker config: {
...
}
[INFO][icij_worker.backend.mp]: starting 2 worker for app my_app.app
...