Summary
The thread seeks a flexible workflow for model development that can be shared among data scientists, covering data preprocessing, feature processing, model training, and evaluation. The goal is to let researchers easily define and modify model configurations using Flyte, integrated with Neptune.ai for experiment tracking. The original poster proposes an ExperimentExecutor class in Flyte for execution and configuration management, while another participant shares a draft tutorial on using Flyte with Hydra and Pydantic. The discussion highlights Flyte's ability to access dataclass attributes in its workflow DSL and explores integrating Hydra and Pydantic with Flyte's imperative workflows. It also covers tips on managing data sources, optimizing the transform task, discretizing train-test split options, using Hydra for model instantiation, and working around issues with Pydantic type unions in Flyte, as well as a hash-based train-test split setup used in a large experiment that was memory efficient, prevented overfitting, and avoided duplicating data in blob storage.
grantham
I created this exact setup for a very large experiment and it was very pleasant and effective. Zero chance of overfitting, super memory efficient, and it prevents you from having to duplicate the data you back to blob storage.
grantham
Not necessarily. My point is that you could create the hash anywhere, virtually instantaneously, while guaranteeing it is reproducible. So, if you wanted to train 10 different models, you would just recreate this hash each of the 10 times you load the data. This means you never duplicate the data, nor do you have to store a list of indices and pass it around among your tasks. Just create a function that will do this and then filter your data via a lazy expression.
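A minimal sketch of that idea, assuming Polars and a parquet-backed dataset (the path and key column are hypothetical; note that `Expr.hash` is fast but not guaranteed to be stable across Polars versions, so for strict reproducibility you may prefer a persisted seed column or a cryptographic hash):
```python
import polars as pl

def load_split(path: str, key_col: str, lo: float, hi: float) -> pl.LazyFrame:
    """Lazily select only the rows whose hashed key falls in [lo, hi)."""
    # Map each primary key to a pseudo-uniform value in [0, 1).
    seed = (pl.col(key_col).hash(seed=0) % 10_000) / 10_000
    return pl.scan_parquet(path).filter((seed >= lo) & (seed < hi))

# Each of the 10 model trainings can recreate the exact same split cheaply:
train = load_split("observations.parquet", "customer_id", 0.0, 0.8)
test = load_split("observations.parquet", "customer_id", 0.8, 1.0)
```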
tom.touati
Thanks for the clarification :slightly_smiling_face: So you're suggesting adding a hash column to the data, then each fold gets a floating-point range and uses only part of the data? I'm still trying to figure out 2 things:
grantham
Does that make sense?
grantham
Sorry if I wasn't clear. Well, let's break down the naive method with, say, `train_test_split` from sklearn.
Executing this in its own separate task means that you are doubling the memory requirements and doubling the disk space of the data that you are backing to blob storage. This is pretty inefficient.
If you instead use the aforementioned method, you can lazily evaluate your splits during model training, both for larger-than-memory observation streaming (torch IterableDataset or TF datasets) and for in-memory data loading.
For example, you could use a Polars LazyFrame to load in your unsplit data, and then evaluate a filter that hashes each observation's primary key. Because this is lazily evaluated, you only have to actually load your training/validation data for model training, and you only have to load your testing data during OOS evaluation. You get all the same functionality of `train_test_split` without having to duplicate your data or load in more data than necessary at any point in time. You also don't need to create "indices" for which observation belongs to each stratum, because that information is available given the hash of the primary key. It also guarantees reproducibility.
This same method can be used for iterable observation filtering for your DL models.
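As a rough sketch of the lazy version with Polars (the file path and column names are hypothetical, and it assumes the reproducible hash of each observation's primary key has already been persisted as a `seed` column, as described elsewhere in the thread):
```python
import polars as pl

# Nothing is read into memory here; these are just query plans.
lf = pl.scan_parquet("observations.parquet")

train_val = lf.filter(pl.col("seed") < 0.8)
test = lf.filter(pl.col("seed") >= 0.8)

# Only materialize what the current step actually needs:
train_val_df = train_val.collect()   # during model training
# test_df = test.collect()           # later, during OOS evaluation
```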
tom.touati
Can you clarify what added value this method brings?
grantham
This worked wonders for me.
grantham
Yeah I have encountered the same problem before and I found a good solution to it.
Firstly, this is all under the assumption you only want to split your data randomly. That is, you don’t care about out-of-time (OOT) splits.
Given that assumption, you can do a pretty clever trick to get the best of both worlds. First, you’ll want to identify the primary key for each observation. In my past modeling experience, I would use the customer ID as the primary key, even if I was using multiple transactions for each customer. But that is beside the point. You just need to identify the key along which you want to uniquely stratify your data.
Once you have that, you can “hash” the key into a floating point value uniformly between 0 and 1. This guarantees reproducibility. I then just store this value (the “seed”) alongside my data as a unique column. You only have to do this once for each dataset, because it is reproducible, and it is effectively instantaneous.
You can then use this seed to determine the “split” of any observation at runtime. For example, any value above 0.8 is test; otherwise it should be used for training/validation in your cross-folding.
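A minimal sketch of that trick, assuming a recent Polars (where `map_elements` replaced `apply`) and an MD5-based hash; the dataset path, `customer_id` key, and `seed` column name are just illustrative:
```python
import hashlib
import polars as pl

def key_to_seed(key: str) -> float:
    """Deterministically map a primary key to a float in [0, 1)."""
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) / 16**32

# One-time, reproducible enrichment of the dataset with a "seed" column.
df = pl.read_parquet("observations.parquet")
df = df.with_columns(
    pl.col("customer_id")
    .map_elements(key_to_seed, return_dtype=pl.Float64)
    .alias("seed")
)

# The split is then decided at runtime: seed >= 0.8 -> test,
# everything else is available for train/validation cross-folding.
```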
tom.touati
I see what you are saying. To start with, I think it's good to have a mapping between model type and resources that can be accessed when creating the wf, with a default resource configuration for classical/deep models.
Btw - I'm currently trying to figure out how to perform the train-test split and what consequences it brings. I think that on a workflow level I could get the number of datapoints from the inputs somehow, then apply the split of indices outside of a task, then run the cross-validation in a map_task. However, models with short training time (like classical models) will be better off just performing the train/test split and training in a single task in terms of overhead. Perhaps if no train_test_transform task is provided, the cross-validation fold will be skipped. What's your view on that?
grantham
Awesome! Thinking a bit more, I would also try to break each model training config into its own task that you stitch together with an imperative workflow.
I say that because I think you’ll be better off by setting the resources for each model type. For example, GBMs are going to take way more memory than GLMs.
Also, you might even consider trying to bootstrap your own resource allocations dynamically based on the configurations. Say, if you are training a GBM with some max depth, and some max iterations, then request X amount of memory, whereas if you have higher max depth and max iterations, use 2X the amount of memory.
I am just thinking you could end up wasting a lot of money if you take a “one size fits all” approach here without some thought around memory requirements.
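A small sketch of that heuristic with flytekit (the config fields and thresholds are made up, and the exact `with_overrides` signature may differ slightly between flytekit versions):
```python
from dataclasses import dataclass
from flytekit import Resources

# Hypothetical GBM config; field names are illustrative.
@dataclass
class GBMConfig:
    max_depth: int = 6
    max_iterations: int = 500

def gbm_resources(config: GBMConfig) -> Resources:
    # Crude heuristic: double the memory request for "large" configurations.
    mem_gib = 4
    if config.max_depth > 8 or config.max_iterations > 1000:
        mem_gib *= 2
    return Resources(cpu="2", mem=f"{mem_gib}Gi")

# When stitching tasks together imperatively, the per-node resources could
# then be applied with something like:
#   node = wf.add_entity(train_gbm, config=...)
#   node.with_overrides(requests=gbm_resources(config))
```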
tom.touati
Thanks a lot for the tips, they do help :slightly_smiling_face: will keep you posted
grantham
Really enjoying your project! Excited to hear more updates.
grantham
Hi Tom! Sounds like you have a good handle on this. I would like to provide a few tips around some "gotchas" here though!
• If you are using a `list[str]` or `list[MyDataSource]` to represent your data sources, I would highly recommend creating a small task to sort the items in the list. You want to ensure that downstream users don't unintentionally create unnecessary "cache misses" of the `transform` task by simply changing the order of data sources.
• In the `transform` task, if the data sources you are trying to merge together are already sorted row-wise, you might be able to simply concatenate instead of join / merge. This would likely be much faster and require less memory.
• For `train_test_transform`, I would highly recommend trying to discretize the options here, i.e. only allow users to select 80/10/10, 70/15/15, 60/20/20, or other such discrete strategies. You don't really want to give each user the ability to define continuous train-test split hyperparameters because this could result in effectively useless cache misses. Also, you should ensure that this operation is idempotent / reproducible.
• Pydantic type unions such as `XGBoostConfig | RandomForestConfig` might throw an error. I would instead recommend a "parent" dataclass that contains all of the possible children dataclasses, each of which is optional, and then you would override the `None` with your config for your specific model type (see the sketch below).
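For that last point, a minimal sketch of the "parent" dataclass pattern (the config classes and fields here are purely illustrative):
```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class XGBoostConfig:
    max_depth: int = 6
    n_estimators: int = 200

@dataclass
class RandomForestConfig:
    n_estimators: int = 500
    max_features: str = "sqrt"

@dataclass
class ModelConfig:
    # Exactly one of these is set per experiment; Flyte can serialize this
    # shape without needing a union of dataclass types.
    xgboost: Optional[XGBoostConfig] = None
    random_forest: Optional[RandomForestConfig] = None
```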
tom.touati
Update <@U07E8MHFW83> <@UNZB4NW3S> The current architecture is:
• Experiment Runner class
• Static model development flow steps (tasks):
◦ dataset_fetching
◦ map_task signal_processing (each task handles a separate data source; see the sketch below)
◦ map_task transform_data_source (each task handles a separate data source)
◦ transform_dataset (all sources are merged, entire dataset transformed)
◦ train_test_transform (given a train-test split strategy, fit_transform on train data, then transform on test data)
◦ model training with 2 options:
▪︎ one type of task takes an import path to a model and runs generic training for a classical / DL model
▪︎ custom task for a specific model
◦ model evaluation (same structure as training)
• Researcher flow:
◦ initiate the Experiment Runner class:
▪︎ config remote/local execution env, caching, Neptune project
▪︎ pass tasks that match the static model development steps, e.g. transform_dataset, while using default tasks that can be edited
▪︎ config with a dataclass per static flow step
◦ run the experiment, track progress, get the dataclass
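As a sketch of the map_task fan-out over data sources described in the update above (task names, types, and the processing logic are placeholders, assuming flytekit's `map_task`):
```python
from flytekit import map_task, task, workflow

@task
def signal_processing(source_uri: str) -> str:
    ...  # hypothetical per-source processing happens here
    # Return the URI of the processed output (placeholder logic).
    return source_uri.replace("raw", "processed")

@workflow
def process_all_sources(source_uris: list[str]) -> list[str]:
    # Fan out: each data source is handled by its own mapped task instance.
    return map_task(signal_processing)(source_uri=source_uris)
```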
kumare
I am not saying a literal plugin but an additional layer / like a supporting lib
grantham
TBH I feel like config-driven imperative workflow authoring would be very difficult to develop as a plugin.
Map tasks (with partials), dynamic workflows, IO, overrides. Lots of nuances. I’m sure it’s possible.
We do have an “omegaconf” plugin that is compatible with Hydra but this loses the fine-grained caching, UI support, attribute-access, and type checking you get with the automatic, recursive dataclass instantiation method I shared above.
kumare
Maybe this is a Flyte plugin that ships out of the box and then the community makes it reproducible
grantham
Oh, yeah that is a very good point!
With imperative workflows, everything is static, compiled, and reproducible. With eager, it could be reproducible, but it will be more ergonomic and more easily managed.
I think the ideal solution is going to be a function of how complex you need the programmatic authoring to be. If you want to support hundreds of potential permutations among multiple configuration groups (different model types, metric types, feature engineering logic, etc.) then Eager is more favorable.
If you have, like, a dozen or so different model types, and everything else can be managed with dynamic workflows or conditionals, Imperative Workflows are going to be more principled. You can use that “multi-run” logic to programmatically register them. But, well, that gets more complex when you have a large set of permutations.
kumare
one problem with eager is the onus of reproducibility lies on the implementer of the eager logic
kumare
+1
grantham
The config is static, but model developers may opt to change the configs via Hydra to, say, swap from a GBM to a GLM, SVM, NN, RF, etc. This means that they would need to register different workflows for each of the unique variations that the imperative workflow may programmatically author.
Imperative workflows are definitely the way to manage all of this right now, I 100% agree, but Eager could effectively offload all of this programmatic authoring logic to inside the workflow itself. This will be much easier to manage and version. It will also be a bit more ergonomic, I think.
For this reason, I think that Eager will be a better long term solution for you and your team. But, once again, I would not currently recommend Eager because it is still experimental.
kumare
Isn’t the config static? Why not generate the workflow statically using imperative?
grantham
I am glad to hear it! This pattern is pretty powerful IMO. Getting configurations "done right" is surprisingly challenging. I also want to highlight that, given the above recommendation, the Union Console provides a built-in UI ("Launch Forms") such that model developers may create / modify such workflow executions, even for complex, arbitrarily nested dataclass configurations! (Screenshot below.)
I personally think that you can currently address this last piece (programmatic workflow authoring) via <https://docs.flyte.org/en/latest/user_guide/basics/imperative_workflows.html#|Imperative Workflows>. You could do this by programmatically constructing the imperative workflow inside of a Python match-case block as a function of the dataclass type (i.e. `XGBoostConfig` vs `SVMConfig`) in order to select the appropriate task. This will create a workflow on the fly locally from your configurations.
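A rough sketch of that match-case construction, assuming a recent flytekit where plain dataclasses are supported as task inputs; all class, task, and workflow names here are hypothetical:
```python
from dataclasses import dataclass
from flytekit import Workflow, task

# Hypothetical configs and training tasks, just to illustrate the pattern.
@dataclass
class XGBoostConfig:
    max_depth: int = 6

@dataclass
class SVMConfig:
    c: float = 1.0

@task
def train_xgboost(config: XGBoostConfig) -> str:
    return "s3://models/xgboost"  # placeholder for real training logic

@task
def train_svm(config: SVMConfig) -> str:
    return "s3://models/svm"  # placeholder for real training logic

def build_training_workflow(config) -> Workflow:
    """Construct an imperative workflow on the fly from a config instance."""
    wf = Workflow(name="experiment.training")
    wf.add_workflow_input("config", type(config))
    match config:
        case XGBoostConfig():
            node = wf.add_entity(train_xgboost, config=wf.inputs["config"])
        case SVMConfig():
            node = wf.add_entity(train_svm, config=wf.inputs["config"])
        case _:
            raise TypeError(f"Unsupported config type: {type(config)}")
    wf.add_workflow_output("model_uri", node.outputs["o0"])
    return wf
```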
In the future, I believe that your needs would be better suited by "<https://docs.flyte.org/en/latest/user_guide/advanced_composition/eager_workflows.html|Eager workflows>". Eager workflows will be more literal, ergonomic, and extensible. These are still a work in progress, however (so I would not recommend them just yet), but my brilliant colleagues are investing resources into them over the next several months <@U0762PD3DME> <@U0635LYB5PD>
tom.touati
<@U07E8MHFW83> Wow, that's really good! Thanks a lot. This will definitely help us with standardisation of experiment execution and configuration ease, especially with multi-runs. Together with Neptune that should cover 90% of what I need. The only thing to consider is how to run different models simply by changing a parameter. My options are:
• Creating a task/workflow for each model type, e.g. train_xgboost, then creating a dynamic workflow that calls the relevant task/wf.
• Creating a generic "train_model" task; the problem is that different models have different inputs. Currently thinking about 2 options:
◦ The generic task takes a model config dataclass as input, which contains a str field with the "model import path" and the model parameters (see the sketch below).
◦ Every model has a non-task function, e.g. "train_xgboost", that is called from the task "train_model".
The latter option is perhaps less preferred. What do you think?
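A minimal sketch of the import-path option (the class and field names are hypothetical; Hydra's `hydra.utils.instantiate` with a `_target_` key gives you essentially this behaviour out of the box):
```python
import importlib
from dataclasses import dataclass, field

@dataclass
class ModelConfig:
    # Hypothetical fields: a dotted import path plus keyword arguments.
    model_import_path: str = "sklearn.ensemble.RandomForestClassifier"
    params: dict = field(default_factory=dict)

def instantiate_model(config: ModelConfig):
    module_path, _, class_name = config.model_import_path.rpartition(".")
    model_cls = getattr(importlib.import_module(module_path), class_name)
    return model_cls(**config.params)

# e.g. instantiate_model(ModelConfig(params={"n_estimators": 200}))
```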
tom.touati
<@UNZB4NW3S> awesome thanks, will check it out.
grantham
Hi Tom, a lot of your requests here make a lot of sense: programmatically executing workflows from a dynamic set of local configurations (managed by Hydra), which are all locally validated with Pydantic.
This is something I have been tinkering with for a few years now, and I have a draft of a tutorial on how to accomplish this with Flyte + Hydra + Pydantic <https://github.com/granthamtaylor/hydra-flyte|here>. In short, you may automatically, recursively instantiate your local yaml configurations and programmatically execute workflows from them.
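A rough sketch of that pattern (the config layout, project/domain, and `training_workflow` import are placeholders; it assumes Hydra >= 1.2 and flytekit's `FlyteRemote`):
```python
import hydra
from hydra.utils import instantiate
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

from my_project.workflows import training_workflow  # hypothetical workflow

@hydra.main(config_path="conf", config_name="experiment", version_base=None)
def main(cfg) -> None:
    # Recursively instantiate the yaml config into (data)class instances.
    experiment_config = instantiate(cfg.experiment)

    # Execute the workflow on the remote cluster with the instantiated config.
    remote = FlyteRemote(
        config=Config.auto(),
        default_project="flytesnacks",  # placeholder
        default_domain="development",   # placeholder
    )
    remote.execute(training_workflow, inputs={"config": experiment_config})

if __name__ == "__main__":
    main()
```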
You do not need a rigorous "config dataclass for each step" - Flyte allows you to access attributes of the dataclasses in the Flyte workflow DSL.
Otherwise, if you want to also programmatically construct / define a workflow from the configs (in addition to programmatically executing them from configs), I think you would be best served by exploring how to integrate Hydra + Pydantic with Flyte's "<https://docs.flyte.org/en/latest/user_guide/basics/imperative_workflows.html#|Imperative Workflows>".
Does this make sense for your use case?
kumare
Cc <@U01DYLVUNJE> <@U0635LYB5PD> <@U07E8MHFW83>
kumare
We already have a Neptune.ai integration
tom.touati
So UnionML is not usable? Sounds pretty much like what I'm searching for. Basically my idea was integrating Flyte with <http://Neptune.ai|Neptune.ai> for experiment tracking. On the Flyte side there would be an ExperimentExecutor class which:
• takes local/cluster execution as an input
• takes a config dataclass for each step (allowing for grid execution)
• programmatically builds a workflow
• presents an easy API for querying results and caching the initial dataset locally for follow-up local examination
kumare
Honestly I would love to put something like this in Flyte, if you have ideas
kumare
Just focus
tom.touati
Where can I see that? Why did it stop?
kumare
But we stopped that project
kumare
We built something like this with UnionML, take a look
kumare
Folks usually create reference tasks and then point folks to them
tom.touati
Hey all :) TLDR - Did anyone develop a robust, generic workflow for model development that can be shared across data scientists?
We are currently upgrading our model development environment. We would like to create a workflow with the task structure of:
• preprocessing data
• processing features
• model training
• model evaluation
This wf will be used by all our data scientists, so it has to be very flexible in terms of different types of models and code. The researcher basically should define a generic model config, alter some of the code, then run it. Did anyone develop this already? Would love to learn from your experience and exchange thoughts.