Summary
Hi,
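We have the following workflow:
1. A `task` that preprocesses data and returns a list of `dict`s, serialised with `json.dumps` and returned as a `FlyteFile`
2. A `map_task` to process each `dict` in the list that's in the `FlyteFile`
```python
import json
from pathlib import Path

import flytekit
import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile


@task
def ingest_data() -> pd.DataFrame: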
    return pd.DataFrame(...some Data)


@task
def preprocess_data(data: pd.DataFrame) -> FlyteFile:
    rows = []
    for _, row in data.iterrows():
        # ... some processing
        rows.append(row.to_dict())  # plain dicts, so the rows can be written as JSON
    out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        json.dump(rows, output_file)  # write the list of dicts as JSON
    return FlyteFile(path=str(out_path))


@task
def generate_ai_response(row: ?) -> ?:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data=data)
    predicted_responses: ? = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
We don't understand how to use `map_task` over a `FlyteFile`, or whether that's even possible. Can we deserialise `preprocessed_data` inside the `workflow` before calling `map_task`? If not, how else should we do this?
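To make the question concrete, here is a minimal plain-Python sketch of the shape we think we need. It deliberately does not use flytekit. `load_rows`, the `id` field and the body of `generate_ai_response` are hypothetical placeholders, and the list comprehension only stands in for `map_task`. Our assumption is that in Flyte `load_rows` would have to be its own `@task` that takes the `FlyteFile` and returns a list, and that `map_task` would then run over that list.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List


def load_rows(path: str) -> List[Dict[str, Any]]:
    """Read the JSON file written by the preprocessing step back into a list of dicts."""
    with open(path) as input_file:
        return json.load(input_file)


def generate_ai_response(row: Dict[str, Any]) -> str:
    # Hypothetical stand-in for do_some_ai_stuff(): it just echoes one field.
    return f"response for {row['id']}"


# Write a small example file, standing in for the output of preprocess_data.
out_path = Path(tempfile.mkdtemp()) / "preprocessed.json"
out_path.write_text(json.dumps([{"id": 1}, {"id": 2}]))

rows = load_rows(str(out_path))
# Plain-Python stand-in for map_task(generate_ai_response)(row=rows).
responses = [generate_ai_response(row=row) for row in rows]
print(responses)
```

If that is roughly right, the remaining question is which type annotations to use in place of the `?` placeholders above.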
Thank you!