Summary
The user is facing issues with cache hits when executing Flyte tasks using FlyteRemote, particularly with a PythonPickle blob as input. They have tried using the Annotated HashMethod for input types but have not been successful since their input is a Python type rather than a Flyte-generated type. The user suspects that FlyteRemote is re-uploading the pickle file as a new input, which hinders cache hits. They are exploring the option of providing a custom HashMethod for caching non-Flyte offloaded objects and are looking for solutions or workarounds. The user notes that cache hits work when running the task locally but not with FlyteRemote. They also suggest that using FlyteFile with a specified remote_path for uploading videos to an S3 bucket might help with caching, although they are uncertain if this will work with FlyteRemote.
peter.kun
thanks, I will try it out
curupa
<@U06FV6LLKSQ>, FYI, I replied on the gh issue: https://github.com/flyteorg/flyte/issues/5823#issuecomment-2420592440. This should be enough to unblock your use of FlyteRemote.
peter.kun
not at all, here it is: https://github.com/flyteorg/flyte/issues/5823
ytong
I think we should support this. Mind creating a gh issue for this <@U06FV6LLKSQ>?
niels.bantilan
another solution might be to use FlyteFile and specify the remote_path argument, where the value is some s3 bucket path where you want to upload your videos. I’m not 100% certain this works with FlyteRemote, but that should make caching work.
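If the FlyteFile route is taken, the upload destination probably needs to be the same on every run for identical content. A minimal sketch of one way to get that, assuming a content-addressed naming scheme: `stable_remote_path` and the `s3://my-bucket/videos` prefix are hypothetical names, and the FlyteFile call is shown only as a comment because whether it yields cache hits through FlyteRemote is unconfirmed in this thread.

```python
import hashlib
from pathlib import Path


def stable_remote_path(local_path: str, bucket_prefix: str) -> str:
    # Hypothetical helper: name the upload after the SHA-256 of the file bytes,
    # so uploading the same video again always targets the same URI.
    digest = hashlib.sha256()
    with open(local_path, "rb") as handle:
        # Read in 1 MiB chunks so large videos are not loaded into memory at once.
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    suffix = Path(local_path).suffix
    return f"{bucket_prefix.rstrip('/')}/{digest.hexdigest()}{suffix}"


# Assumed usage with flytekit (not executed here):
#   from flytekit.types.file import FlyteFile
#   path = stable_remote_path("video.mp4", "s3://my-bucket/videos")
#   video = FlyteFile("video.mp4", remote_path=path)
```

Two files with identical bytes map to the same path, while any change to the content produces a new one.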
niels.bantilan
<@U0265RTUJ5B> <@UNR3C6Y4T> ^^
niels.bantilan
I don’t think this is possible today
peter.kun
Yes, but what if I don't have one? That was my very first question. I would like to execute tasks with FlyteRemote, so the inputs are coming from Python and not from a Flyte task.
niels.bantilan
```
@task
def produce_video(...) -> Annotated[VideoRecord, HashMethod(...)]:
    ...

@task
def consume_video(video_record: VideoRecord): ...
```
niels.bantilan
so in your case, you’d need a task that produces the VideoRecord object:
niels.bantilan
> Note how the output of task foo is annotated with an object of type HashMethod. Essentially, it represents a function that produces a hash that is used as part of the cache key calculation in calling the task bar.
niels.bantilan
oh wait… you’re annotating the input with HashMethod? I don’t think that’ll work…
You need to annotate the output of a task with HashMethod. (referring to the docs example: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/caching.html#caching-of-non-flyte-offloaded-objects)
```
import pandas
from typing_extensions import Annotated
from flytekit import HashMethod, task, workflow


def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))


# The output annotation tells Flyte to use the custom hash for the cache key.
@task
def foo_1(a: int, b: str) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
    df = pandas.DataFrame(...)
    ...
    return df


@task(cache=True, cache_version="1.0")
def bar_1(df: pandas.DataFrame) -> int:
    ...


@workflow
def wf_1(a: int, b: str):
    df = foo_1(a=a, b=b)
```
niels.bantilan
regardless, this does seem like a bug
niels.bantilan
is there any reason you’re using a custom class VideoRecord and not a FlyteFile?
peter.kun
Any idea why it is not working with FlyteRemote?
peter.kun
Btw, bar_1 is a registered entity, and if I run it locally, the cache hit does happen:
```
video_record = VideoRecord("path/to/video")
bar_1(video_record=video_record)
bar_1(video_record=video_record)
```
peter.kun
Let me show you my toy example, where cache hit never happens after consecutive runs:
```
from typing_extensions import Annotated
from flytekit import HashMethod, task
from flytekit.remote import FlyteRemote
from flytekit.configuration import Config, PlatformConfig


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_record(record: VideoRecord) -> str:
    return record.video_path


# HashMethod is attached to the task input here, not to a task output.
@task(cache=True, cache_version="1.0")
def bar_1(video_record: Annotated[VideoRecord, HashMethod(hash_video_record)]) -> str:
    print("Running bar_1")
    return video_record.video_path


if __name__ == "__main__":
    video_record = VideoRecord("path/to/video")
    # endpoint, default_project, and default_domain are defined elsewhere.
    remote = FlyteRemote(
        config=Config(
            platform=PlatformConfig(
                endpoint=endpoint,
                insecure=True,
                insecure_skip_verify=True,
            )
        ),
        default_project=default_project,
        default_domain=default_domain,
    )
    entity = remote.fetch_task(name="toy_example.bar_1", version="1.1")
    remote.execute(
        entity=entity,
        inputs={"video_record": video_record},
        wait=True,
        tags=[],
        overwrite_cache=False,
    )
```
kumare
It will work if your hashing algorithm results in the same hash
peter.kun
Yes I can
Is the problem in the discussion not valid anymore and should it work now (even though the input is not produced by flyte)? Because I strongly believe I am having the same problem and looking for some workaround.
niels.bantilan
can you confirm locally that the hash function is producing the same output for the pickle file?
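One way to run that check locally, as a minimal stdlib-only sketch: `VideoRecord` and `hash_video_record` mirror the toy example from this thread, while `pickle_digest` is a hypothetical helper added only for comparison. It assumes the hash should depend on the record's content rather than on object identity.

```python
import hashlib
import pickle


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_record(record: VideoRecord) -> str:
    # Same idea as the toy example: the hash depends only on the path string.
    return record.video_path


def pickle_digest(record: VideoRecord) -> str:
    # Hypothetical helper: SHA-256 of the pickled bytes, for comparison only.
    return hashlib.sha256(pickle.dumps(record)).hexdigest()


first = VideoRecord("path/to/video")
second = VideoRecord("path/to/video")  # a distinct object with equal content

# True when the custom hash is stable across separate instances.
same_custom_hash = hash_video_record(first) == hash_video_record(second)

# Digests of pickled bytes are not guaranteed to match for arbitrary classes,
# so this second check is informational only.
same_pickle_digest = pickle_digest(first) == pickle_digest(second)

print(same_custom_hash, same_pickle_digest)
```

If the first comparison holds, the hash function itself is stable and the cache miss is more likely caused by how the input is uploaded or registered.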
peter.kun
I did use HashMethod the same way as it is used in the discussion linked above. However, when executing that single task with FlyteRemote, it never hits the cache. As you said, it re-uploads the pickle input file and runs the task as if it were a new input. Do you know of a solution for this?
niels.bantilan
You can supply your own HashMethod as shown here: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/caching.html#caching-of-non-flyte-offloaded-objects
niels.bantilan
If you use FlyteRemote I believe it re-uploads the pickle file input so as far as Flyte knows it’s a different file.
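To illustrate why a re-upload would defeat caching, here is a toy model. It is not Flyte's actual cache-key computation: `upload_pickle`, `cache_key`, and the bucket name `my-bucket` are hypothetical, and the model only assumes that the key is derived from the task identity plus some representation of the input.

```python
import hashlib
import uuid


def upload_pickle(bucket: str) -> str:
    # Stand-in for an upload that picks a fresh, random destination each time.
    return f"s3://{bucket}/{uuid.uuid4().hex}/input.pkl"


def cache_key(task_name: str, cache_version: str, input_repr: str) -> str:
    # Toy cache key: digest of the task identity plus the input representation.
    payload = "|".join([task_name, cache_version, input_repr])
    return hashlib.sha256(payload.encode()).hexdigest()


# Keyed on the upload location: identical content still gets a new key per run.
uri_key_1 = cache_key("toy_example.bar_1", "1.0", upload_pickle("my-bucket"))
uri_key_2 = cache_key("toy_example.bar_1", "1.0", upload_pickle("my-bucket"))

# Keyed on a content-derived hash: the key is stable across runs.
hash_key_1 = cache_key("toy_example.bar_1", "1.0", "path/to/video")
hash_key_2 = cache_key("toy_example.bar_1", "1.0", "path/to/video")

print(uri_key_1 == uri_key_2, hash_key_1 == hash_key_2)
```

In this model the location-based keys differ on every run while the content-based keys match, which is the behavior a custom HashMethod is meant to provide.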
peter.kun
The input in my case is a Python type and not one produced by flyte
peter.kun
kumare
It should work; your hash may be changing
peter.kun
Hello community,
I am executing registered Flyte tasks with FlyteRemote: https://github.com/flyteorg/flytekit/blob/master/flytekit/remote/remote.py#L1176
Is there a way to get a cache hit when the input is a single (PythonPickle) blob?
I have tried adding the Annotated HashMethod to the input types in the Flyte task, but it is not working.