Summary
The user is looking to add locking capabilities to their workflows, mostly for efficiency, and shares a code snippet featuring `acquire_lock` and `release_lock` functions. They describe a situation where any of several upstream sources can trigger a downstream process that should not run again while a previous run is still in progress. The user asks whether others have similar use cases or implementations and notes that `task2()` in their example is probably an `echo(message="noop")` or something similar. Another participant suggests using `on_failure` as well.
blaketastic2
yea, was just looking at that :slightly_smiling_face:
kumare
Please use on_failure as well
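One reading of the `on_failure` suggestion is that the lock should still be released when the compute fails, since a release step chained after the conditional may never run in that case. The sketch below shows that guarantee in plain Python with `try`/`finally`. It is an analogy only, not flytekit's `on_failure` API, and `held_locks`, `run_guarded`, and `failing_compute` are hypothetical names.

```python
from typing import Callable, Set


def run_guarded(held_locks: Set[str], lock_handle: str, compute: Callable[[], None]) -> bool:
    """Run compute while holding lock_handle and always release afterwards.

    Returns False without running compute if the lock is already held.
    """
    if lock_handle in held_locks:
        return False  # someone else holds the lock: skip the compute
    held_locks.add(lock_handle)
    try:
        compute()
    finally:
        # Runs whether compute succeeds or raises, so the lock never leaks.
        held_locks.discard(lock_handle)
    return True


def failing_compute() -> None:
    """Hypothetical compute step that always fails."""
    raise RuntimeError("compute failed")
```

Without the `finally` clause, a failed compute would leave `my_lock` held until it expired or was cleared by hand.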
blaketastic2
which more clearly shows that we're skipping the compute
blaketastic2
also, in my example, the `task2()` is probably an `echo(message="noop")` or something
blaketastic2
We've also talked about a world where the lock waits to acquire using some sensor mechanism, but that's a bit more complicated and can wait
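A waiting acquire of this kind could be approximated by polling until the lock frees up or a timeout passes. This is a minimal sketch under that assumption, not an existing sensor implementation: `wait_for_lock`, `try_acquire`, and the timeout parameters are hypothetical names.

```python
import time
from typing import Callable


def wait_for_lock(
    try_acquire: Callable[[], bool],
    timeout_seconds: float,
    poll_interval_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll try_acquire until it succeeds or timeout_seconds has elapsed."""
    deadline = clock() + timeout_seconds
    while True:
        if try_acquire():
            return True  # lock acquired
        if clock() >= deadline:
            return False  # gave up waiting
        sleep(poll_interval_seconds)
```

The `sleep` and `clock` parameters are injectable only so the loop can be exercised without real waiting.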
blaketastic2
Sure. From a high level, we have downstreams that can run if any of a number of upstreams triggers them. If something kicks one off, it just shouldn't run again while the first run is still going. We are perfectly fine with the eventual consistency of running the downstream again on the next scheduled execution.
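The skip-while-running behavior described here can be sketched with a non-blocking lock that expires after a fixed duration. This is an in-memory illustration only: `InMemoryLockStore` is a hypothetical stand-in for whatever backs the real lock, and treating `duration_in_seconds` as a time-to-live is an assumption.

```python
import time
from typing import Callable, Dict


class InMemoryLockStore:
    """Hypothetical lock store: non-blocking acquire with an expiry time."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._expiry: Dict[str, float] = {}  # lock handle -> expiry timestamp

    def acquire(self, lock_handle: str, duration_in_seconds: float) -> bool:
        """Return True if the lock was taken, False if it is still held."""
        now = self._clock()
        expires_at = self._expiry.get(lock_handle)
        if expires_at is not None and expires_at > now:
            return False  # another run holds the lock: the caller should skip
        self._expiry[lock_handle] = now + duration_in_seconds
        return True

    def release(self, lock_handle: str) -> None:
        """Drop the lock; releasing an unheld lock is a no-op."""
        self._expiry.pop(lock_handle, None)
```

A second trigger that arrives while the lock is held gets `False` and skips. The next scheduled execution after a release or expiry acquires the lock normally.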
eric901201
a more comprehensive example will be appreciated
eric901201
But as a maintainer, I'm curious about the use case for this
eric901201
I think there's nothing similar in OSS
blaketastic2
We're looking to add locking capabilities to our workflows (mostly for efficiency gains) like so:
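```python
from flytekit import conditional, workflow


@workflow
def locking_wf() -> None:
    acquired, lock_handle = acquire_lock(lock_handle="my_lock", duration_in_seconds=10)
    cond = (
        conditional("test lock")
        .if_(acquired.is_true())
        .then(task1(lock_handle))  # lock acquired: run the compute
        .else_()
        .then(task2())  # lock held elsewhere: skip the compute
    )
    # Release after the conditional finishes (flytekit chains nodes with >>).
    cond >> release_lock(lock_handle=lock_handle)
```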
Here `acquire_lock` and `release_lock` are Agents responsible for maintaining the lock. Curious whether others have similar use cases and/or implementations?