Summary
The user describes a workflow involving data preprocessing and the use of `map_task` in Flyte: they have a task for data ingestion and another for preprocessing into a `FlyteFile` containing a list of dictionaries. They seek clarification on using `map_task` with a `FlyteFile`, and on whether it's possible to deserialize `preprocessed_data` before applying `map_task`. Additionally, they report a runtime execution error where an output file exceeds the maximum allowed size, and ask about potential `flyte-propeller` settings in Helm to resolve the issue.
curupa
<@U07V4T1H2JX>, you can set `storage.limits.maxDownloadMBs` in your values file: https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/charts/flyte-core/values.yaml#L610-L611
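For illustration, such an override in a flyte-core values file might look like the sketch below; the `100` is a hypothetical value, and the key path follows the linked `values.yaml`.
```yaml
# Sketch of a flyte-core Helm values override.
# storage.limits.maxDownloadMBs is the setting linked above;
# 100 MB is an illustrative value, not a recommendation.
storage:
  limits:
    maxDownloadMBs: 100
```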
kumare
It’s BYOC, your cloud
kumare
PII data is OK, Union runs in your cloud
maartenvanmeeuwen
<@UNZB4NW3S> Thanks, but we have PII data and SLAs require us to run workflows on our hosted k8s cluster.
kumare
we have released a big thing where you can return extremely large lists without affecting the data limit. Union has a limit of up to 10GB. What do you think about using Union?
kumare
ohh wait, I don't think we can run map on that :disappointed:
kumare
<@U07V4T1H2JX> have you thought of using JSONL and JSONIterator?
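A minimal sketch of that JSONL route, assuming flytekit's `Iterator[JSON]` support (`flytekit.types.iterator.JSON`, available in recent flytekit releases); the task names and fields here are hypothetical:
```python
from typing import Iterator

from flytekit import task
from flytekit.types.iterator import JSON  # assumes a recent flytekit release


@task
def produce_rows(n: int) -> Iterator[JSON]:
    # Each yielded dict is written as one line of a backing JSONL file,
    # so the output never materializes as one large inline literal.
    for i in range(n):
        yield {"index": i, "payload": f"row-{i}"}


@task
def consume_rows(rows: Iterator[JSON]) -> int:
    # Rows stream back one JSON object at a time.
    return sum(1 for _ in rows)
```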
maartenvanmeeuwen
Hi <@U0762PD3DME>, we keep running into this issue where:
Workflow[ai-project:development:main.ntrk_mutation_analysis_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n2]. RuntimeExecutionError: failed during plugin execution, caused by: output file @[<s3://flyte/metadata/propeller/ai-project-development-f93f414e3c13c4a199e2/n2/data/0/outputs.pb>] is too large [50062009] bytes, max allowed [2097152] bytes
How can we solve that? Are there some `flyte-propeller` settings in Helm?
niels
Hi Maarten, you can’t map over `FlyteFile`s, but you could output a list of `dict` objects (basically rows from `data.iterrows()`) and map over that. You can use dataclasses if you want more structure: https://docs.flyte.org/en/latest/user_guide/data_types_and_io/dataclass.html#id1
```python
import pandas as pd
from flytekit import map_task, task, workflow


@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # ...some data


@task
def preprocess_data(data: pd.DataFrame) -> list[dict]:
    rows = []
    for _, row in data.iterrows():
        # ...some processing
        row = dict(...)
        rows.append(row)
    return rows


@task
def generate_ai_response(row: dict) -> dict:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data=data)
    predicted_responses = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
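For the dataclass variant niels links, a hedged sketch (the `Row` fields are made up; recent flytekit releases serialize plain dataclasses without extra mixins):
```python
from dataclasses import dataclass

import pandas as pd
from flytekit import map_task, task


@dataclass
class Row:
    # Hypothetical fields; replace with your actual columns.
    text: str
    score: float


@task
def preprocess_data(data: pd.DataFrame) -> list[Row]:
    # One typed Row per DataFrame row, instead of an untyped dict.
    return [Row(text=str(r["text"]), score=float(r["score"])) for _, r in data.iterrows()]


@task
def generate_ai_response(row: Row) -> Row:
    # Placeholder for the actual model call; map_task works over list[Row].
    return Row(text=row.text, score=row.score)
```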
maartenvanmeeuwen
Hi,
We have the following workflow:
- a `task` that preprocesses data and returns a `list` of `dict`, serialized with `json.dumps` into a `FlyteFile`
- a `map_task` to process each dict in the list that's inside the `FlyteFile`
```python
import json
from pathlib import Path

import flytekit
import pandas as pd
from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile


@task
def ingest_data() -> pd.DataFrame:
    return pd.DataFrame(...)  # ...some data


@task
def preprocess_data(data: pd.DataFrame) -> FlyteFile:
    rows = []
    for _, row in data.iterrows():
        # ...some processing
        rows.append(row)
    out_path = Path(flytekit.current_context().working_directory) / "preprocessed.json"
    with out_path.open(mode="w") as output_file:
        json.dump(rows, output_file)
    return FlyteFile(path=str(out_path))


@task
def generate_ai_response(row: ?) -> ?:
    ai_response = do_some_ai_stuff()
    return ai_response


@workflow
def ai_workflow():
    data = ingest_data()
    preprocessed_data = preprocess_data(data=data)
    predicted_responses: ? = map_task(generate_ai_response)(
        row=preprocessed_data
    )
```
We don't understand how to `map_task` over a `FlyteFile` or if that's even possible. Can we deserialise the `preprocessed_data` inside the `workflow` prior to `map_task`? Or how else would we do this?
Thank you!