Issues with Flyte Signals Feature

Summary

The user is looking for design documentation on the Signals feature to better understand how it works. They are having trouble sending signals in their Flyte workflow: set_signal sometimes raises a FlyteInvalidInputException, including when the signal is sent after a task fails. The problem stems from a timing mismatch in which the signal-waiting task (gate node) is not yet ready to receive the signal from the model subworkflow, even though the subworkflow only attempts to send it 30 to 40 seconds after being spawned. The signal is then missed, and the workflow hangs until it times out. The user confirmed that the gate node was not ready (the signal was not yet registered) by listing the signals for the execution ID, which returned an empty list. They are seeking insights or metrics to help debug the issue further.

Status
open
Tags
  • Signals
  • Error
  • Documentation Issue
  • Flyte
  • Documentation
  • User
  • Feature
  • Support Need
  • FlyteInvalidInputException
  • 30 to 40 seconds
  • Question
  • Developer Help
Source
#ask-the-community
    nihar.pawade

    11/13/2024

    Thank you.

    nihar.pawade

    11/13/2024

    | the fact that these are dynamic workflows doesn’t really matter right?
    it does in our use case, because the number of gate nodes and the node names depend on the input config

    ytong

    11/13/2024

    let us think about it.

    ytong

    11/13/2024

    and yeah i think i get it… this wait for input gate node was not written with this in mind (getting fulfilled by something else in the same workflow)

    ytong

    11/13/2024

    this setup can be done with static wfs and you’d have the same problem

    ytong

    11/13/2024

    the fact that these are dynamic workflows doesn’t really matter right?

    nihar.pawade

    11/13/2024

    to back my hypothesis: the screenshot is a failed run with the same issue. the first highlighted node, google_bid_model, is the gate node, which got spawned only after the second highlighted task failed, because that task tried to set the signal while the gate node was not ready yet. after that, the task below it (on_failure, cut off in the screenshot) was able to set the signal, since by then the gate node was available on the timeline

    nihar.pawade

    11/13/2024

     _____________________      _____________________
    | parent/dynamic 1    |    | parent/dynamic 2    |
    |_____________________|    |_____________________|
               |                          |
               |                          |
          signal_node <<<------ task that sets signal

    ytong

    11/13/2024

    ```
            ┌───────────────┐
            │               │
            │  parent       │
            │               ├──┐
            └──┬────────────┘  │
               │               │
      ┌────────▼─┐         ┌───▼────────┐
      │  signal  │◄────────┤ task that  │
      │   node   │         │ sets signal│
      └──────────┘         └────────────┘
    ```
    
    nihar.pawade

    11/13/2024

    yes, but the task that spawns the gate node is different from the tasks that set the signal

    ytong

    11/13/2024

    so there are basically two things, right? let’s simplify it a bit: some parent nodes run, and then a signal node and something else that will set that signal node get run at the same time.

    nihar.pawade

    11/13/2024

    honestly, what i think is happening: since we use dynamic workflows to spin up the gate nodes, in a few cases the wait_model_signals pod may get scheduled on a different node, which takes its own time to start executing wait_model_signals (the task that spins up the gate nodes). by that point the model has already completed and tries to set a signal that does not exist / is not registered yet

    nihar.pawade

    11/13/2024

    so the subworkflow sets the signal with either a Success or a Fail value:
    • on success, the subworkflow's model_on_success_container_task sets the signal with the Success value
    • if anything fails in the subworkflow, the on-error task sets the signal with the Failure value
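The Success/Failure reporting above can be sketched as one wrapper, assuming a `send_signal(signal_name, value)` callable like the helper in the traceback further down this thread. In the real pipeline the two paths are separate tasks (model_on_success_container_task and the on-error task); `run_and_report` is a hypothetical helper that collapses them into one function for illustration, and the literal values "Success" and "Fail" are assumptions.

```python
from typing import Any, Callable


def run_and_report(
    model_fn: Callable[[], Any],
    send_signal: Callable[[str, str], None],
    signal_name: str,
) -> Any:
    """Run a model step and report its outcome on `signal_name` (hypothetical helper)."""
    try:
        result = model_fn()
        # Mirrors model_on_success_container_task: report success to the gate node.
        send_signal(signal_name, "Success")
    except Exception:
        # Mirrors the on-error task: runs if the model fails OR if the Success send fails.
        send_signal(signal_name, "Fail")
        raise
    return result
```

Because the failure path also runs when the Success send itself raises, a missed Success signal turns into a Fail signal, which mirrors the screenshot described in this thread (the on-success send failed, the later on-error send succeeded).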

    nihar.pawade

    11/13/2024

    oh my bad, i completely forgot to add the workflow function implementations from run models. we dynamically determine the subworkflow name based on the input criteria; all of the subworkflows are imported and kept in a map/dict

    ytong

    11/13/2024

    these are input signal nodes right? they require someone to fulfill the signal, are you saying there’s another workflow that does that? (as opposed to someone typing it in the UI)

    ytong

    11/13/2024

    do you think you could write a minimal repro? something we can run on our end? I’m still not entirely sure what’s happening.

    nihar.pawade

    11/13/2024

    please let me know if it's still not clear

    nihar.pawade

    11/13/2024

    <@UNR3C6Y4T> i've edited and simplified the source above to reflect how we are using it

    ytong

    11/11/2024

    where is the parent workflow? like where are check_and_run_frequent_actions and model_signal_waits defined?

    ytong

    11/11/2024

    i’m not sure i understand this…

    dubovikov.kirill

    11/8/2024

    <@UNR3C6Y4T> do you think this is expected behaviour? For now, we have resolved it by manually retry-waiting until we find that the signal is available, and only then do we start listening
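The retry-wait workaround can be sketched as a polling helper. This is a minimal sketch under assumptions: `send_signal_when_ready` and `flyte_remote_callables` are hypothetical helpers, and the only flytekit calls assumed are `FlyteRemote.list_signals(execution_name)` (the same listing used for debugging in this thread) and `FlyteRemote.set_signal(signal_id, execution_name, value)` (as called in the traceback). The polling logic takes plain callables so it can be exercised without a cluster.

```python
import time
from typing import Any, Callable, Iterable, List, Tuple


def send_signal_when_ready(
    list_signal_ids: Callable[[], Iterable[str]],
    set_signal: Callable[[str, Any], None],
    signal_name: str,
    value: Any,
    timeout_s: float = 300.0,
    poll_interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Wait until `signal_name` is registered for the execution, then set it.

    Raises TimeoutError if the signal does not appear within `timeout_s`.
    """
    deadline = clock() + timeout_s
    while True:
        # Treat the signal as ready once it shows up in the execution's signal listing.
        if signal_name in set(list_signal_ids()):
            set_signal(signal_name, value)
            return
        if clock() >= deadline:
            raise TimeoutError(
                f"signal {signal_name!r} not registered after {timeout_s}s"
            )
        sleep(poll_interval_s)


def flyte_remote_callables(
    remote: Any, execution_name: str
) -> Tuple[Callable[[], List[str]], Callable[[str, Any], None]]:
    """Adapt a flytekit FlyteRemote (duck-typed) to the two callables above."""

    def list_signal_ids() -> List[str]:
        # Assumes each returned Signal message carries its name in .id.signal_id.
        return [s.id.signal_id for s in remote.list_signals(execution_name)]

    def set_signal(signal_name: str, value: Any) -> None:
        # Same positional order as the remote.set_signal call in the traceback.
        remote.set_signal(signal_name, execution_name, value)

    return list_signal_ids, set_signal
```

Inside a task this would be wired roughly as `send_signal_when_ready(*flyte_remote_callables(remote, execution_name), signal_name, "Success")`. The trade-off is extra latency plus a hard upper bound: if the gate node never registers the signal, the helper raises TimeoutError instead of failing on the first attempt.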

    nihar.pawade

    10/31/2024

    <@UNR3C6Y4T> above is the code specifically for the blocks that are giving us issues in production

    divyank.agarwal

    10/31/2024

    we tried using the >> operator, but it doesn't work.

    a simple version of the code:
    Task A: wait_for_input(signal_name)
    Task B: send_signal(signal_name)

    A >> B ❌ (blocked: B only sends the signal after A finishes, so A never receives it)
    B >> A ❌ (fails because the receiver is not ready yet)

    so Task A and Task B have to run in parallel, and the ideal sequence is:
    • Task A starts
    • Task B starts
    • Task B ends
    • Task A ends
    but under heavy load, there is no guarantee that Task A will start before Task B starts.
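A toy in-memory model makes the ordering problem concrete. `ToySignalStore`, `SignalDoesNotExist` and `run` are hypothetical names, not Flyte APIs; the one behavior borrowed from this thread is that a signal cannot be set until the gate node waiting on it has registered it (the "signal does not exist" error from set_signal). It models only the parallel case, where the outcome depends on which task gets going first.

```python
class SignalDoesNotExist(Exception):
    """Stand-in for the 'signal does not exist' error returned by set_signal."""


class ToySignalStore:
    """Hypothetical in-memory model of signal bookkeeping (not Flyte's implementation)."""

    def __init__(self) -> None:
        self._signals: dict = {}

    def register(self, name: str) -> None:
        # Models Task A's gate node coming up and starting to wait on `name`.
        self._signals.setdefault(name, None)

    def set(self, name: str, value: object) -> None:
        # Models Task B calling set_signal; fails if the gate node is not registered yet.
        if name not in self._signals:
            raise SignalDoesNotExist(name)
        self._signals[name] = value

    def get(self, name: str) -> object:
        return self._signals.get(name)


def run(order: list) -> str:
    """Replay one interleaving of 'A_start' (gate registers) and 'B_send' (signal is set)."""
    store = ToySignalStore()
    steps = {
        "A_start": lambda: store.register("model_signal"),
        "B_send": lambda: store.set("model_signal", "Success"),
    }
    try:
        for step in order:
            steps[step]()
    except SignalDoesNotExist:
        return "B failed: signal does not exist"
    return f"A received {store.get('model_signal')!r}"


# The two interleavings that can occur when A and B run in parallel.
print(run(["A_start", "B_send"]))  # A received 'Success'
print(run(["B_send", "A_start"]))  # B failed: signal does not exist
```

Nothing in the workflow definition pins down which interleaving runs, which is consistent with the failure showing up mainly under heavy load.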

    ytong

    10/30/2024

    have you tried using the >> operator?

    ytong

    10/30/2024

    can you share the code?

    nihar.pawade

    10/29/2024

    Hi Team, here is more context about the issue that we are facing:

    • In our Flyte workflow, we have model tasks/subworkflows that trigger a signal to a signal-waiting task upon successful or failed execution.
    • However, there's a timing mismatch where, by the time the subworkflow completes and attempts to send the signal, the signal-wait task is not always ready to receive it, leading to an error.
    • Both the model subworkflow and the signal-wait (gate node) are spawned simultaneously, as they share a common predecessor. Importantly, the model subworkflow generally attempts to send the signal 30 to 40 seconds after it is spawned. Despite this delay, the signal-wait task is sometimes not ready, causing the signal to be missed, which results in the workflow hanging until it eventually times out.

    ```
    [f50afe3993cc347818cc-n8-0-dn1-0-dn1-0] terminated with exit code (1). Reason [Error]. Message: 
    7818cc} google_bid_models}] exists, err: [signal does not exist]"}"
    >
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 30, in <module>
        on_success_task(execution_name, signal_name, msg)
      File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 22, in on_success_task
        return send_signal(execution_name, signal_name, msg)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/root/flyte/pipeline/optimization_engine/utils/utils.py", line 61, in send_signal
        remote.set_signal(signal_name, execution_name, signal_value)
      File "/usr/local/lib/python3.11/site-packages/flytekit/remote/remote.py", line 524, in set_signal
        self.client.set_signal(req)
      File "/usr/local/lib/python3.11/site-packages/flytekit/clients/raw.py", line 159, in set_signal
        return self._signal.SetSignal(signal_set_request, metadata=self._metadata)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 277, in __call__
        response, ignored_call = self._with_call(
                                 ^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 329, in _with_call
        call = self._interceptor.intercept_unary_unary(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 44, in intercept_unary_unary
        raise e
      File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 40, in intercept_unary_unary
        self._raise_if_exc(request, e)
      File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 30, in _raise_if_exc
        raise FlyteInvalidInputException(request) from e
    flytekit.exceptions.user.FlyteInvalidInputException
    ```
    How did we discover that the gate node is not ready or the signal is not registered?
    • We print list_signals for the execution ID, and it returns an empty list at that time.
    Has anyone faced a similar issue, or are there any metrics we can check to debug this further?
    cc <@U06GQUG37ME> <@U06MQ3WEUBS>
    
    *Attached workflow screenshot:* the on-success task tried sending the signal to the gate node and failed, but when the on-error task later tried sending the failure signal, it succeeded.
    
    divyank.agarwal

    10/29/2024

    Hi team. I am looking for design documentation of the Signals feature. I want to understand how it works in detail.

    The problem I am facing: we send signals to continue the workflow if any of its tasks fail. However, sometimes we get a FlyteInvalidInputException. Our understanding is that the signal's gate node is not ready to receive the signal, hence the failure.