AsyncAgentExecutorMixin Cleanup Issue

Summary

The user identifies an issue with the AsyncAgentExecutorMixin where the delete method is only activated by a SIGINT signal, complicating resource cleanup and affecting test case progression. They propose modifying the cleanup process to ensure it runs consistently as a final step in execution, regardless of interruptions, and provide a code snippet using a try-finally block for this purpose. The user also mentions that changing this behavior would necessitate adjustments to the logic of propeller. They inquire about ensuring proper data cleanup during end-to-end tests using the delete method and seek clarification on the need to change propeller logic, questioning whether the delete method is not already called when a terminal state is reached.

Status
resolved
Tags
  • SIGINT
  • Cleanup
  • flyte
  • Code Improvement
  • try-finally block
  • Developer
  • propeller
  • Question
  • Feature Request
  • AsyncAgentExecutorMixin
  • Bug Report
  • Resource Management
Source
#flyte-agents

    eric901201

    11/12/2024

    and you can discuss with other folks from domino, spotify, linkedin ...

    eric901201

    11/12/2024

    Kevin and I wrote most of its code

    eric901201

    11/12/2024

    you can ping me in this channel

    eric901201

    11/12/2024

    is there anything we can help with?

    blaketastic2

    11/12/2024

    we are using them

    eric901201

    11/12/2024

    does your company want to use agents?

    eric901201

    11/12/2024

    you are more accurate

    eric901201

    11/12/2024

    case

    eric901201

    11/12/2024

    yes, delete is an exceptional case

    blaketastic2

    11/12/2024

    and maybe state that "delete" is an exceptional case

    eric901201

    11/12/2024

    I think we missed details, right?

    blaketastic2

    11/12/2024

    I just don't see where it's stated that you would use get to clean up resources in a "happy path"

    eric901201

    11/12/2024

    or do we make it too hard to understand?

    eric901201

    11/12/2024

    do you think the documentation is unclear?

    blaketastic2

    11/12/2024

    I didn't realize that delete wasn't always called. From the API and description, I assumed it was create, get, delete always called

    eric901201

    11/12/2024

    companies like linkedin or spotify, in my opinion might do this

    blaketastic2

    11/12/2024

    ah ok

    eric901201

    11/12/2024

    you can write the delete logic in your get method
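
The suggestion above (doing the cleanup inside get once a terminal state is reached, since delete is only expected on the exceptional path) might look roughly like the sketch below. It deliberately does not import flytekit: `InMemoryAgent`, `Phase`, `Resource`, and `run_to_completion` are hypothetical stand-ins, and the `store` dictionary plays the role of an external database.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum


class Phase(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"


@dataclass
class Resource:
    phase: Phase


@dataclass
class InMemoryAgent:
    """Hypothetical agent; `store` stands in for an external database."""

    store: dict = field(default_factory=dict)
    polls_until_done: int = 2

    async def create(self, job_id: str) -> str:
        self.store[job_id] = {"polls": 0}
        return job_id

    async def get(self, job_id: str) -> Resource:
        record = self.store[job_id]
        record["polls"] += 1
        if record["polls"] < self.polls_until_done:
            return Resource(Phase.RUNNING)
        # Terminal success: assume delete will not be called afterwards,
        # so release the resources here.
        self._cleanup(job_id)
        return Resource(Phase.SUCCEEDED)

    async def delete(self, job_id: str) -> None:
        # Exceptional path (for example, the task was aborted).
        self._cleanup(job_id)

    def _cleanup(self, job_id: str) -> None:
        # pop with a default keeps cleanup idempotent.
        self.store.pop(job_id, None)


async def run_to_completion(agent: InMemoryAgent, job_id: str) -> Resource:
    """Hypothetical poll loop: create, then call get until a terminal phase."""
    meta = await agent.create(job_id)
    while True:
        resource = await agent.get(meta)
        if resource.phase is not Phase.RUNNING:
            return resource
        await asyncio.sleep(0)
```

For an end-to-end test, one option under these assumptions is to run the loop and then assert that the backing store is empty, for example `asyncio.run(run_to_completion(InMemoryAgent(), "job-1"))` followed by a check on `agent.store`.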

    eric901201

    11/12/2024

    if the terminal state is not succeeded, delete will get called

    blaketastic2

    11/12/2024

    If we want to do an end-to-end test and we use the delete method to clean up database resources, how can we ensure that the data is being cleaned out correctly?

    Can you explain more about why we would need to change propeller logic? Does delete not get called already whenever a terminal state is reached?

    blaketastic2

    11/12/2024

    AFK for 30

    eric901201

    11/12/2024

    and also, if we want to change this, we have to also change the logic of propeller

    eric901201

    11/12/2024

    I don't think that is a good approach: if get reaches a terminal state, why would we need to delete it?

    eric901201

    11/12/2024

    will reply to you tonight

    eric901201

    11/12/2024

    this is written by me

    eric901201

    11/12/2024

    hi

    blaketastic2

    11/8/2024

    In the AsyncAgentExecutorMixin code (https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L383), the delete method is scheduled in the signal_handler, which is only called when a SIGINT signal is emitted. This prevents resource cleanup during normal execution (without any interruptions) and is blocking one of our test cases.

    What are thoughts on simply making this a final step in the execution, ensuring we get to that code every time? Something like:

    ```
        ctx = FlyteContext.current_context()
        ss = ctx.serialization_settings or SerializationSettings(ImageConfig())
        output_prefix = ctx.file_access.get_random_remote_directory()

        from flytekit.tools.translator import get_serializable

        task_template = get_serializable(OrderedDict(), ss, self).template
        self._agent = AgentRegistry.get_agent(task_template.type, task_template.task_type_version)

        resource_meta = asyncio.run(
            self._create(task_template=task_template, output_prefix=output_prefix, inputs=kwargs)
        )

        try:
            # Main execution logic
            resource = asyncio.run(self._get(resource_meta=resource_meta))

            if resource.phase != TaskExecution.SUCCEEDED:
                raise FlyteUserException(f"Failed to run the task {self.name} with error: {resource.message}")

            # Process outputs
            if task_template.interface.outputs and resource.outputs is None:
                local_outputs_file = ctx.file_access.get_random_local_path()
                ctx.file_access.get_data(f"{output_prefix}/outputs.pb", local_outputs_file)
                output_proto = utils.load_proto_from_file(literals_pb2.LiteralMap, local_outputs_file)
                return LiteralMap.from_flyte_idl(output_proto)

            if resource.outputs and not isinstance(resource.outputs, LiteralMap):
                return TypeEngine.dict_to_literal_map(ctx, resource.outputs)

            return resource.outputs
        finally:
            # Cleanup logic that runs after the try block, even if it returns or raises an exception
            try:
                asyncio.run(self._agent.delete(resource_meta=resource_meta))
            except Exception as e:
                logger.error(f"Error during resource cleanup: {e}")
    ```
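
The control flow of the try/finally proposal above can be exercised in isolation with a minimal sketch. Nothing here is flytekit API: `RecordingAgent` and this standalone `execute` function are hypothetical, and phases are plain strings for brevity. The point is only that the `finally` block runs delete on both the success path (after `return`) and the failure path (after `raise`).

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class RecordingAgent:
    """Hypothetical agent that records which methods were called."""

    def __init__(self, succeed: bool):
        self.succeed = succeed
        self.calls = []

    async def create(self) -> str:
        self.calls.append("create")
        return "resource-1"

    async def get(self, resource_meta: str) -> str:
        self.calls.append("get")
        return "SUCCEEDED" if self.succeed else "FAILED"

    async def delete(self, resource_meta: str) -> None:
        self.calls.append("delete")


def execute(agent: RecordingAgent) -> str:
    """Hypothetical executor mirroring the shape of the proposal."""
    resource_meta = asyncio.run(agent.create())
    try:
        phase = asyncio.run(agent.get(resource_meta))
        if phase != "SUCCEEDED":
            raise RuntimeError(f"task ended in phase {phase}")
        return phase
    finally:
        # Runs whether the try block returned or raised.
        try:
            asyncio.run(agent.delete(resource_meta))
        except Exception as e:
            # A cleanup failure is logged and does not mask the task result.
            logger.error("Error during resource cleanup: %s", e)
```

With `RecordingAgent(succeed=True)`, `calls` ends up as `['create', 'get', 'delete']`; with `succeed=False`, `execute` raises `RuntimeError` and delete is still recorded. Whether always calling delete is desirable is exactly what the replies in this thread debate, so treat this as an illustration of the mechanism rather than a recommendation.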