Basic Datashare worker

Initialize a worker python package

uvx datashare-python project init basic-worker
Initializing basic-worker worker project in .
Project basic-worker initialized !
cd basic-worker

Implement an activity function

Let's implement a simple activity function taking a user: dict | None argument as input and greeting this user. The function performing this task is:

basic_worker/activities.py
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    user = "unknown" if user is None else user["id"]
    return greeting + user

Register an activity

Next, we'll need to turn our task function into a Temporal activity named "hello_user" and register it in the ACTIVITIES variable:

basic_worker/activities.py
from datashare_python.utils import activity_defn


@activity_defn(name="hello_user")  # (1)!
def hello_user(user: dict | None) -> str:
    greeting = "Hello "
    user = "unknown" if user is None else user["id"]
    return greeting + user


ACTIVITIES = [hello_user]  # (2)!
  1. decorate the activity function with @activity_defn using "hello_user" as name
  2. expose the activity function to datashare-python's CLI, by listing it in the ACTIVITIES variable

Under the hood, the ACTIVITIES variable is registered as a plugin entry point in the package's pyproject.toml:

pyproject.toml
[project.entry-points."datashare.activities"]
activities = "basic_worker.activities:ACTIVITIES"
When you start a worker with the datashare-python worker start CLI command, datashare-python looks for every variable registered under the "datashare.activities" key and is then able to run the activities listed in these variables.

You can register as many variables as you want, under the names of your choice, as long as they are registered under the "datashare.activities" key.

Implement and register a workflow

Temporal activities run inside workflows. In our case, the workflow takes args: dict as input (like all Datashare workflows) and just runs our hello_user activity function. In plain Python code, this is:

from .activities import hello_user


def run_hello_world_workflow(args: dict) -> str:
    user = args.get("user")
    return hello_user(user)

In practice, we need to turn our workflow function into an actual temporal workflow like so:

basic_worker/workflows.py
from datetime import timedelta

from temporalio import workflow

from .activities import hello_user


@workflow.defn(name="hello-user")  # (1)!
class HelloUserWorkflow:
    @workflow.run  # (2)!
    async def run_hello_world_workflow(self, args: dict) -> str:
        user = args.get("user")  # (3)!
        return await workflow.execute_activity(
            hello_user,  # (4)!
            user,
            start_to_close_timeout=timedelta(seconds=10),
        )


WORKFLOWS = [HelloUserWorkflow]  # (5)!
  1. decorate the workflow class with @workflow.defn using "hello-user" as name
  2. decorate the run function with @workflow.run
  3. get the user from workflow's args
  4. execute our hello_user activity with the user
  5. expose the workflow class to datashare-python's CLI, by listing it in the WORKFLOWS variable

Just like for activities, our workflow is exposed to datashare-python's CLI under the WORKFLOWS variable, bound in the pyproject.toml:

pyproject.toml
[project.entry-points."datashare.workflows"]
workflows = "basic_worker.workflows:WORKFLOWS"

We've built a very simple workflow, but in practice, real workflows can be arbitrarily complex.

Next

Now that you have created a basic app, you can either: