
Creating asynchronous tasks

Turning functions into async tasks using icij-worker

Before implementing tasks for Datashare, let's learn how to turn any Python function into a task that a worker can run asynchronously, using the icij-worker lib.

Given the following function:

def hello_world() -> str:
    return "Hello world"

The icij-worker lib lets you turn it into a task executed asynchronously by a worker in just a few lines:

from icij_worker import AsyncApp

app = AsyncApp("some-app")


@app.task
def hello_world() -> str:
    return "Hello world"

Let's break down the above code into pieces. First, we define an app and give it a name:

from icij_worker import AsyncApp

app = AsyncApp("some-app")

Then, we use the @app.task decorator to register our hello_world function as a task of the app:

@app.task
def hello_world() -> str:
    return "Hello world"

... and that's about it: we've turned hello_world, our very complex ML pipeline function (🧌-ing), into an async task!
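To build some intuition for what "executed asynchronously by a worker" means, here is a deliberately simplified sketch in plain asyncio. It is a toy, not icij-worker's implementation: REGISTRY and toy_worker are hypothetical names, and a real worker would typically receive tasks from an external queue or broker rather than from an in-process asyncio.Queue. In the toy, a task is just a name mapped to a function; the worker looks the name up and calls the function when a message arrives.

```python
import asyncio
from typing import Any, Callable

# Hypothetical toy registry: task name -> plain Python function.
# This is NOT how icij-worker is implemented; it only illustrates the idea.
REGISTRY: dict[str, Callable[..., Any]] = {}


def hello_world() -> str:
    return "Hello world"


REGISTRY["hello_world"] = hello_world


async def toy_worker(queue: asyncio.Queue, results: dict[str, Any]) -> None:
    """Pull (task_id, task_name) pairs from the queue and run the matching function."""
    while True:
        task_id, task_name = await queue.get()
        try:
            results[task_id] = REGISTRY[task_name]()
        finally:
            # Always mark the item as processed so queue.join() can return
            queue.task_done()


async def main() -> dict[str, Any]:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict[str, Any] = {}
    worker = asyncio.create_task(toy_worker(queue, results))
    await queue.put(("task-1", "hello_world"))
    await queue.join()  # wait until the worker has processed every queued task
    worker.cancel()  # the toy worker loops forever, so stop it explicitly
    return results


results = asyncio.run(main())
print(results)  # {'task-1': 'Hello world'}
```

The caller only submits a task name and an ID; when and where the function actually runs is up to the worker, which is what lets tasks execute asynchronously.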

Registering tasks with names

The AsyncApp instance acts as a task registry. By default, tasks are registered under the decorated function's name.

Tip

To avoid conflicts and bugs, it's highly recommended to register tasks with an explicit name.

This decouples the task name from the Python function name and avoids naming issues when you rename or refactor task functions.

Applying the above tip, we can register our task with the hello_world name:

from icij_worker import AsyncApp

app = AsyncApp("some-app")


@app.task(name="hello_world")
def hello_world() -> str:
    return "Hello world"
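To see why an explicit name helps, here is a minimal sketch of a decorator-based registry that supports both the bare @app.task form and the @app.task(name=...) form. ToyApp is a hypothetical stand-in, not icij-worker's AsyncApp, and its duplicate-name check is a choice made for this toy only; it is not a statement about how icij-worker handles conflicts.

```python
from typing import Any, Callable, Optional


class ToyApp:
    """Hypothetical stand-in for a task registry; not icij-worker's AsyncApp."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.registry: dict[str, Callable[..., Any]] = {}

    def task(self, fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None) -> Any:
        def register(f: Callable[..., Any]) -> Callable[..., Any]:
            # Fall back to the function name when no explicit name is given
            task_name = name if name is not None else f.__name__
            if task_name in self.registry:
                raise ValueError(f"task {task_name!r} is already registered")
            self.registry[task_name] = f
            return f

        # Bare @app.task passes the function directly;
        # @app.task(name=...) returns the decorator to apply next
        return register(fn) if fn is not None else register


app = ToyApp("some-app")


@app.task  # registered under its function name: "hello_world"
def hello_world() -> str:
    return "Hello world"


@app.task(name="greet")  # the function was renamed in a refactor; the task name did not change
def greet_v2() -> str:
    return "Hello world"


print(sorted(app.registry))  # ['greet', 'hello_world']
```

Clients that submit "greet" keep working after greet_v2 is renamed again, whereas a task registered under its function name would silently change identity with every rename.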

Task arguments

Since tasks are just plain Python functions, they support input arguments.

For instance, let's register a new task which greets a user given as a str. When no user is provided (user is None), let's call them "unknown":

@app.task(name="hello_user")
def hello_user(user: str | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    return greeting + user
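Because a task is a plain Python function, its logic can be checked without running any worker. The sketch below assumes you exercise an undecorated copy of hello_user (same body as above); it uses Optional[str], which is equivalent to str | None.

```python
from typing import Optional


# Same logic as the hello_user task above, without the @app.task decorator,
# so it can be called directly like any Python function.
def hello_user(user: Optional[str]) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    return greeting + user


print(hello_user("Ada"))  # Hello Ada
print(hello_user(None))   # Hello unknown
```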

Task result

The task result is simply the value returned by the decorated function.

Supported argument and result types

icij-worker tasks support any argument and result types, as long as they are JSON-serializable.
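A quick way to check a value before using it as a task argument or result is to try serializing it. This sketch assumes the standard-library json module is a reasonable proxy for icij-worker's serialization; is_json_serializable is a hypothetical helper, not part of icij-worker.

```python
import json
from datetime import datetime
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Hypothetical helper: True if json.dumps accepts the value."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


print(is_json_serializable({"user": "Ada", "scores": [1, 2.5, None]}))  # True
print(is_json_serializable(datetime(2024, 1, 1)))  # False

# Serializable does not mean identical after a round trip: tuples come back as lists
print(json.loads(json.dumps((1, 2))))  # [1, 2]
```

Values such as datetime objects are best converted to a JSON-friendly form (for instance an ISO 8601 string) before being passed to or returned from a task.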

Tip

If you are used to libs such as pydantic, your task can take dict arguments, which you can easily convert into pydantic.BaseModel instances. Similarly, make sure to convert outputs from pydantic.BaseModel back to a dict, using the model's .dict() method (pydantic v1) or .model_dump() (pydantic v2).
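The dict-in, dict-out pattern from the tip can be sketched as follows. To keep the example dependency-free, it swaps pydantic for standard-library dataclasses; UserIn, GreetingOut and greet are hypothetical names. With pydantic, the models would subclass BaseModel and the conversions noted in the comments would replace the dataclass ones.

```python
from dataclasses import asdict, dataclass


@dataclass
class UserIn:  # hypothetical input model; a pydantic.BaseModel would play this role
    name: str
    language: str = "en"


@dataclass
class GreetingOut:  # hypothetical output model
    text: str


def greet(user: dict) -> dict:
    # dict -> typed object (pydantic v2: UserIn.model_validate(user))
    parsed = UserIn(**user)
    out = GreetingOut(text=f"Hello {parsed.name}")
    # typed object -> dict (pydantic: out.model_dump() in v2, out.dict() in v1)
    return asdict(out)


print(greet({"name": "Ada"}))  # {'text': 'Hello Ada'}
```

The task signature stays JSON-friendly (plain dicts), while the function body still benefits from typed objects.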

Publishing progress updates

For long-running tasks (longer than the ones above), it can be useful to publish progress updates in order to monitor task execution.

This can be done by adding a progress: RateProgress argument to task functions. The progress argument name is reserved: icij-worker automatically populates it with an async callable which you can await to publish progress updates. Progress values are expected to be between 0.0 and 1.0, inclusive:

from icij_worker.typing_ import RateProgress

@app.task(name="hello_user_progress")
async def hello_user_progress(user: str | None, progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    res = greeting + user
    await progress(1)
    return res

Note

In practice, RateProgress = Callable[[float], Awaitable[None]], which means that publishing task progress is done by awaiting a coroutine. To publish progress updates, task functions therefore have to be async.

Notice how hello_user_progress is defined using async def.
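Since progress is just an awaitable callable, a progress-publishing task can be run outside of any worker by passing it a stand-in callable. In this sketch, make_recording_progress is a hypothetical test double that records each update and rejects values outside [0.0, 1.0]; the task body is an undecorated copy of hello_user_progress, and RateProgress is redefined locally from the alias given in the note.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Local copy of the alias from the note: an async callable taking a float
RateProgress = Callable[[float], Awaitable[None]]


async def hello_user_progress(user: Optional[str], progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    res = greeting + user
    await progress(1.0)
    return res


def make_recording_progress(updates: list[float]) -> RateProgress:
    """Hypothetical test double: records each update and checks it lies in [0.0, 1.0]."""

    async def progress(value: float) -> None:
        if not 0.0 <= value <= 1.0:
            raise ValueError(f"progress must be within [0.0, 1.0], got {value}")
        updates.append(value)

    return progress


updates: list[float] = []
result = asyncio.run(hello_user_progress("Ada", make_recording_progress(updates)))
print(result)   # Hello Ada
print(updates)  # [0.5, 1.0]
```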

Next

You'll see what a full async app looks like!