
Flexible Workflow for Model Development

Summary

The user is seeking a flexible workflow for model development that can be shared among data scientists, focusing on data preprocessing, feature processing, model training, and evaluation. They want to enable researchers to easily define and modify model configurations using Flyte, integrated with Neptune.ai for experiment tracking. The user suggests creating an ExperimentExecutor class in Flyte for execution and configuration management and mentions a draft tutorial on using Flyte with Hydra and Pydantic. They highlight Flyte's capability to access dataclass attributes in its workflow DSL and propose exploring the integration of Hydra and Pydantic with Flyte's imperative workflows. Additionally, the user shares tips on managing data sources, optimizing the transform task, discretizing train-test split options, utilizing Hydra for model instantiation, and addressing potential issues with Pydantic's type unions in Flyte. They also mention a successful setup for a large experiment that was efficient, prevented overfitting, and avoided data duplication in blob storage.

Status
resolved
Tags
  • Neptune.ai
  • Experiment Tracking
  • Workflow
  • flyte
  • Model Development
  • Data Processing
  • Workflow Development
  • User
  • Collaboration
  • Support Need
  • Developer
  • Question
  • Data Scientist
  • Developer Help
  • Support Request
  • Model Training
  • Hydra
  • Feature Request
  • Workflow Integration
Source
#ask-the-community

    grantham

    11/17/2024

    I created this exact setup for a very large experiment and it was very pleasant and effective. Zero chance of overfitting, super memory efficient, and it prevents you from having to duplicate the data you back to blob storage.


    grantham

    11/17/2024

    Not necessarily. My point is that you could create the hash anywhere, virtually instantaneously, while guaranteeing it is reproducible. So, if you wanted to train 10 different models, you would be creating this hash 10 different times whenever you load the data. This means you never duplicate the data nor do you have to store a list of indices and pass that around among your tasks. Just create a function that will do this and then filter out your data via a lazy expression.

    1. I am assuming you have a large set of data. I mean, it might be a little wonky around thousands of observations, but at millions? Nope. And even if it were a little off (71% / 29%), the 70% / 30% split for train and validation/test is all arbitrary anyway :)
    2. Your requirements might be different, but what I had in mind was this: you use this hash to determine whether something belongs in the training / validation split OR the testing split. In other words, in your model training tasks you would only ever load in the data where “seed < 0.70”. The KFold could be used for the training and validation split, but never the testing split (out-of-sample split). Your validation split would be used in different ways, but basically it would help with preventing overfitting your model. So, for GBMs you would use it to determine when you should stop adding trees, for NNs you would use it to determine when you should halt training and which previous checkpoint to keep, and for GLMs you might use it to help prune features. You can KFold over this or whatever - that is all happening inside your training task, and so you only have 70% of your data loaded in because the other 30% was filtered out via the lazy expression. Now, after you create your final validated model, you return it from your training task and then execute an “evaluation” task that uses that model with your 30% testing split. This split will never change for the same dataset even if you use completely different models. It is a fair, reproducible collection of observations that you can absolutely guarantee your model never had a chance to use. Once again, this approach is guaranteed to be reproducible, you never have to duplicate your data, and it is super fast and memory efficient.
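
    A minimal sketch (added for illustration, not from the thread) of how point 2 could look, assuming a parquet dataset that already carries the reproducible “seed” column described elsewhere in the thread; the path, column name, and 5-fold choice are placeholders:

    ```python
    import numpy as np
    import polars as pl
    from sklearn.model_selection import KFold

    TRAIN_FRACTION = 0.70

    def train_task(path: str):
        # Lazily load only the training / validation strata; the testing split
        # never enters memory here.
        train_val = pl.scan_parquet(path).filter(pl.col("seed") < TRAIN_FRACTION).collect()
        splitter = KFold(n_splits=5, shuffle=True, random_state=0)
        for train_idx, val_idx in splitter.split(np.arange(train_val.height)):
            train_df = train_val[train_idx.tolist()]
            val_df = train_val[val_idx.tolist()]
            # ...fit on train_df, use val_df for early stopping / pruning...
        # return the final validated model

    def evaluation_task(path: str, model):
        # The 30% out-of-sample split is loaded only here, and it is identical
        # for every model trained against the same dataset.
        test = pl.scan_parquet(path).filter(pl.col("seed") >= TRAIN_FRACTION).collect()
        # ...score `model` on `test`...
    ```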

    tom.touati

    11/17/2024

    Thanks for the clarification :slightly_smiling_face: So you're suggesting to add a hash column to the data, then each fold gets a floating point range and uses only a part of the data? I'm still trying to figure out 2 things:

    1. If the hash isn't evenly distributed, we might end up with unpredictable behaviour, no?
    2. I think if I use sklearn.model_selection.KFold I won't really need to load the data and save parts of it. I can simply present a list of client_ids, get a list of integer indices for each fold, then feed that list into a map_task, so that the data selection will happen only during training.
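
    An illustrative sketch (not from the thread) of this KFold + map_task idea, assuming a recent flytekit with dataclass support; every name here is a placeholder:

    ```python
    from dataclasses import dataclass

    from flytekit import map_task, task, workflow
    from sklearn.model_selection import KFold

    @dataclass
    class Fold:
        train_ids: list[str]
        val_ids: list[str]

    @task
    def make_folds(client_ids: list[str], n_splits: int = 5) -> list[Fold]:
        splitter = KFold(n_splits=n_splits, shuffle=True, random_state=0)
        return [
            Fold(
                train_ids=[client_ids[i] for i in train_idx],
                val_ids=[client_ids[i] for i in val_idx],
            )
            for train_idx, val_idx in splitter.split(client_ids)
        ]

    @task
    def train_fold(fold: Fold) -> float:
        # Load only the rows for these client ids, train, and return a
        # validation metric.
        return 0.0  # placeholder

    @workflow
    def cross_validate(client_ids: list[str]) -> list[float]:
        folds = make_folds(client_ids=client_ids)
        return map_task(train_fold)(fold=folds)
    ```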

    grantham

    11/17/2024

    Does that make sense?


    grantham

    11/17/2024

    Sorry if I wasn’t clear. Well, let’s break down the naive method: say, train_test_split from sklearn.

    Executing this in its own separate task means that you are doubling the memory requirements and doubling the disk space of the data that you are backing to blob storage. This is pretty inefficient.

    If you instead use the aforementioned method, you can lazily evaluate your splits during model training. This works not only for in-memory data loading, but also for larger-than-memory observation streaming (torch IterableDataset or TF datasets).

    For example, you could use a Polars LazyFrame to load in your unsplit data, and then evaluate a filter that hashes each observation’s primary key. Because this is lazily evaluated, you only have to actually load your training / validation data for model training, and you only have to load your testing data during OOS evaluation. You get all the same functionality of train_test_split without having to duplicate your data or load in more data than necessary at any point in time. You also don’t need to create “indices” for which observations belong to which split, because that information is available given the hash of the primary key. It also guarantees reproducibility.

    This same method can be used for iterable observation filtering for your DL models.
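
    A hedged sketch (added for illustration) of that LazyFrame filter, assuming a parquet source with a client_id primary key; note that Polars' built-in hash is not guaranteed to be stable across library versions, so a stable hash such as the md5 variant sketched further down the thread may be preferable for long-lived datasets:

    ```python
    import polars as pl

    def load_split(path: str, split: str, train_fraction: float = 0.70) -> pl.DataFrame:
        # Deterministically map the primary key to a pseudo-uniform value in [0, 1).
        seed = (pl.col("client_id").hash(seed=0) % 1_000_000) / 1_000_000
        lazy = pl.scan_parquet(path).with_columns(seed.alias("seed"))
        if split == "train":  # training / validation strata
            lazy = lazy.filter(pl.col("seed") < train_fraction)
        else:  # out-of-sample testing strata
            lazy = lazy.filter(pl.col("seed") >= train_fraction)
        # Nothing is materialized until collect(); only the requested split is read.
        return lazy.collect()
    ```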


    tom.touati

    11/17/2024

    Can you clarify what added value this method brings?


    grantham

    11/17/2024

    This worked wonders for me.


    grantham

    11/17/2024

    Yeah I have encountered the same problem before and I found a good solution to it.

    Firstly, this is all under the assumption you only want to split your data randomly. That is, you don’t care about out-of-time (OOT) splits.

    Given that assumption, you can do a pretty clever trick to get the best of both worlds. First, you’ll want to identify the primary key for each observation. In my past modeling experience, I would use the customer ID as the primary key, even if I was using multiple transactions for each customer. But that is beside the point. You just need to identify the key along which you want to uniquely stratify your data.

    Once you have that, you can “hash” the key into a floating point value uniformly between 0 and 1. This guarantees reproducibility. I then just store this value (the “seed”) alongside my data as a unique column. You only have to do this once for each dataset, because it is reproducible, and it is effectively instantaneous.

    You can then use this seed to determine the “split” of any observation at runtime. For example, any value above 0.8 is test; otherwise it should be used for training/validation in your cross-folding.
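
    One possible implementation of that reproducible seed (an illustration only; the thread does not prescribe a particular hash, and the md5 choice and column/key names here are assumptions):

    ```python
    import hashlib

    def key_to_seed(key: str) -> float:
        # md5 is stable across runs, machines, and library versions;
        # 32 hex digits scaled into [0, 1).
        return int(hashlib.md5(str(key).encode("utf-8")).hexdigest(), 16) / 16**32

    # Stored once per dataset as its own column, e.g.:
    #   df["seed"] = df["customer_id"].map(key_to_seed)                                  # pandas
    #   df.with_columns(pl.col("customer_id").map_elements(key_to_seed).alias("seed"))   # polars
    # Then any observation with seed >= 0.8 belongs to the test split, forever.
    ```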


    tom.touati

    11/17/2024

    I see what you are saying. To start with, I think it's good to have a mapping between model type and resources that can be accessed when creating the wf, with a default resource configuration for classical/deep models.

    Btw - I'm currently trying to figure out how to perform the train-test split and the consequences that it brings with it. I think that on a workflow level I could get the number of datapoints from the inputs somehow, then apply the split of indices outside of a task, then run the cross validation in a map_task. However, models with short training times (like classical models) will be better off just performing the train/test split and training in a single task in terms of overhead. Perhaps if no train_test_transform task is provided, the cross validation fold will be skipped. What's your view on that?


    grantham

    11/17/2024

    Awesome! Thinking a bit more, I would also try to break each model training config into its own task that you stitch together with Imperative.

    I say that because I think you’ll be better off by setting the resources for each model type. For example, GBMs are going to take way more memory than GLMs.

    Also, you might even consider trying to bootstrap your own resource allocations dynamically based on the configurations. Say, if you are training a GBM with some max depth, and some max iterations, then request X amount of memory, whereas if you have higher max depth and max iterations, use 2X the amount of memory.

    I am just thinking you could end up wasting a lot of money if you try to take a “one size fits all” approach here without some thought around memory requirements.
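
    A hedged sketch of that config-driven resource sizing (the GBMConfig fields, memory numbers, and sizing rule are made-up illustrations; with_overrides as used here assumes a recent flytekit):

    ```python
    from dataclasses import dataclass

    from flytekit import Resources, dynamic, task

    @dataclass
    class GBMConfig:
        max_depth: int = 6
        max_iterations: int = 200

    @task(requests=Resources(cpu="2", mem="4Gi"))
    def train_gbm(cfg: GBMConfig) -> float:
        return 0.0  # placeholder: fit the booster, return a validation metric

    def gbm_memory(cfg: GBMConfig) -> str:
        # Naive rule of thumb: deeper / longer-running boosters get twice the memory.
        return "8Gi" if cfg.max_depth > 8 or cfg.max_iterations > 500 else "4Gi"

    @dynamic
    def train_all(cfgs: list[GBMConfig]) -> list[float]:
        # Inside a dynamic workflow the configs are real values, so the memory
        # request can be computed per model before each node is compiled.
        return [
            train_gbm(cfg=cfg).with_overrides(requests=Resources(mem=gbm_memory(cfg)))
            for cfg in cfgs
        ]
    ```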


    tom.touati

    11/17/2024

    Thanks a lot for the tips, they do help :slightly_smiling_face: will keep you posted


    grantham

    11/17/2024

    Really enjoying your project! Excited to hear more updates.


    grantham

    11/17/2024

    Hi Tom! Sounds like you have a good handle on this. I would like to provide a few tips around some "gotchas" here though!

    1. Assuming you are inputting some list[str] or list[MyDataSource] to represent your data sources, I would highly recommend creating a small task to sort the items in the list. You want to ensure that downstream users don't unintentionally create unnecessary "cache misses" of the transform task by simply changing the order of data sources.
    2. It may not be applicable or feasible in your case, but if you can, it would be very beneficial to ensure that each data source you are trying to merge in the transform task is already sorted row-wise; then you might be able to simply concatenate instead of join / merge. This would likely be much faster and require less memory.
    3. Given the configurations to train_test_transform, I would highly recommend trying to discretize the options here. IE: only allow users to select 80/10/10, 70/15/15, 60/20/20, or other such discrete strategies. You don't really want to give each user the ability to define continuous train-test split hyperparameters, because this could result in effectively useless cache misses. Also, you should ensure that this operation is idempotent / reproducible.
    4. "one type of task takes import path to model" ... if you haven't already, I would highly recommend checking out <https://hydra.cc/docs/advanced/instantiate_objects/overview/|this neat functionality> from hydra that will convert the path to the model and load it in for you. But right, you will need to pass in the name of the model path and instantiate it inside of the task, whereas with my previous recommendation Hydra would try to instantiate the model in your local environment (which could work but might result in weird behavior).
    5. You might run into some issues around Pydantic's support for type unions and Flyte's requirement around strict typing. In other words: a config of type XGBoostConfig|RandomForestConfig might throw an error. I would instead recommend a "parent" dataclass that contains all of the possible children dataclasses, each of which is optional, and then you would override the None with your config for your specific model type.
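
    An illustrative sketch of tip 5's "parent dataclass" pattern (class and field names are assumptions, not from the thread):

    ```python
    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class XGBoostConfig:
        max_depth: int = 6
        n_estimators: int = 200

    @dataclass
    class RandomForestConfig:
        n_estimators: int = 500

    @dataclass
    class ModelConfig:
        # Exactly one of these should be set; the others stay None, so Flyte
        # sees a single concrete dataclass type instead of a union.
        xgboost: Optional[XGBoostConfig] = None
        random_forest: Optional[RandomForestConfig] = None

    # e.g. a run that trains XGBoost:
    cfg = ModelConfig(xgboost=XGBoostConfig(max_depth=8))
    ```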

    tom.touati

    11/17/2024

    Update <@U07E8MHFW83> <@UNZB4NW3S> The current architecture is:
    • Experiment Runner class
    • static model development flow steps (tasks):
      ◦ dataset_fetching
      ◦ map_task signal_processing (each task handles a separate data source)
      ◦ map_task transform_data_source (each task handles a separate data source)
      ◦ transform_dataset (all sources are merged, entire dataset transformed)
      ◦ train_test_transform (given a train-test split strategy, fit_transform on train data, then transform on test data)
      ◦ model training with 2 options:
        ▪︎ one type of task takes an import path to the model and runs generic training for a classical/DL model
        ▪︎ custom task for a specific model
      ◦ model evaluation (same structure as training)
    • researcher flow:
      ◦ initiate Experiment Runner class:
        ▪︎ config remote/local execution env, caching, Neptune project
        ▪︎ pass tasks that match the static model development steps, e.g. transform_dataset, while using default tasks that can be edited
        ▪︎ config with a dataclass per static flow step
      ◦ run experiment, track progress, get dataclass


    kumare

    11/10/2024

    I am not saying a literal plugin but an additional layer / like a supporting lib


    grantham

    11/10/2024

    TBH I feel like config-driven imperative workflow authoring would be very difficult to develop as a plugin.

    Maptasks (with partials), dynamic WF, IO, overrides. Lots of nuances. I’m sure it’s possible.

    We do have an “omegaconf” plugin that is compatible with Hydra but this loses the fine-grained caching, UI support, attribute-access, and type checking you get with the automatic, recursive dataclass instantiation method I shared above.


    kumare

    11/10/2024

    Maybe this is a Flyte plugin that ships out of the box and then the community makes it reproducible


    grantham

    11/10/2024

    Oh, yeah that is a very good point!

    With imperative workflows, it is static, compiled, and reproducible. With eager, it could be reproducible, but it will be more ergonomic and more easily managed.

    I think the ideal solution is going to be a function of how complex you need the programmatic authoring to be. If you want to support hundreds of potential permutations among multiple configuration groups (different model types, metric types, feature engineering logic, etc.), then Eager is more favorable.

    If you have, like, a dozen or so different model types, and everything else can be managed with dynamic workflows or conditionals, Imperative Workflows are going to be more principled. You can use that “multi-run” logic to programmatically register them. But, well, that gets more complex when you have a large set of permutations.


    kumare

    11/10/2024

    one problem with eager is that the onus of reproducibility lies on the implementer of the eager logic


    kumare

    11/10/2024

    +1


    grantham

    11/10/2024

    The config is static, but model developers may opt to change the configs via Hydra to, say, swap from a GBM to a GLM, SVM, NN, RF, etc. This means that they would need to register different workflows for each of the unique variations that the imperative workflow may programmatically author.

    Imperative workflows are definitely the way to manage all of this right now, I 100% agree, but Eager could effectively offload all of this programmatic authoring logic to inside the workflow. This will be much easier to manage and version. It will also be a bit more ergonomic, I think.

    For this reason, I think that Eager will be a better long term solution for you and your team. But, once again, I would not currently recommend Eager because it is still experimental.


    kumare

    11/10/2024

    Isn’t the config static? Why not generate the workflow statically using imperative?


    grantham

    11/10/2024

    I am glad to hear it! This pattern is pretty powerful IMO. Getting configurations "done right" is surprisingly challenging. I also want to highlight that, given the above recommendation, the Union Console provides a built-in UI ("Launch Forms") such that model developers may create / modify such workflow executions, even for such complex, arbitrarily nested dataclass configurations! (Screenshot below.)

    I personally think that you can currently address this last piece (programmatic workflow authoring) via <https://docs.flyte.org/en/latest/user_guide/basics/imperative_workflows.html#|Imperative Workflows>. You could do this by programmatically constructing the imperative workflow inside of a Python match-case block as a function of the dataclass type (i.e. XGBoostConfig vs SVMConfig) in order to select the appropriate task. This will create a workflow on the fly locally from your configurations.

    In the future, I believe that your needs would be better suited by "<https://docs.flyte.org/en/latest/user_guide/advanced_composition/eager_workflows.html|Eager workflows>". Eager workflows will be more literal, ergonomic, and extensible. These are still a work in progress, however (so I would not recommend them just yet), but my brilliant colleagues are investing resources into them over the next several months <@U0762PD3DME> <@U0635LYB5PD>
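
    A hedged sketch of the match-case idea using flytekit's imperative Workflow API (the config classes, tasks, and workflow name are placeholders, not an implementation from the thread):

    ```python
    from dataclasses import dataclass

    from flytekit import Workflow, task

    @dataclass
    class XGBoostConfig:
        max_depth: int = 6

    @dataclass
    class SVMConfig:
        c: float = 1.0

    @task
    def train_xgboost(cfg: XGBoostConfig) -> float:
        return 0.0  # placeholder: fit the model, return a metric

    @task
    def train_svm(cfg: SVMConfig) -> float:
        return 0.0  # placeholder

    def build_workflow(cfg: XGBoostConfig | SVMConfig) -> Workflow:
        # Assemble an imperative workflow on the fly as a function of the
        # config type, then register / execute it as usual.
        wf = Workflow(name="experiment.training")
        match cfg:
            case XGBoostConfig():
                wf.add_workflow_input("cfg", XGBoostConfig)
                node = wf.add_entity(train_xgboost, cfg=wf.inputs["cfg"])
            case SVMConfig():
                wf.add_workflow_input("cfg", SVMConfig)
                node = wf.add_entity(train_svm, cfg=wf.inputs["cfg"])
            case _:
                raise ValueError(f"no training task registered for {type(cfg)}")
        wf.add_workflow_output("metric", node.outputs["o0"])
        return wf
    ```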


    tom.touati

    11/10/2024

    <@U07E8MHFW83> Wow that's really good! thanks a lot. This will definitely help us with standardisation of experiment execution and configuration ease, especially with multi-runs. Together with Neptune that should cover 90% of what I need. The only thing to consider is how to run different models simply by changing a parameter. My options are:
    • creating a task/workflow for each model type, e.g. train_xgboost, then create a dynamic_workflow that calls the relevant task/wf.
    • create a generic "train_model" task; the problem is that different models have different inputs. Currently thinking about 2 options:
      ◦ The generic task takes a model config dataclass as input, which contains a str field for the "model import path" and different parameters.
      ◦ Every model has a non-task function, e.g. "train_xgboost", that is called from the task "train_model".
    The latter option is perhaps less preferred. What do you think?


    tom.touati

    11/9/2024

    <@UNZB4NW3S> awesome thanks, will check it out.


    grantham

    11/9/2024

    Hi Tom, a lot of your requests here make a lot of sense. Programmatically executing workflows from a dynamic set of local configurations (managed by Hydra), which are all locally validated with Pydantic.

    This is something I have been tinkering with for a few years now and I have a draft of a tutorial on how to accomplish this with Flyte + Hydra + Pydantic <https://github.com/granthamtaylor/hydra-flyte|here>. In short, you may automatically, recursively instantiate your local yaml configurations and programmatically execute workflows from them.

    You do not need a rigorous "config dataclass for each step" - Flyte allows you to access attributes of the dataclasses in the Flyte workflow DSL.

    Otherwise, if you want to also programmatically construct / define a workflow from the configs (in addition to programmatically executing them from configs), I think you would be best served by exploring how to integrate Hydra + Pydantic with Flyte's "<https://docs.flyte.org/en/latest/user_guide/basics/imperative_workflows.html#|Imperative Workflows>".

    Does this make sense for your use case?
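
    This is not the linked tutorial itself, only a rough sketch of the shape it describes (Hydra resolves and overrides the YAML config, the structured config is recursively instantiated, and the workflow is executed programmatically); the conf path, project/domain names, and training_workflow import are placeholders:

    ```python
    import hydra
    from hydra.utils import instantiate
    from omegaconf import DictConfig

    from flytekit.configuration import Config
    from flytekit.remote import FlyteRemote

    from my_project.workflows import training_workflow  # hypothetical import

    @hydra.main(config_path="conf", config_name="experiment", version_base=None)
    def main(cfg: DictConfig) -> None:
        # Recursively build the dataclass instances declared via _target_ keys.
        experiment_config = instantiate(cfg)

        remote = FlyteRemote(Config.auto(), default_project="ml", default_domain="development")
        execution = remote.execute(training_workflow, inputs={"config": experiment_config})
        print(remote.generate_console_url(execution))

    if __name__ == "__main__":
        main()
    ```

    Hydra's --multirun flag would then give the grid / multi-run execution mentioned earlier in the thread, one remote execution per configuration combination.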


    kumare

    11/9/2024

    Cc <@U01DYLVUNJE> <@U0635LYB5PD> <@U07E8MHFW83>


    kumare

    11/9/2024

    We already have a Neptune.ai integration


    tom.touati

    11/9/2024

    So UnionML is not usable? It sounds pretty much like what I'm searching for. Basically my idea was integrating Flyte with <http://Neptune.ai|Neptune.ai> for experiment tracking. On the Flyte side there would be an ExperimentExecutor class which takes the following inputs:
    • local/cluster execution.
    • config dataclass for each step (allowing for grid execution).
    • programmatically builds a workflow.
    • presents an easy API for querying results and caching the initial dataset locally for follow-up local examination.
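
    No such class exists in Flyte today; the following is only a skeleton of the interface being proposed, with every name hypothetical:

    ```python
    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class ExperimentExecutor:
        """Hypothetical interface for the idea described above."""

        execution: str = "local"  # "local" or "cluster"
        step_configs: dict[str, Any] = field(default_factory=dict)  # one config dataclass per step

        def build_workflow(self):
            """Programmatically assemble the Flyte workflow from step_configs."""
            raise NotImplementedError

        def run(self):
            """Execute locally or submit to the cluster; return an execution handle."""
            raise NotImplementedError

        def results(self):
            """Query outputs and cache the initial dataset locally for follow-up analysis."""
            raise NotImplementedError
    ```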


    kumare

    11/9/2024

    Honestly I would love to put something like this in Flyte, if you have ideas


    kumare

    11/9/2024

    Just focus


    tom.touati

    11/9/2024

    where can I see that? why did it stop?


    kumare

    11/9/2024

    But we stopped that project


    kumare

    11/9/2024

    We built something like this with UnionML, take a look


    kumare

    11/9/2024

    Folks usually create reference tasks and then point folks to them


    tom.touati

    11/9/2024

    Hey all :) TLDR - Did anyone develop a robust, generic workflow for model development that can be shared across data scientists?

    We are currently upgrading our model development environment. We would like to create a workflow with the task structure of:
    • preprocessing data
    • processing features
    • model train
    • model evaluation
    This wf will be used by all our data scientists, so it has to be very flexible in terms of different types of models and code. The researcher basically should define a generic model config, alter some of the code, then run it. Did anyone develop this already? Would love to learn from your experience and exchange thoughts.
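
    For illustration, a minimal generic sketch of that four-step structure (all names, types, and the toy logic are placeholders rather than an existing shared implementation; evaluation is folded into the training task for brevity):

    ```python
    import importlib
    from dataclasses import dataclass, field

    import pandas as pd
    from flytekit import task, workflow

    @dataclass
    class ModelConfig:
        # An import path lets researchers swap models by changing config only.
        model_class: str = "sklearn.linear_model.LogisticRegression"
        params: dict[str, float] = field(default_factory=dict)

    def _load_class(path: str):
        module, name = path.rsplit(".", 1)
        return getattr(importlib.import_module(module), name)

    @task
    def preprocess_data(path: str) -> pd.DataFrame:
        return pd.read_parquet(path).dropna()

    @task
    def process_features(df: pd.DataFrame) -> pd.DataFrame:
        return df  # feature engineering goes here

    @task
    def train_and_evaluate(df: pd.DataFrame, cfg: ModelConfig, target: str) -> float:
        model = _load_class(cfg.model_class)(**cfg.params)
        x, y = df.drop(columns=[target]), df[target]
        model.fit(x, y)
        return float(model.score(x, y))

    @workflow
    def model_development(path: str, cfg: ModelConfig, target: str = "label") -> float:
        df = preprocess_data(path=path)
        features = process_features(df=df)
        return train_and_evaluate(df=features, cfg=cfg, target=target)
    ```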