SiftZendeskTest

Summary

Hi,

We have the following workflow:

  • We have a `task` that preprocesses data and writes a `list` of `dict` to a file with `json.dumps`, returned as a `FlyteFile`
  • Then we want to use `map_task` to process each `dict` in the `list` stored in that `FlyteFile`
```
import json
from pathlib import Path

import flytekit
import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile


@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...some Data)


@task
def preprocess_data(data: pd.DataFrame) -> FlyteFile:
    rows = []
    for _, row in data.iterrows():
        ... some processing
        rows.append(row)
    out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        # Write the list of dicts as JSON, matching the description above
        json.dump(rows, output_file)
    return FlyteFile(path=str(out_path))


@task
def generate_ai_response(row: ?) -> ?:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data)
    predicted_responses: ? = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
We don't understand how to `map_task` over a `FlyteFile` or if that's even possible. Can we deserialise the `preprocessed_data` inside the `workflow` prior to `map_task`? Or how else would we do this?
Thank you!
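
One possible shape for this, sketched in plain Python so it runs without a Flyte installation: `map_task` fans out over a list, while a single `FlyteFile` is one value, and a workflow body only handles promises rather than materialized data. The deserialisation would therefore likely need to live in a task of its own, placed between `preprocess_data` and the mapped task. In the sketch below, `load_rows`, the sample rows, the `Dict[str, str]` row type, and the body of `generate_ai_response` are all hypothetical stand-ins, not anything from the original workflow. In flytekit each function would presumably carry `@task`, `Path` would become `FlyteFile`, and the list comprehension would become the `map_task` call.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def preprocess_data(out_dir: Path) -> Path:
    """Stand-in for the preprocessing task: writes the rows as a JSON list."""
    # Hypothetical sample rows; the real ones come from the DataFrame.
    rows = [{"id": "1", "text": "hello"}, {"id": "2", "text": "world"}]
    out_path = out_dir / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        json.dump(rows, output_file)
    return out_path


def load_rows(path: Path) -> List[Dict[str, str]]:
    """Hypothetical bridge step: read the file and return one dict per row."""
    with path.open() as input_file:
        return json.load(input_file)


def generate_ai_response(row: Dict[str, str]) -> str:
    """Placeholder for do_some_ai_stuff(); handles exactly one row."""
    return f"response for {row['id']}: {row['text'].upper()}"


def ai_workflow(work_dir: Path) -> List[str]:
    preprocessed_data = preprocess_data(work_dir)
    rows = load_rows(preprocessed_data)
    # Assumed flytekit equivalent: map_task(generate_ai_response)(row=rows)
    return [generate_ai_response(row=row) for row in rows]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(ai_workflow(Path(tmp)))
```

If the rows are small, a simpler variant might be to have `preprocess_data` return the list directly instead of a file, which removes the bridge step altogether.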
Status
resolved
Tags
    Source
    #ask-the-community