Basic Datashare worker¶
Initialize a worker python package¶
Project basic-worker initialized !cd basic-worker
Implement an activity function¶
Let's implement a simple activity function taking a user: dict | None argument as input and greeting
this user. The function performing this task is:
def hello_user(user: dict | None) -> str:
greeting = "Hello "
user = "unknown" if user is None else user["id"]
return greeting + user
Register an activity¶
Next, we'll need to turn our task function into a temporal activity under the "hello_user" name and register
inside the ACTIVITIES variable:
from datashare_python.utils import activity_defn
@activity_defn(name="hello_user") # (1)!
def hello_user(user: dict | None) -> str:
greeting = "Hello "
user = "unknown" if user is None else user["id"]
return greeting + user
ACTIVITIES = [hello_user] # (2)!
- decorate the activity function with
@activity_defnusing"hello-user"as name - expose the activity function to
datashare-python's CLI, by listing it in theACTIVITIESvariable
Under the hood, the ACTIVITIES is registered as
plugin entrypoint
in the package's pyproject.toml:
[project.entry-points."datashare.activities"]
activities = "basic_worker.activities:ACTIVITIES"
datashare-python worker start CLI, datashare-python will look for any variable registered under
the "datashare.activities" key and will be able to run activities registered in these variables.
You can register as many variables as you want, under the names of your choices, as long as it's registered under
the "datashare.activities" key.
Implement and register a workflow¶
Temporal activities run inside workflows. In our case the workflow takes args: dict as input
(like all Datashare workflows) and just run our our hello_world activity function. In plain Python code, this is:
from .activities import hello_user
def run_hello_world_workflow(args: dict) -> str:
user = args.get("user")
return hello_user(user)
In practice, we need to turn our workflow function into an actual temporal workflow like so:
from datetime import timedelta
from temporalio import workflow
from .activities import hello_user
@workflow.defn(name="hello-user") # (1)!
class HelloUserWorkflow:
@workflow.run # (2)!
async def run_hello_world_workflow(self, args: dict) -> str:
user = args.get("user") # (3)!
return await workflow.execute_activity(
hello_user, # (4)!
user,
start_to_close_timeout=timedelta(seconds=10),
)
WORKFLOWS = [HelloUserWorkflow] # (5)!
- decorate the workflow class with
@workflow.defnusing"hello-user"as name - decorate
runfunction with@workflow.run - get the
userfrom workflow's args - execute our
hello_useractivity with the user - expose the workflow class to
datashare-python's CLI, by listing it in theWORKFLOWSvariable
Just like for activities, our workflow is exposed to datashare-python's CLI under the WORKFLOWS variable,
bound in the pyproject.toml:
[project.entry-points."datashare.workflows"]
workflows = "basic_worker.workflows:WORKFLOWS"
We've built a very simple workflows, but in practice, real workflows can be arbitrarily complex.
Next¶
Now that you have created a basic app, you can either:
- learn how to build a docker image from it
- learn how to implement a more realistic worker in the advanced example