F

Flyte enables you to build & deploy data & ML pipelines, hassle-free. The infinitely scalable and flexible workflow orchestration platform that seamlessly unifies data, ML and analytics stacks. Explore and Join the Flyte Community!

Locking Capabilities for Workflows

Summary

The user is looking to add locking capabilities to their workflows to enhance efficiency and shares a code snippet featuring acquire_lock and release_lock functions. They describe a situation where multiple upstream sources can trigger downstream processes that need to run sequentially. The user requests feedback from others with similar experiences and notes that task2() in their example is probably an echo(message="noop") or something similar, and they also suggest using on_failure.

Status
resolved
Tags
    Source
    #flyte-agents
      b

      blaketastic2

      10/7/2024

      yea, was just looking at that :slightly_smiling_face:

      k

      kumare

      10/7/2024

      Please use on_failure as well

      b

      blaketastic2

      10/7/2024

      which more clearly shows that we're skipping the compute

      b

      blaketastic2

      10/7/2024

      also, in my example, the task2() is probably an echo(message="noop") or something

      b

      blaketastic2

      10/7/2024

      We've also talked about a world where the lock waits to acquire using some sensor mechanism, but that's a bit more complicated and can wait

      b

      blaketastic2

      10/7/2024

      Sure. From a high level, we have downstreams that can run if any number of upstream triggers it. If something kicks it off, it just shouldn't run again while the other is running. We are perfectly fine with eventual consistency of running the downstream again on the next scheduled execution.

      e

      eric901201

      10/7/2024

      a more comprehensive example will be appreciated

      e

      eric901201

      10/7/2024

      But as a maintainer, I'm curious the usecase of this

      e

      eric901201

      10/7/2024

      I think theirs no similar in OSS

      b

      blaketastic2

      10/7/2024

      We're looking to add locking capabilities to our workflows(mostly for efficiency gains) like so:

      def locking_wf() -> None:
          acquired, lock_handle = acquire_lock(lock_handle="my_lock", duration_in_seconds=10)
      
          cond = (
              conditional("test lock")
              .if_(acquired.is_true())
              .then(task1(lock_handle))
              .else_()
              .then(task2())
          )
      
          cond > release_lock(lock_handle=lock_handle)```
      Where `acquire_lock` and `release_lock` are Agents responsible for maintaining the lock. Curious if others have similar use-cases and or implementations?