This post was written in collaboration with Assaf Pinhasi, an Engineering and MLOps expert, who helped us build the solution.
At 3DFY.ai, we are faced with some of the most difficult problems in computer vision and, in particular, AI-based 3D reconstruction under extremely challenging conditions.
Our solution is designed as a computational pipeline composed of multiple modules, most of which are Deep Learning models, each trained to perform a specific sub-task in the overall 3D reconstruction process. Some of our models are quite large and can reach hundreds of millions of parameters.
Training large networks typically requires large datasets, and our data is multidimensional in nature. Additionally, we are all about being data-centric which in practice means we train and retrain our models on new datasets quite often.
Putting it all together, we train large networks on large datasets, and we train often. We consider this approach the right one for dealing with challenging, real-world problems in a practical way. However, as we experienced firsthand, without the right infrastructure, training can easily become an R&D bottleneck because of long training times, on top of the cloud costs incurred.
We started out using disparate on-demand cloud GPU instances with 4-8 GPUs for our training jobs. Training one of our large networks would take about three weeks and would cost a few thousand dollars.
As we wanted to speed up our R&D activities, we decided to invest in proper training infrastructure that would cover the following user requirements:
We divided the system into four main layers:
In the following sections, we will dive into a more detailed analysis of the requirements from each of these layers.
Training models on hundreds of GBs (or more) of data makes loading the data onto the training machines’ local disks impractical, so we decided that jobs would read training data directly from centralized storage.
Our training examples have multiple data layers attached to them and are represented in a unique file format. During data loading, we pull and combine information from several data layers, depending on the training goal. Doing so enables us to reduce IO dramatically, as only needed layers are fetched. This API requires us to work with files (vs. object blobs).
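To make the idea of fetching only the needed layers concrete, here is a minimal sketch. It assumes a purely hypothetical layout (one directory per example, one JSON file per layer); the actual file format is not described here, and the names `load_example` and `write_demo_example` are illustrative only.

```python
import json
import tempfile
from pathlib import Path


def load_example(example_dir, layers):
    """Read only the requested layers of one example.

    Assumption: each layer is stored as <example_dir>/<layer>.json.
    Layers that are not requested are never opened, so they cost no IO.
    """
    example_dir = Path(example_dir)
    return {
        name: json.loads((example_dir / f"{name}.json").read_text())
        for name in layers
    }


def write_demo_example(root):
    """Create a tiny fake example with three layers (illustration only)."""
    example_dir = Path(root) / "example_0001"
    example_dir.mkdir(parents=True)
    demo_layers = {"rgb": [1, 2, 3], "depth": [0.5, 0.75], "mask": [0, 1]}
    for name, payload in demo_layers.items():
        (example_dir / f"{name}.json").write_text(json.dumps(payload))
    return example_dir


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        example_dir = write_demo_example(root)
        # A training goal that needs only two of the three layers.
        sample = load_example(example_dir, layers=["rgb", "depth"])
        print(sorted(sample))
```

Because each layer is addressed as a path on a mounted file system, this style of access works naturally over a shared file system, which is consistent with the preference for files over object blobs.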
As for experiment outputs, such as model checkpoints, debug images, and TensorBoard events, we wanted to set up centralized storage for those too. This would make the data easier to manage while still letting researchers work with it much as they would locally.
We had by this point already set up an NFS storage solution that handled both the training data and the training output (one NFS per cloud provider) and it worked well for us.
We noticed that having a shared file system between all training nodes and researcher stations made it very easy to share information and communicate between all our processes, in a transparent way.
We wanted to continue using our NFS-based storage solution, provided it could scale to the read throughput we needed.
The cluster layer needs to provision and set up the machines for the training processes. It is also responsible for scheduling training processes onto machines, starting, and monitoring them until completion.
Training jobs are ephemeral workloads: they can be submitted at any time, run anywhere from minutes to hours and even days, and then terminate.
This requires the compute resources to be elastic: scale-out when new training jobs are submitted, and scale-in when jobs terminate.
The cluster manager should perform any setup needed to prepare instances for the training job. This includes installing CUDA drivers, mounting remote file systems, and setting up logging and monitoring agents.
Once the instances are ready, the training processes should be scheduled onto them and started. The cluster manager needs to provide a way to check on the health status of the processes.
Since training jobs can take many hours and sometimes days, running on interruptible instances means interruptions are a rather likely scenario. On GCP, for example, preemptible instances are always terminated within 24 hours, which is not an unheard-of length of time for a single training job to run.
To address this, the cluster manager needs to perform proactive steps to maintain the target compute capacity needed for running the jobs. When instances get interrupted and terminated, the cluster manager should attempt to spin up replacement instances, set them up and restart the training process on the new instances.
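The capacity-maintenance behavior described above can be sketched as a small reconciliation step: compare the instances that are still alive with the target capacity and launch the difference. The `Instance` type, its state names, and `plan_replacements` are hypothetical and do not correspond to any particular cluster manager's API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Instance:
    name: str
    state: str  # hypothetical states: "running", "provisioning", "terminated"


def plan_replacements(instances, target_capacity):
    """Return how many new instances to launch to restore the target capacity.

    Instances that are running or already being provisioned count toward
    capacity; interrupted (terminated) ones do not.
    """
    alive = sum(1 for inst in instances if inst.state in ("running", "provisioning"))
    return max(0, target_capacity - alive)


if __name__ == "__main__":
    fleet = [
        Instance("node-a", "running"),
        Instance("node-b", "terminated"),   # interrupted by the cloud provider
        Instance("node-c", "provisioning"),
    ]
    print(plan_replacements(fleet, target_capacity=4))  # 4 wanted, 2 alive -> 2
```

A real cluster manager would run a step like this continuously and follow it with instance setup and a restart of the training process on the new machines.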
The cluster manager also needs to address additional concerns such as security and machine- and process-level observability.
The training layer depends on the cluster layer and is in charge of all the tasks which are specific to the training process.
To support training jobs that run over tens of GPUs, we needed to distribute a single training job to run on multiple machines.
This means starting training workers on different machines, allowing them to discover each other, divide up the data between them, and synchronize their state during the training process.
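As an illustration of dividing the data between workers, the sketch below gives each worker a strided slice of the sample indices, based on its rank and the total number of workers. It is a simplified stand-in: a production sampler would typically also shuffle per epoch and even out shard sizes. The function name `shard_indices` is illustrative.

```python
def shard_indices(num_samples, rank, world_size):
    """Indices of the samples that worker `rank` processes.

    Workers are numbered 0..world_size-1; each takes every
    world_size-th sample, starting at its own rank.
    """
    if not 0 <= rank < world_size:
        raise ValueError("rank must be in [0, world_size)")
    return list(range(rank, num_samples, world_size))


if __name__ == "__main__":
    world_size = 4
    for rank in range(world_size):
        print(rank, shard_indices(10, rank, world_size))
```

The shards are disjoint and together cover the whole dataset, so no example is seen twice within an epoch and none is skipped.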
The training layer needs to be resilient to training workers getting killed as a result of instance interruptions, and also allow replacement workers to join the training, without losing state or corrupting the training correctness.
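One common way to get this kind of resilience is to checkpoint periodically and have every (re)started worker resume from the latest checkpoint. The sketch below simulates that pattern with a toy "model" (a single number) and JSON files; the function names, the state layout, and the `stop_after` parameter used to simulate an interruption are all assumptions made for illustration.

```python
import json
import os


def save_checkpoint(path, state):
    """Write to a temporary file, then rename it over the checkpoint.

    If the worker is killed mid-write, the previous checkpoint stays intact.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)


def load_checkpoint(path):
    """Return the saved state, or a fresh state if no checkpoint exists."""
    if not os.path.exists(path):
        return {"step": 0, "weights": 0.0}
    with open(path) as f:
        return json.load(f)


def train(path, total_steps, checkpoint_every, stop_after=None):
    """Toy training loop that resumes from the last checkpoint.

    `stop_after` simulates an instance interruption at a given step.
    """
    state = load_checkpoint(path)
    while state["step"] < total_steps:
        state["weights"] += 0.5  # stand-in for a real optimizer step
        state["step"] += 1
        if state["step"] % checkpoint_every == 0:
            save_checkpoint(path, state)
        if stop_after is not None and state["step"] >= stop_after:
            return state  # worker "killed" before the next checkpoint
    save_checkpoint(path, state)
    return state
```

For example, calling `train(path, 10, 5, stop_after=7)` and then `train(path, 10, 5)` loses the two steps taken after the last checkpoint but still finishes at step 10 with the same result as an uninterrupted run.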
The training layer needs to provide a solution for packaging training jobs so that they can be deployed on the cluster — allowing the cluster manager to start jobs, pass the parameters, and monitor them.
We wanted to create a simple API layer that would enable researchers and other clients to submit training jobs, check on their status, terminate them, and generally manage them, without needing to learn new technologies or understand the internals of the cluster manager.
Buying a solution initially seemed like an interesting option, as it could help reduce the operational footprint and shorten the time and effort needed to set up the solution.
However, these solutions are relatively expensive, and also cloud-specific, which doesn’t meet our requirements for multi-cloud support.
We didn’t find many mature third party vendors who offered distributed training as a service. Those we did find tended to be expensive or to lock you into their platform. Using a third-party vendor would also prevent us from enjoying cloud vendor usage discounts.
The conclusion was that the solution would need to be built with open source components. Since we are a PyTorch shop, our first step was to look at PyTorch-native solutions.
As mentioned in the introduction, we typically run our training jobs in the data-parallel regime. Under this regime, the model is replicated to multiple GPUs, each GPU trains on a different portion of the data, and the results from all GPUs are then combined to mimic training on one large batch.
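The claim that combining per-GPU results mimics one large batch can be checked on a toy problem. The sketch below is pure Python with no GPUs involved: it fits `y = w * x` with a mean-squared-error loss, computes the gradient on each simulated worker's shard, and averages the results. With equally sized shards, the average matches the full-batch gradient. All names are illustrative.

```python
def mse_gradient(w, batch):
    """Gradient of mean((w * x - y) ** 2) with respect to w over one batch."""
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def data_parallel_gradient(w, batch, world_size):
    """Simulate data-parallel training on `world_size` workers.

    Each worker computes the gradient on its own equally sized shard;
    the local gradients are then averaged, as an all-reduce would do.
    """
    if len(batch) % world_size != 0:
        raise ValueError("this sketch assumes equally sized shards")
    shards = [batch[rank::world_size] for rank in range(world_size)]
    local_grads = [mse_gradient(w, shard) for shard in shards]
    return sum(local_grads) / world_size


if __name__ == "__main__":
    data = [(float(x), 3.0 * x) for x in range(1, 9)]  # 8 samples of y = 3x
    full = mse_gradient(1.0, data)
    parallel = data_parallel_gradient(1.0, data, world_size=4)
    print(full, parallel)
```

The equality relies on the shards being the same size; with unequal shards, a plain average of the local gradients would weight samples unevenly.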
DistributedDataParallel, or DDP for short, is a PyTorch module that supports training models in a data-parallel way, where the GPUs may reside on different machines.
Here is a brief explanation of the learning process DDP implements:
For more information about the DDP process, see the PyTorch documentation.
DDP addresses some of the core requirements of the Training Layer:
It has an API that enables you to plug in a worker discovery mechanism, it handles data sharding, and of course, it contains the implementation of the coordination mechanism needed for multiple GPUs to perform the actual learning process.
However, the official documentation contained no hint as to how one should run DDP in a real-life setting on a cluster. Another thing we noticed is that DDP doesn’t support the training elasticity we require.
We decided to look for real-life examples of distributed training solutions that run on clusters, and see what we could learn from them.
During our research, we didn’t come across any well-formed reference architecture which addressed all of our requirements. We did find several tools and platforms which supported some of our key requirements, each with its own pros/cons. Below is a brief overview of the main ones:
Kubeflow is a popular ML platform that runs on Kubernetes as the cluster manager. It has many capabilities, like pipeline authoring, experiment tracking, hyperparameter tuning and even serving. Kubeflow supports distributed training in PyTorch with its PyTorchOperator, which uses DDP under the hood.
Horovod is a distributed training solution from Uber. It’s one of the earliest implementations of the ring-allreduce algorithm and has been used in production for several years. It supports multiple DL frameworks, including PyTorch. It’s possible to deploy Horovod jobs on Kubernetes, and it has a solution for elastic training jobs.
Ray is a platform and framework for writing distributed applications. It’s unique in that it offers both a cluster manager and a training coordination solution. The cluster manager covers node provisioning on multiple cloud providers including interruptible instances.
The training solution enables you to run DDP training jobs with a Ray wrapper. Setting up a Ray cluster seemed like it would be easy compared to Kubernetes. In fact, it seemed lightweight enough to allow us to roll out ephemeral clusters along with their training job, thus eliminating any single point of failure in cluster management.
Ray supports running its cluster manager over Kubernetes. While this reduces the risk of relying on Ray’s cluster manager, it also eliminates the solution’s simplicity which was its main attraction.
TorchElastic is a new, native PyTorch module, which extends DDP and adds support for training elasticity, i.e. enabling training jobs that are resilient to workers terminating and joining mid-training.
It has a simple module for integrating it with Kubernetes.
On the downside, it is not a very mature solution. At the time of our evaluation, TorchElastic was at version 0.21, and the project didn’t seem to have many contributors.
Although it is a relatively new project, the code made it clear that TorchElastic is a very simple and elegant solution with relatively few moving parts. This encouraged us to continue evaluating it for our solution.
TorchElastic and Kubernetes emerged as our leading candidates.
TorchElastic (and the underlying DDP) seemed to cover all the requirements from the training layer. Despite being a new solution, it introduced minimal complexity, which meant less risk and a smaller solution footprint.
Kubernetes is indeed a rather complex technology, but at the same time, it is by far the most mature cluster management solution out there, and we felt sure it could support all of our requirements.
With the help of the cloud providers’ simplified Kubernetes offerings, and with the abundance of community and available resources, we thought we could get it up and running without a huge investment.
Interested in learning more about how we implemented our distributed training solution? You can find additional details on the PyTorch Medium channel.