Cache Issues with FlyteRemote and PythonPickle

Summary

The user is facing issues with cache hits when executing Flyte tasks using FlyteRemote, particularly with a PythonPickle blob as input. They have tried using the Annotated HashMethod for input types but have not been successful since their input is a Python type rather than a Flyte-generated type. The user suspects that FlyteRemote is re-uploading the pickle file as a new input, which hinders cache hits. They are exploring the option of providing a custom HashMethod for caching non-Flyte offloaded objects and are looking for solutions or workarounds. The user notes that cache hits work when running the task locally but not with FlyteRemote. They also suggest that using FlyteFile with a specified remote_path for uploading videos to an S3 bucket might help with caching, although they are uncertain if this will work with FlyteRemote.

Status
resolved
Tags
    Source
    #ask-the-community

      peter.kun

      10/22/2024

      thanks, I will try it out

      curupa

      10/17/2024

      <@U06FV6LLKSQ>, FYI, I replied on the GitHub issue (https://github.com/flyteorg/flyte/issues/5823#issuecomment-2420592440). This should be enough to unblock your use of FlyteRemote.

      peter.kun

      10/8/2024

      ytong

      10/7/2024

      I think we should support this. Mind creating a gh issue for this <@U06FV6LLKSQ>?

      niels.bantilan

      10/7/2024

      Another solution might be to use FlyteFile and specify the remote_path argument, where the value is an S3 bucket path where you want to upload your videos. I’m not 100% certain this works with FlyteRemote, but that should make caching work.
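
A rough sketch of the idea behind a fixed remote_path: if the upload location is derived from the file's content, uploading the same video twice points at the same URI. The bucket prefix, the helper name `content_addressed_path`, and the placeholder bytes are all hypothetical, and whether FlyteRemote then produces a cache hit is untested here, in line with the caveat in the message above.

```python
import hashlib
from pathlib import PurePosixPath

# Hypothetical bucket prefix; substitute your own location.
BUCKET_PREFIX = "s3://my-bucket/videos"


def content_addressed_path(data: bytes, filename: str) -> str:
    """Derive a remote path from the file's bytes so re-uploads reuse one location."""
    digest = hashlib.sha256(data).hexdigest()
    suffix = PurePosixPath(filename).suffix  # keep the original extension
    return f"{BUCKET_PREFIX}/{digest}{suffix}"


video_bytes = b"placeholder video bytes"  # stand-in for the real file content
remote_path = content_addressed_path(video_bytes, "clip.mp4")

# With flytekit installed, the path could then be handed to FlyteFile
# along these lines (an assumption, not verified against FlyteRemote):
#   from flytekit.types.file import FlyteFile
#   video = FlyteFile("clip.mp4", remote_path=remote_path)
print(remote_path)
```

The path depends only on the bytes and the extension, so renaming the local file does not change where it lands.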

      niels.bantilan

      10/7/2024

      <@U0265RTUJ5B> <@UNR3C6Y4T> ^^

      niels.bantilan

      10/7/2024

      I don’t think this is possible today

      peter.kun

      10/7/2024

      Yes, but what if I don't have one? That was my very first question. I would like to execute tasks with FlyteRemote, so the inputs come from Python and not from a Flyte task.

      niels.bantilan

      10/7/2024

      ```
      @task
      def produce_video(...) -> Annotated[VideoRecord, HashMethod(...)]:
          ...

      @task
      def consume_video(video_record: VideoRecord): ...
      ```
      
      niels.bantilan

      10/7/2024

      so in your case, you’d need a task that produces the VideoRecord object:

      niels.bantilan

      10/7/2024

      > Note how the output of task foo is annotated with an object of type HashMethod. Essentially, it represents a function that produces a hash that is used as part of the cache key calculation in calling the task bar.

      niels.bantilan

      10/7/2024

      oh wait… you’re annotating the input with HashMethod? I don’t think that’ll work…

      You need to annotate the output of a task with HashMethod (referring to the docs example: https://docs.flyte.org/en/latest/user_guide/development_lifecycle/caching.html#caching-of-non-flyte-offloaded-objects).

      ```
      import pandas
      from flytekit import HashMethod, task, workflow
      from typing_extensions import Annotated


      def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
          return str(pandas.util.hash_pandas_object(df))


      @task
      def foo_1(a: int, b: str) -> Annotated[pandas.DataFrame, HashMethod(hash_pandas_dataframe)]:
          df = pandas.DataFrame(...)
          ...
          return df


      @task(cache=True, cache_version="1.0")
      def bar_1(df: pandas.DataFrame) -> int:
          ...


      @workflow
      def wf_1(a: int, b: str):
          df = foo_1(a=a, b=b)
          # The hash attached to foo_1's output feeds into bar_1's cache key.
          v = bar_1(df=df)
      ```
      
      niels.bantilan

      10/7/2024

      regardless, this does seem like a bug

      niels.bantilan

      10/7/2024

      is there any reason you’re using a custom class VideoRecord and not a FlyteFile?

      peter.kun

      10/7/2024

      Any idea why it is not working with FlyteRemote?

      peter.kun

      10/7/2024

      Btw, bar_1 is a registered entity, and if I run it locally, the cache hit does happen:

      ```
      video_record = VideoRecord("path/to/video")
      bar_1(video_record=video_record)
      bar_1(video_record=video_record)
      ```
      
      peter.kun

      10/7/2024

      Let me show you my toy example, where a cache hit never happens across consecutive runs:

      ```
      from flytekit import HashMethod, task
      from typing_extensions import Annotated
      from flytekit.remote import FlyteRemote
      from flytekit.configuration import Config, PlatformConfig


      class VideoRecord:
          def __init__(self, video_path: str):
              self.video_path = video_path


      def hash_video_record(record: VideoRecord) -> str:
          return record.video_path


      @task(cache=True, cache_version="1.0")
      def bar_1(video_record: Annotated[VideoRecord, HashMethod(hash_video_record)]) -> str:
          print("Running bar_1")
          return video_record.video_path


      if __name__ == "__main__":
          video_record = VideoRecord("path/to/video")

          # endpoint, default_project and default_domain are not shown in this snippet.
          remote = FlyteRemote(
              config=Config(
                  platform=PlatformConfig(
                      endpoint=endpoint,
                      insecure=True,
                      insecure_skip_verify=True,
                  )
              ),
              default_project=default_project,
              default_domain=default_domain,
          )
          entity = remote.fetch_task(name="toy_example.bar_1", version="1.1")
          remote.execute(
              entity=entity,
              inputs={"video_record": video_record},
              wait=True,
              tags=[],
              overwrite_cache=False,
          )
      ```
      
      kumare

      10/6/2024

      It will work if your hashing algorithm results in the same hash

      peter.kun

      10/4/2024

      Yes I can

      Is the problem in the discussion not valid anymore and should it work now (even though the input is not produced by flyte)? Because I strongly believe I am having the same problem and looking for some workaround.

      niels.bantilan

      10/4/2024

      can you confirm locally that the hash function is producing the same output for the pickle file?
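
One way to run that local check, as a minimal sketch: build two separate objects with the same content and compare what the hash function returns. `VideoRecord` mirrors the toy class from this thread; hashing the path with SHA-256 and the `fragile_hash` counter-example are illustrative assumptions, not something taken from the thread.

```python
import hashlib


class VideoRecord:
    def __init__(self, video_path: str):
        self.video_path = video_path


def hash_video_record(record: VideoRecord) -> str:
    """Digest only the field that defines the record's identity."""
    return hashlib.sha256(record.video_path.encode("utf-8")).hexdigest()


def fragile_hash(record: VideoRecord) -> str:
    """Counter-example: the default repr embeds the object's memory address."""
    return repr(record)


first = VideoRecord("path/to/video")
second = VideoRecord("path/to/video")  # same content, separate object

print(hash_video_record(first) == hash_video_record(second))  # True
print(fragile_hash(first) == fragile_hash(second))  # False
```

A hash that depends on object identity, timestamps, or anything else that varies between runs would explain a cache miss on its own, so it is worth ruling out before looking at the upload path.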

      peter.kun

      10/4/2024

      I did use HashMethod the same way as it is used in the discussion linked above. However, when executing that single task with FlyteRemote, it never hits the cache. As you said, it re-uploads the pickle input file and runs the task as if it were a new input. Do you know of a solution for this?

      niels.bantilan

      10/4/2024

      If you use FlyteRemote I believe it re-uploads the pickle file input so as far as Flyte knows it’s a different file.
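
To make that suspicion concrete, here is a toy model, not Flyte's actual cache-key logic: if the key is derived from where the pickle was uploaded, and every upload gets a fresh location, two runs with the same object can never match. The bucket name and the functions `upload_pickle` and `cache_key` are made up for illustration.

```python
import hashlib
import uuid


def upload_pickle() -> str:
    """Pretend upload: every call lands at a fresh, random location."""
    return f"s3://hypothetical-bucket/{uuid.uuid4().hex}/input.pkl"


def cache_key(task_name: str, cache_version: str, input_repr: str) -> str:
    """Toy cache key: digest of task name, cache version, and input representation."""
    raw = "|".join([task_name, cache_version, input_repr])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# Keyed on the upload location: two runs with the same object disagree.
key_run_1 = cache_key("toy_example.bar_1", "1.0", upload_pickle())
key_run_2 = cache_key("toy_example.bar_1", "1.0", upload_pickle())

# Keyed on a user-supplied hash of the value: two runs agree.
stable_run_1 = cache_key("toy_example.bar_1", "1.0", "path/to/video")
stable_run_2 = cache_key("toy_example.bar_1", "1.0", "path/to/video")

print(key_run_1 == key_run_2)
print(stable_run_1 == stable_run_2)
```

Under this model, the fix has to make the input's representation stable across runs, either through a custom hash of the value or through a deterministic upload location.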

      peter.kun

      10/4/2024

      The input in my case is a Python type and not one produced by flyte

      kumare

      10/4/2024

      It should work; your hash may be changing

      peter.kun

      10/4/2024

      Hello community,

      I am executing registered Flyte tasks with FlyteRemote: https://github.com/flyteorg/flytekit/blob/master/flytekit/remote/remote.py#L1176

      Is there a way to get a cache hit when the input is a single (PythonPickle) blob?

      I have tried adding the Annotated HashMethod to the input types in the Flyte task, but it is not working.