
Workflow Issues with Flyte and map_task

Summary

The user describes a workflow involving data preprocessing and map_task in Flyte: one task ingests data and another preprocesses it into a FlyteFile containing a list of dictionaries. They ask how to use map_task with a FlyteFile, and whether preprocessed_data can be deserialized before map_task is applied. They also report a runtime execution error where an output file exceeds the maximum allowed size, and ask whether flyte-propeller settings in Helm can resolve it.

Status
resolved
Tags
  • Workflow
  • Data Processing
  • FlyteFile
  • Flyte
  • flyte-propeller
  • User
  • Developer
  • Question
  • runtime execution error
  • Developer Help
  • Support Request
  • Workflow Orchestration
Source
#ask-the-community
    kumare

    11/11/2024

    It’s BYOC, your cloud

    kumare

    11/11/2024

    PII data is ok, Union runs in your cloud

    maartenvanmeeuwen

    11/11/2024

    <@UNZB4NW3S> Thanks, but we have PII data and SLAs require us to run workflows on our hosted k8s cluster.

    kumare

    11/11/2024

    We have released a big thing where you can return extremely large lists without hitting the max allowed data limit; Union's limit goes up to 10GB. What do you think about using Union?

    kumare

    11/11/2024

    ohh wait, I don't think we can run map on that :disappointed:

    kumare

    11/11/2024

    <@U07V4T1H2JX> have you thought of using JSONL and JSONIterator?
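
    For context, JSONL here refers to flytekit's JSON iterator support: a task that returns Iterator[JSON] is materialised as a JSONL file (one JSON object per line) rather than one large inline literal. A minimal sketch, assuming a flytekit version that ships flytekit.types.iterator.JSON, and noting the caveat above that map cannot run over the iterator itself:

    from typing import Iterator

    from flytekit import task
    from flytekit.types.iterator import JSON


    @task
    def to_jsonl(rows: list[dict]) -> Iterator[JSON]:
        # Written out as a single JSONL file, one JSON object per line,
        # instead of a large inline literal in outputs.pb.
        for row in rows:
            yield row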

    maartenvanmeeuwen

    11/11/2024

    Hi <@U0762PD3DME>, we keep running into this issue where:

    Workflow[ai-project:development:main.ntrk_mutation_analysis_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[<s3://flyte/metadata/propeller/ai-project-development-f93f414e3c13c4a199e2/n2/data/0/outputs.pb>] is too large [50062009] bytes, max allowed [2097152] bytes

    How can we solve that? Are there flyte-propeller settings in Helm for this?
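
    For context, the 2097152-byte ceiling in that error is FlytePropeller's max-output-size-bytes limit on task outputs. A minimal sketch of raising it via flyte-core Helm values follows; the configmap.core.propeller key path is an assumption based on the flyte-core chart and may differ across chart versions, so verify against your chart's values.yaml:

    # Hedged sketch, not a verified config: raise FlytePropeller's output
    # size ceiling, which drives the "output file ... is too large" check.
    # The nesting below follows the flyte-core chart convention and is an
    # assumption for other charts/versions.
    configmap:
      core:
        propeller:
          max-output-size-bytes: 104857600  # 100 MiB; the error shows a 2 MiB limit

    Raising the limit only postpones the problem as outputs grow; the alternative discussed in this thread is keeping large payloads in blob storage (e.g. a FlyteFile) and passing references between tasks.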

    niels

    11/6/2024

    Hi Maarten, you can't map over FlyteFiles, but you could output a list[dict] (basically the rows from data.iterrows()) and map over that. You can use dataclasses (https://docs.flyte.org/en/latest/user_guide/data_types_and_io/dataclass.html#id1) if you want more structure.

    import pandas as pd

    from flytekit import map_task, task, workflow


    @task
    def ingest_data() -> pd.DataFrame:
        return pd.DataFrame(...)  # ...some data


    @task
    def preprocess_data(data: pd.DataFrame) -> list[dict]:
        rows = []
        for _, row in data.iterrows():
            # ...some processing
            rows.append(dict(row))
        return rows


    @task
    def generate_ai_response(row: dict) -> dict:
        ai_response = do_some_ai_stuff(row)
        return ai_response


    @workflow
    def ai_workflow() -> list[dict]:
        data = ingest_data()
        preprocessed_data = preprocess_data(data)
        predicted_responses = map_task(generate_ai_response)(
            row=preprocessed_data
        )
        return predicted_responses
    
    maartenvanmeeuwen

    11/6/2024

    Hi,

    We have the following workflow:

    • We have a task that preprocesses data and returns a list of dict, serialized with json.dumps into a FlyteFile
    • Then we want to use map_task to process each dict in the list that's in the FlyteFile
    import json
    from pathlib import Path

    import flytekit
    import pandas as pd
    from flytekit import map_task, task, workflow
    from flytekit.types.file import FlyteFile


    @task
    def ingest_data() -> pd.DataFrame:
        return pd.DataFrame(...)  # ...some data


    @task
    def preprocess_data(data: pd.DataFrame) -> FlyteFile:
        rows = []
        for _, row in data.iterrows():
            # ...some processing
            rows.append(dict(row))
        out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
        with out_path.open(mode="w") as output_file:
            json.dump(rows, output_file)
        return FlyteFile(path=str(out_path))


    @task
    def generate_ai_response(row: ?) -> ?:
        ai_response = do_some_ai_stuff(row)
        return ai_response


    @workflow
    def ai_workflow():
        data = ingest_data()
        preprocessed_data = preprocess_data(data)
        predicted_responses: ? = map_task(generate_ai_response)(
            row=preprocessed_data
        )
    We don't understand how to `map_task` over a `FlyteFile` or if that's even possible. Can we deserialise the `preprocessed_data` inside the `workflow` prior to `map_task`? Or how else would we do this?
    Thank you!
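
    On the deserialisation question: a workflow body only wires task outputs together, so the FlyteFile cannot be opened inside the workflow itself; the loading has to happen in a task. A minimal sketch under that assumption, with load_rows as a hypothetical helper task and generate_ai_response typed over dict as in the reply above:

    import json

    from flytekit import map_task, task, workflow
    from flytekit.types.file import FlyteFile


    @task
    def load_rows(preprocessed: FlyteFile) -> list[dict]:
        # Pull the file down from blob storage and parse the JSON list
        # written by preprocess_data, so map_task can fan out over dicts.
        with open(preprocessed.download()) as f:
            return json.load(f)


    @workflow
    def ai_workflow() -> list[dict]:
        data = ingest_data()
        preprocessed_data = preprocess_data(data)  # FlyteFile
        rows = load_rows(preprocessed=preprocessed_data)
        return map_task(generate_ai_response)(row=rows)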