Summary
The user identifies an issue with the AsyncAgentExecutorMixin where the delete method is only triggered by a SIGINT signal, which prevents resource cleanup during normal execution and blocks one of their test cases. They propose making cleanup a final step that always runs, regardless of interruptions, and provide a code snippet using a try-finally block for this purpose. A maintainer replies that changing this behavior would also require changing the logic of propeller. The user asks how to ensure proper data cleanup during end-to-end tests that rely on the delete method, and asks why propeller logic would need to change, questioning whether delete is not already called whenever a terminal state is reached.
eric901201
and you can discuss with other folks from domino, spotify, linkedin ...
eric901201
Kevin and I wrote most of its code
eric901201
you can ping me at this channel
eric901201
is there anything we can help?
blaketastic2
we are using them
eric901201
does your company want to use agent?
eric901201
you are more accurate
eric901201
yes, delete is an exceptional case
blaketastic2
and maybe state that "delete" is an exceptional case
eric901201
I think we missed details, right?
blaketastic2
I just don't see where it's stated that you would use get to clean up resources in a "happy path"
eric901201
or do we make it too hard to understand?
blaketastic2
eric901201
do you think the documentation is unclear
blaketastic2
I didn't realize that delete wasn't always called. From the API and description, I assumed it was create, get, delete always called
eric901201
companies like linkedin or spotify, in my opinion might do this
blaketastic2
ah ok
eric901201
you can write the delete logic in your get method
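A minimal sketch of that suggestion, assuming plain asyncio and hypothetical stand-ins (`ToyAgent`, `Resource`, `_release`, and the phase strings are illustrative, not the real flytekit agent classes): `get` releases the external resource itself once it observes a terminal phase, so happy-path cleanup does not depend on `delete` being called.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical phase names, used only for this sketch.
TERMINAL_PHASES = {"SUCCEEDED", "FAILED"}


@dataclass
class Resource:
    phase: str
    outputs: Optional[dict] = None


@dataclass
class ToyAgent:
    # job_id -> phase; stands in for state held in an external system.
    jobs: dict = field(default_factory=dict)
    # IDs whose external resources have been cleaned up.
    released: set = field(default_factory=set)

    async def create(self, job_id: str) -> str:
        self.jobs[job_id] = "RUNNING"
        return job_id

    async def get(self, job_id: str) -> Resource:
        phase = self.jobs[job_id]
        if phase in TERMINAL_PHASES:
            # Happy-path cleanup happens here, not in delete.
            self._release(job_id)
        return Resource(phase=phase)

    async def delete(self, job_id: str) -> None:
        # Exceptional path (for example an aborted run): same cleanup.
        self._release(job_id)

    def _release(self, job_id: str) -> None:
        # Idempotent, so calling it from both get and delete is safe.
        self.released.add(job_id)


async def run_happy_path() -> ToyAgent:
    agent = ToyAgent()
    job = await agent.create("job-1")
    await agent.get(job)            # still RUNNING, nothing released yet
    agent.jobs[job] = "SUCCEEDED"   # simulate the job finishing
    await agent.get(job)            # terminal phase observed, resource released
    return agent
```

Keeping the release step idempotent matters in this sketch because `get` may be polled again after the terminal phase, and `delete` may still run on an abort.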
eric901201
if the terminal state is not a success, delete will get called
blaketastic2
If we want to do an end-to-end test and we use the delete method to clean up database resources, how can we ensure that the data is being cleaned out correctly?
Can you explain more about why we would need to change propeller logic? Does delete not get called already whenever a terminal state is reached?
blaketastic2
AFK for 30
eric901201
and also, if we want to change this, we have to also change the logic of propeller
eric901201
I don't think it is a good approach: if get reaches a terminal state, why would we need to delete it?
eric901201
will reply you tonight
eric901201
this is written by me
eric901201
hi
blaketastic2
In the AsyncAgentExecutorMixin code (https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L383), the delete method is scheduled in the signal_handler, which is only called when a SIGINT signal is emitted. This is preventing resource cleanup as part of normal execution (without any interruptions) and is preventing one of our test cases from progressing.
What are thoughts on simply making this a final step in the execution, ensuring we get to that code every time? Something like:
```
ctx = FlyteContext.current_context()
ss = ctx.serialization_settings or SerializationSettings(ImageConfig())
output_prefix = ctx.file_access.get_random_remote_directory()

from flytekit.tools.translator import get_serializable

task_template = get_serializable(OrderedDict(), ss, self).template
self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)

resource_meta = asyncio.run(
    self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
)

try:
    # Main execution logic
    resource = asyncio.run(self._get(resource_meta=resource_meta))
    if resource.phase != TaskExecution.SUCCEEDED:
        raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")

    # Process outputs
    if task_template.interface.outputs and resource.outputs is None:
        local_outputs_file = ctx.file_access.get_random_local_path()
        ctx.file_access.get_data(f"{output_prefix}/outputs.pb", local_outputs_file)
        output_proto = utils.load_proto_from_file(literals_pb2.LiteralMap, local_outputs_file)
        return LiteralMap.from_flyte_idl(output_proto)

    if resource.outputs and not isinstance(resource.outputs, LiteralMap):
        return TypeEngine.dict_to_literal_map(ctx, resource.outputs)

    return resource.outputs
finally:
    # Cleanup logic that runs after the try block, even if it returns or raises an exception
    try:
        asyncio.run(self._agent.delete(resource_meta=resource_meta))
    except Exception as e:
        logger.error(f"Error during resource cleanup: {e}")
```
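For checking the cleanup behavior in a test, here is a stripped-down, runnable sketch of the same try/finally shape. It assumes a hypothetical `StubAgent` and a free function `execute` with string phases in place of the flytekit classes, so it only illustrates the control flow: delete runs whether the task succeeds or raises.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class StubAgent:
    """Hypothetical stand-in for an agent; records every delete call."""

    def __init__(self, final_phase: str):
        self.final_phase = final_phase
        self.deleted = []

    async def create(self) -> str:
        return "resource-1"

    async def get(self, resource_meta: str) -> str:
        return self.final_phase

    async def delete(self, resource_meta: str) -> None:
        self.deleted.append(resource_meta)


def execute(agent: StubAgent) -> str:
    resource_meta = asyncio.run(agent.create())
    try:
        phase = asyncio.run(agent.get(resource_meta))
        if phase != "SUCCEEDED":
            raise RuntimeError(f"task failed in phase {phase}")
        return phase
    finally:
        # Runs on both the return and the raise above.
        try:
            asyncio.run(agent.delete(resource_meta))
        except Exception as e:
            # A cleanup failure is logged rather than masking the task result.
            logger.error("Error during resource cleanup: %s", e)
```

An end-to-end test could assert on `agent.deleted` (or on the real database rows) after both a successful and a failing run.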