Summary
The user is looking for design documentation on the Signals feature to better understand how it works. They are having trouble sending signals in their Flyte workflow, specifically a FlyteInvalidInputException raised when sending the signal. The problem is a timing mismatch: the signal-waiting task is not always ready to receive signals from the model subworkflow, which attempts to send the signal 30 to 40 seconds after being spawned. The signal is missed, and the workflow hangs until it times out. The user determined that the gate node was not ready, or the signal not yet registered, by listing the signals for the execution ID, which returned an empty list. They are asking for insights or metrics to help debug the issue further.
nihar.pawade
Thank you.
nihar.pawade
| the fact that these are dynamic workflows doesn’t really matter right?
it does in our use case, because the number of gate nodes and the node names depend on the input config
ytong
let us think about it.
ytong
and yeah i think i get it… this wait for input gate node was not written with this in mind (getting fulfilled by something else in the same workflow)
ytong
this set up can be done with static wfs and you’d have the same problem
ytong
the fact that these are dynamic workflows doesn’t really matter right?
nihar.pawade
to back my hypothesis, the screenshot shows a failed run with the same issue: the first highlighted node (google_bid_model) is the gate node, which got spawned only after the second highlighted task had already failed, because that task tried to set the signal before the gate node was ready. After that, the task below it (on_failure, cut off in the screenshot) was able to set the signal, since by then the gate node was available on the timeline
nihar.pawade
 ___________________     ___________________
| parent/dynamic 1  |   | parent/dynamic 2  |
|___________________|   |___________________|
         |                        |
         |                        |
    signal_node  <<<-----------  task that sets signal
ytong
```
      ┌───────────────┐
      │               │
      │    parent     │
      │               ├──┐
      └──┬────────────┘  │
         │               │
┌────────▼─┐         ┌───▼────────┐
│  signal  │◄────────┤ task that  │
│   node   │         │ sets signal│
└──────────┘         └────────────┘
```
nihar.pawade
yes, but they are different tasks: the one that spawns the gate node and the ones that set the signals
ytong
so there are basically two things, right? let’s simplify it a bit: some parent nodes run, and then a signal node and something else that will set that signal node get run at the same time.
nihar.pawade
honestly, what i think is happening: since we use a dynamic workflow to spin up the gate nodes, in a few cases the wait_model_signals pod might get scheduled on a different node, which takes its own time to start executing wait_model_signals, the task that spins up the gate nodes. By that point the model has completed execution and tries to set a signal that does not exist/is not registered yet
nihar.pawade
so the subworkflow has the implementation to set the signal with the value Success or Fail, from either:
• model_on_success_container_task, which sets the signal with the Success value, or
• the on-error task, which sets the signal with the Failure value in case anything fails in the sub-workflow
nihar.pawade
oh my bad, i completely forgot to add the workflow function implementations from run models. we dynamically determine the sub-workflow name depending on the input criteria, and all of the subworkflows are imported and maintained in a map/dict
ytong
these are input signal nodes right? they require someone to fulfill the signal, are you saying there’s another workflow that does that? (as opposed to someone typing it in the UI)
ytong
do you think you could write a minimal repro? something we can run on our end? I’m still not entirely sure what’s happening.
nihar.pawade
please let me know if it's still not clear
nihar.pawade
<@UNR3C6Y4T> i've edited and simplified the source above to represent the way we are using it
ytong
where is the parent workflow? like where are check_and_run_frequent_actions and model_signal_waits defined?
ytong
i’m not sure i understand this…
dubovikov.kirill
<@UNR3C6Y4T> do you think this is expected behaviour? For now, we have resolved it by manually retry-waiting until we find that the signal is available, and only then do we start listening
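The retry-waiting workaround described above can be sketched as a small polling helper. This is a minimal, library-agnostic sketch, not Flyte's API: `list_signal_ids` and `set_signal` are hypothetical callables supplied by the caller, and the timeout and interval defaults are arbitrary. Injecting `sleep` and `clock` keeps it testable without a cluster.

```python
import time
from typing import Callable, Iterable


def wait_for_signal_registration(
    list_signal_ids: Callable[[], Iterable[str]],
    signal_id: str,
    timeout_s: float = 300.0,
    interval_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll until `signal_id` appears among the execution's registered signals.

    Returns True once it is registered, or False if `timeout_s` elapses first.
    """
    deadline = clock() + timeout_s
    while True:
        # list_signal_ids is a hypothetical caller-supplied lookup.
        if signal_id in set(list_signal_ids()):
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


def send_when_ready(
    list_signal_ids: Callable[[], Iterable[str]],
    set_signal: Callable[[str, object], None],
    signal_id: str,
    value: object,
    **poll_kwargs,
) -> None:
    """Wait for the gate node to register `signal_id`, then set it once."""
    if not wait_for_signal_registration(list_signal_ids, signal_id, **poll_kwargs):
        raise TimeoutError(f"signal {signal_id!r} was never registered")
    set_signal(signal_id, value)
```

To use it against a real execution, `list_signal_ids` might wrap `FlyteRemote.list_signals(execution_name)` (the call the thread already uses to see the empty list) and `set_signal` might wrap `remote.set_signal(signal_name, execution_name, value)` as in the traceback in this thread; how the signal ID is read from each returned signal is an assumption to check against your flytekit version. Polling narrows the window but does not remove it: if the gate node takes longer than `timeout_s` to start, the send still fails, so the timeout should exceed the worst-case pod scheduling delay.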
nihar.pawade
<@UNR3C6Y4T> above is the code for the specific blocks that are giving us issues in production
divyank.agarwal
we tried using the >> operator, but it doesn't work.
a simple example is:
Task A: wait_for_input(signal_name)
Task B: send_signal(signal_name)
A >> B :x: (blocked: B only runs after A completes, so A never receives the signal)
B >> A :x: (fails because the receiver is not ready)
so Task A and Task B have to run in parallel, and the ideal sequence is:
• Task A starts
• Task B starts
• Task B ends
• Task A ends
but under heavy load there is no guarantee that Task A will start before Task B starts.
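The ordering problem can be illustrated with a toy model. The sketch below is a hypothetical in-memory stand-in for the signal service, not Flyte's implementation; it only encodes the assumption the thread arrives at, namely that a signal exists once the gate node (Task A) has started, and a send before that fails with "signal does not exist". `SignalDoesNotExist`, `ToySignalStore`, and `replay` are illustrative names.

```python
class SignalDoesNotExist(Exception):
    """Stand-in for the 'signal does not exist' error seen in the thread."""


class ToySignalStore:
    """In-memory toy model of the signal service (NOT Flyte's implementation).

    Assumption taken from the thread: a signal only exists once the gate node
    (Task A, wait_for_input) has started and registered it.
    """

    def __init__(self) -> None:
        self._signals: dict = {}

    def register(self, signal_id: str) -> None:
        # Gate node starts: the signal now exists but has no value yet.
        self._signals.setdefault(signal_id, None)

    def set_signal(self, signal_id: str, value: object) -> None:
        # Sender (Task B): rejected if the gate node has not registered the signal.
        if signal_id not in self._signals:
            raise SignalDoesNotExist(f"signal {signal_id!r} does not exist")
        self._signals[signal_id] = value

    def value(self, signal_id: str) -> object:
        return self._signals.get(signal_id)


def replay(events: list, signal_id: str = "google_bid_models") -> str:
    """Replay 'gate_start' / 'send' events in order and report what happened."""
    store = ToySignalStore()
    for event in events:
        if event == "gate_start":
            store.register(signal_id)
        elif event == "send":
            try:
                store.set_signal(signal_id, "Success")
            except SignalDoesNotExist:
                # The sender fails; the gate node, once it starts, waits until timeout.
                return "missed"
        else:
            raise ValueError(f"unknown event {event!r}")
    return "delivered" if store.value(signal_id) == "Success" else "waiting"


print(replay(["gate_start", "send"]))  # ideal order: A starts first -> delivered
print(replay(["send", "gate_start"]))  # under heavy load: B sends first -> missed
```

Because Task A and Task B run in parallel, nothing in the workflow graph forces the first ordering; under load the second one can happen, which matches the missed signal described in this thread.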
ytong
have you tried using the >> operator?
ytong
can you share the code?
nihar.pawade
Hi Team, here is more context about the issue that we are facing:
• In our Flyte workflow, we have model tasks/subworkflows that trigger a signal to a signal-waiting task upon successful or failed execution.
• However, there's a timing mismatch: by the time the subworkflow completes and attempts to send the signal, the signal-wait task is not always ready to receive it, leading to an error.
• Both the model subworkflow and the signal-wait task (gate node) are spawned simultaneously, as they share a common predecessor. Importantly, the model subworkflow generally attempts to send the signal 30 to 40 seconds after it is spawned. Despite this delay, the signal-wait task is sometimes not ready, so the signal is missed and the workflow hangs until it eventually times out.
```
[f50afe3993cc347818cc-n8-0-dn1-0-dn1-0] terminated with exit code (1). Reason [Error]. Message:
7818cc} google_bid_models}] exists, err: [signal does not exist]"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 30, in <module>
    on_success_task(execution_name, signal_name, msg)
  File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 22, in on_success_task
    return send_signal(execution_name, signal_name, msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/flyte/pipeline/optimization_engine/utils/utils.py", line 61, in send_signal
    remote.set_signal(signal_name, execution_name, signal_value)
  File "/usr/local/lib/python3.11/site-packages/flytekit/remote/remote.py", line 524, in set_signal
    self.client.set_signal(req)
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/raw.py", line 159, in set_signal
    return self._signal.SetSignal(signal_set_request, metadata=self._metadata)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 277, in __call__
    response, ignored_call = self._with_call(
                             ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 329, in _with_call
    call = self._interceptor.intercept_unary_unary(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 44, in intercept_unary_unary
    raise e
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 40, in intercept_unary_unary
    self._raise_if_exc(request, e)
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 30, in _raise_if_exc
    raise FlyteInvalidInputException(request) from e
flytekit.exceptions.user.FlyteInvalidInputException
```
how did we discover that the gate node is not ready or the signal is not registered?
• we print list_signals for the execution ID, and it returns an empty list at that time
has anyone faced a similar issue, or are there any metrics we can check to debug this further?
cc <@U06GQUG37ME> <@U06MQ3WEUBS>
*attached wf screenshot:* the on-success task tried sending the signal to the gate node and failed, but when the on-error task later tried sending the failure signal, it succeeded
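Another option, instead of polling first, is to retry the send itself when it fails with the FlyteInvalidInputException from the traceback above. The helper below is a generic sketch with capped exponential backoff; the name `retry_with_backoff` and its defaults are illustrative, not part of flytekit.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 6,
    base_delay_s: float = 2.0,
    max_delay_s: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on an exception listed in retry_on, wait and try again.

    Delays grow as base_delay_s * 2**attempt, capped at max_delay_s. The last
    failure is re-raised once all attempts are used up; other exceptions
    propagate immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(min(base_delay_s * 2 ** attempt, max_delay_s))
    raise AssertionError("unreachable")  # the loop always returns or raises
```

With flytekit this might be called as `retry_with_backoff(lambda: remote.set_signal(signal_name, execution_name, signal_value), retry_on=(FlyteInvalidInputException,))`, importing the exception from `flytekit.exceptions.user` as the traceback shows. With the defaults the helper waits 2, 4, 8, 16 and 30 seconds between its six attempts, about 60 seconds in total. One caveat: FlyteInvalidInputException is a general invalid-input error, so the retry cannot tell "not registered yet" apart from a genuinely bad request; the attempt cap keeps that case bounded.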
divyank.agarwal
found this https://www.youtube.com/watch?v=wiOTEOVKb8s
divyank.agarwal
Hi team. I am looking for design documentation of the Signals feature. I want to understand how it works in detail.
The problem I am facing:
we send signals to continue the workflow if any of its tasks fail. However, we sometimes get a FlyteInvalidInputException. Our understanding is that the signal gate node is not ready to receive the signal, hence the failure.