Creating asynchronous tasks
Turning functions into async tasks using icij-worker
Before implementing tasks for Datashare, let's learn how to turn any Python function into a task that can be run asynchronously by a worker using the `icij-worker` library.
Given a plain `hello_world` function that simply returns `"Hello world"`, the `icij-worker` library lets you very simply turn it into a task executed asynchronously by a worker:
```python
from icij_worker import AsyncApp

app = AsyncApp("some-app")


@app.task
def hello_world() -> str:
    return "Hello world"
```
Let's break the above code down into pieces. First, we define an app and give it a name:
```python
from icij_worker import AsyncApp

app = AsyncApp("some-app")
```
Then, we use the `@app.task` decorator to register our `hello_world` function as a task of the app:
```python
@app.task
def hello_world() -> str:
    return "Hello world"
```
... and that's about it: we've turned our `hello_world` very complex ML pipeline function (🧌-ing) into an async task!
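The phrase "executed asynchronously by a worker" can be made concrete with a toy, in-process sketch. Callers only submit a task name plus arguments, and a separate worker coroutine looks the function up and runs it. Everything below is hypothetical and is not how icij-worker is implemented: `TASKS`, `worker`, `main` and `hello_async` are illustrative names. The sketch only shows the producer/worker split, with sync functions run in a thread and async ones awaited on the event loop.

```python
import asyncio
import inspect
from typing import Any, Callable

# Hypothetical in-process registry mapping task names to functions.
TASKS: dict[str, Callable[..., Any]] = {}


def hello_world() -> str:  # sync task, like the example above
    return "Hello world"


async def hello_async() -> str:  # hypothetical async task
    await asyncio.sleep(0)
    return "Hello async"


TASKS["hello_world"] = hello_world
TASKS["hello_async"] = hello_async


async def worker(queue: asyncio.Queue, results: dict[str, Any]) -> None:
    """Toy worker: run queued (task_id, name, kwargs) items until cancelled."""
    while True:
        task_id, name, kwargs = await queue.get()
        try:
            fn = TASKS[name]
            if inspect.iscoroutinefunction(fn):
                # Async tasks are awaited directly on the event loop.
                results[task_id] = await fn(**kwargs)
            else:
                # Sync tasks run in a thread so they don't block the loop.
                results[task_id] = await asyncio.to_thread(fn, **kwargs)
        except Exception as exc:  # keep the worker alive on task errors
            results[task_id] = exc
        finally:
            queue.task_done()


async def main() -> dict[str, Any]:
    queue: asyncio.Queue = asyncio.Queue()
    results: dict[str, Any] = {}
    worker_task = asyncio.create_task(worker(queue, results))
    # The caller only submits task names and arguments; it never calls them.
    await queue.put(("task-1", "hello_world", {}))
    await queue.put(("task-2", "hello_async", {}))
    await queue.join()  # wait until every submitted task has been processed
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
    return results


print(asyncio.run(main()))  # {'task-1': 'Hello world', 'task-2': 'Hello async'}
```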
Registering tasks with names
The `app` instance (an `AsyncApp`) acts as a task registry. By default, tasks are registered under the name of the decorated function.
Tip
To avoid conflicts and bugs, it's highly recommended to register tasks with an explicit name.
This decouples the task name from the Python function name and avoids naming issues when you rename or refactor task functions.
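The naming behaviour described above can be sketched with a hypothetical `ToyApp` registry. This is not icij-worker's actual code. A bare `@app.task` falls back to the function's `__name__`, while `@app.task(name=...)` pins the task name, so clients submitting `"hello_world"` keep working even if the Python function is later renamed. Rejecting duplicate names is an added assumption, meant to show the kind of conflict the tip warns about.

```python
from typing import Any, Callable, Optional


class ToyApp:
    """Hypothetical stand-in for a task registry such as AsyncApp (not its real code)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.registry: dict[str, Callable[..., Any]] = {}

    def task(self, fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None):
        """Support both ``@app.task`` and ``@app.task(name="...")``."""

        def register(f: Callable[..., Any]) -> Callable[..., Any]:
            # Fall back to the function name when no explicit name is given.
            task_name = name if name is not None else f.__name__
            if task_name in self.registry:
                raise ValueError(f"task {task_name!r} is already registered")
            self.registry[task_name] = f
            return f

        if fn is not None:  # used as a bare @app.task
            return register(fn)
        return register  # used as @app.task(name=...)


app = ToyApp("some-app")


@app.task
def say_hello() -> str:  # registered as "say_hello": renaming it renames the task
    return "Hello"


@app.task(name="hello_world")
def greet() -> str:  # registered as "hello_world", whatever the function is called
    return "Hello world"


print(sorted(app.registry))  # ['hello_world', 'say_hello']
```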
Applying the above tip, we can register our task under the `hello_world` name:
```python
from icij_worker import AsyncApp

app = AsyncApp("some-app")


@app.task(name="hello_world")
def hello_world() -> str:
    return "Hello world"
```
Task arguments
Since tasks are just plain Python functions, they support input arguments.
For instance, let's register a new task which greets a user given as a `str`. When no user is provided (`None`), let's call them `unknown`:
```python
@app.task(name="hello_user")
def hello_user(user: str | None) -> str:
    greeting = "Hello "
    if user is None:
        user = "unknown"
    return greeting + user
```
Task result
The task result is simply the value returned by the decorated function.
Supported argument and result types
`icij-worker` tasks support any argument and result types, as long as they are JSON serializable.
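A quick way to check whether a value qualifies is to try `json.dumps` on it. The `is_json_serializable` helper below is hypothetical and assumes the standard-library `json` module with default settings. icij-worker's own serializer may accept or reject slightly different types. Some types serialize but only survive a round trip approximately.

```python
import json
from datetime import datetime
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if json.dumps accepts the value with default settings."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):  # unsupported type or circular reference
        return False
    return True


print(is_json_serializable({"user": "Alice", "scores": [1, 2.5], "admin": None}))  # True
print(is_json_serializable({"tags": {"a", "b"}}))  # False: sets are not JSON
print(is_json_serializable(datetime(2024, 1, 1)))  # False: convert to an ISO string first

# Tuples serialize, but come back as lists after a round trip.
print(json.loads(json.dumps((1, 2))))  # [1, 2]
```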
Tip
If you are used to libraries such as `pydantic`, your tasks can take `dict` arguments, which you can easily convert into `pydantic.BaseModel` instances.
Similarly, you will have to convert `pydantic.BaseModel` outputs into a `dict` using the `BaseModel.dict` method (`BaseModel.model_dump` in pydantic v2).
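The sketch below shows the same dict-in, dict-out pattern with the standard library's `dataclasses` instead of pydantic, to keep it dependency-free. With pydantic v2 you would use `User.model_validate(user)` and `.model_dump()` instead. `User`, `Greeting` and `greet_user` are hypothetical names. Such a function would be registered with `@app.task` like the earlier examples. Unlike pydantic, dataclasses do not validate field types: `User(**user)` only checks that the keys match the fields.

```python
from dataclasses import asdict, dataclass


@dataclass
class User:
    name: str
    age: int


@dataclass
class Greeting:
    message: str


def greet_user(user: dict) -> dict:
    # dict in: rebuild a typed object from the JSON-compatible argument
    parsed = User(**user)
    result = Greeting(message=f"Hello {parsed.name} ({parsed.age})")
    # dict out: convert back to a JSON-compatible value before returning
    return asdict(result)


print(greet_user({"name": "Alice", "age": 30}))  # {'message': 'Hello Alice (30)'}
```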
Publishing progress updates
For long-running tasks (longer than the one above), it can be useful to publish progress updates in order to monitor task execution.
This can be done by adding a `progress: RateProgress` argument to task functions.
The `progress` argument name is reserved: `icij-worker` will automatically populate this argument with an async coroutine function that you can call to publish progress updates.
Progress updates are expected to be between `0.0` and `1.0`, inclusive:
```python
from icij_worker.typing_ import RateProgress


@app.task(name="hello_user_progress")
async def hello_user_progress(user: str | None, progress: RateProgress) -> str:
    greeting = "Hello "
    await progress(0.5)
    if user is None:
        user = "unknown"
    res = greeting + user
    await progress(1.0)
    return res
```
Note
In practice, `RateProgress = Callable[[float], Awaitable[None]]`, which means that publishing task progress is performed through an `asyncio` coroutine.
To support publishing progress updates, task functions have to be async.
Notice how `hello_user_progress` is defined using `async def`.
Next
You'll see what a full async app looks like!