

# **Distributed DL/ML Solutions for HPC systems**

Dr. Fabio Baruffa, Sr. Technical Consulting Engineer, Intel IAGS



# BASIC CONCEPTS ON DISTRIBUTED COMPUTING

### **Types of parallelism**

- **SIMD**: Single instruction multiple data (Data Parallel)
	- The same instruction is simultaneously applied on multiple data items
- **MIMD**: Multiple instructions multiple data (Task Parallel)
	- Different instructions on different data
- **SPMD**: Single program multiple data (MPI Parallel)
	- This is the message passing programming on distributed systems







### Optimization Notice

### **Shared vs distributed memory system**





- **Shared memory**
	- There is a unique address space shared between the processors
	- All the processors can access the same memory



- **Distributed memory**
	- Each processor has its own local memory
	- Messages are exchanged between the processors to communicate the data

Copyright © 2019, Intel Corporation. All rights reserved. \*Other names and brands may be claimed as the property of others.

## **What is high-performance computing (HPC)?**

**Leveraging distributed compute resources to solve complex problems with large datasets**

- Terabytes to petabytes to zettabytes of data
- Results in minutes to hours instead of days or weeks



A cluster manager runs workloads on distributed resources, such as CPUs, FPGAs, GPUs and disk drives, all interconnected via a network.




### **Domain decomposition method for HPC**

Domain decomposition is a technique for dividing a computational problem into several parts (domains), which makes it possible to solve a large problem on the available resources.

- *Partition* the data, assign them to each resource and associate the computation
- *Communication* happens to eventually exchange intermediate results
- Aggregate the results from the different resources
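The three steps above can be sketched in plain Python. This is only an illustration on a single machine: the list `data`, the choice of 4 resources and the helper names are assumptions made for the example, and the "communication" is simply a list of partial results.

```python
def partition(data, num_parts):
    """Partition: assign every num_parts-th item to the same domain."""
    return [data[part::num_parts] for part in range(num_parts)]


def local_work(chunk):
    """Computation associated with one domain: partial sum and item count."""
    return sum(chunk), len(chunk)


def aggregate(partials):
    """Aggregate the intermediate results into the global mean."""
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


data = list(range(1, 11))                    # 10 values, mean 5.5
chunks = partition(data, 4)                  # 4 hypothetical resources
partials = [local_work(c) for c in chunks]   # would run in parallel
global_mean = aggregate(partials)            # exchange + aggregation
print([len(c) for c in chunks], global_mean)
```

On a real cluster each chunk would live in the memory of a different process, and only the small partial results would travel over the network.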



### **Scaling aspects of distributed computing**

- Strong scaling: how the time to solution changes when the compute resources are increased for a fixed *total* problem size
- Weak scaling: how the time to solution changes when the compute resources are increased for a constant problem size *per process*
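Both notions are commonly quantified with a speedup and an efficiency figure. The sketch below uses the usual textbook definitions; the timings are hypothetical values chosen for illustration, not measurements.

```python
def strong_scaling(t1, tn, n):
    """Speedup and parallel efficiency for a fixed total problem size.

    t1: time on 1 resource, tn: time on n resources.
    """
    speedup = t1 / tn
    return speedup, speedup / n


def weak_scaling_efficiency(t1, tn):
    """Efficiency when the problem size per process is kept constant.

    Ideal weak scaling keeps the time unchanged, i.e. efficiency 1.0.
    """
    return t1 / tn


# Hypothetical timings in seconds, for illustration only.
speedup, efficiency = strong_scaling(t1=100.0, tn=16.0, n=8)
weak_eff = weak_scaling_efficiency(t1=100.0, tn=125.0)
print(speedup, efficiency, weak_eff)
```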





### **How do we reduce the computational time?**

*[Figure: a single CPU with its memory processing a training set of 8 examples, epoch after epoch (Epoch 1, Epoch 2, …, Epoch n)]*

We could use a strong scaling approach to reduce the time for all the epochs.



### **Strong scaling on training set**











## **Message passing interface (MPI)**

MPI is a standard, implemented in the form of libraries, for inter-process communication and data exchange.

Function categories:

- Point-to-point communication
- Collective communication
- Communicator topologies
- User-defined data types
- Utilities (for example, timing and initialization)



Intel® MPI Library - One MPI Library to develop, maintain & test for multiple fabrics





# **DISTRIBUTING STRATEGY FOR MACHINE LEARNING**

### **From Prototype to Production**

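```python
import matplotlib.pyplot as plt
import seaborn as sns

# scatterplot
# df_train is assumed to be the pandas DataFrame already loaded from the Kaggle training data
sns.set()
cols = ['SalePrice', 'OverallQual', 'GrLivArea', 'GarageCars', 'TotalBsmtSF', 'FullBath', 'YearBuilt']
sns.pairplot(df_train[cols], height=2.5)  # the argument was called `size` before seaborn 0.9
plt.show()
```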



https://www.kaggle.com/pmarcelino/comprehensive-data-exploration-with-python




## **Why distributed ML/DL**

- Most Machine Learning tasks assume the data is easily accessible, but:
	- Data loading on a single machine can become a bottleneck for large amounts of data
	- Running production applications requires large-memory systems (the data does not fit in the RAM of a local computer)
	- Traditional sequential algorithms are not suitable for distributed memory systems
- Time to solution is critical in a highly competitive market.



## **Why distributed ML/DL**

- Deep Learning training takes time:
	- The computational complexity of DL training can reach 100+ ExaFLOP (1 ExaFLOP = $10^{18}$ op);
	- Typical single-node performance is up to tens of TeraFLOPS (1 TF = $10^{12}$ op/sec);
	- Peak performance of the most powerful HPC clusters is up to tens of PetaFLOPS (1 PF = $10^{15}$ op/sec).
- **Time to solution is critical in a highly competitive market.**
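A back-of-the-envelope calculation makes the gap concrete. The sketch assumes a 100 ExaFLOP training job, reads "tens of" as exactly 10 for both machines, and assumes peak performance is sustained for the whole run, which is optimistic.

```python
EXA, PETA, TERA = 1e18, 1e15, 1e12


def training_time_seconds(total_ops, ops_per_second):
    """Idealized time to solution: total work divided by sustained rate."""
    return total_ops / ops_per_second


total_ops = 100 * EXA                                        # 100 ExaFLOP of work
single_node_s = training_time_seconds(total_ops, 10 * TERA)  # assumed 10 TFLOPS node
cluster_s = training_time_seconds(total_ops, 10 * PETA)      # assumed 10 PFLOPS cluster

single_node_days = single_node_s / 86400
cluster_hours = cluster_s / 3600
print(f"single node: {single_node_days:.1f} days")
print(f"cluster:     {cluster_hours:.2f} hours")
```

Under these assumptions the single node needs roughly 116 days, while the cluster needs under 3 hours, provided the training scales across the nodes.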



### **Daal4py: accelerated analytics tools**

- Package created to address the needs of **Data Scientists and Framework Designers** to harness the **Intel® Data Analytics Acceleration Library (DAAL)** with a **Pythonic API**
- For scaling capabilities, **daal4py** also provides the ability to do distributed machine learning using **Intel® MPI library**
- **daal4py** operates in SPMD style (Single Program Multiple Data), which means your program is executed on several processes (e.g. similar to MPI)
- Explicit use of MPI in your code is not required for **daal4py**'s SPMD mode to work: all necessary communication and synchronization happens under the hood of daal4py
- It is possible to use **daal4py** and mpi4py in the same program



### **Scaling Machine Learning Beyond a Single Node**



- Simple Python API
- Powers scikit-learn
- Powered by DAAL
- Scalable to multiple nodes

Try it out! conda install -c intel daal4py



### **K-Means using daal4py**

```python
import daal4py as d4p

# daal4py accepts data as CSV files, numpy arrays or pandas dataframes
# here we let daal4py load process-local data from csv files
data = "kmeans_dense.csv"

# Create algorithm object to compute initial centers
init = d4p.kmeans_init(10, method="plusPlusDense")
# compute initial centers
ires = init.compute(data)
# results can have multiple attributes, we need centroids
centroids = ires.centroids
# compute kmeans clustering starting from the initial centroids
result = d4p.kmeans(10).compute(data, centroids)
```



### **Distributed K-Means using daal4py**

```python
import daal4py as d4p

# initialize distributed execution environment
d4p.daalinit()

# daal4py accepts data as CSV files, numpy arrays or pandas dataframes
# here we let daal4py load process-local data from csv files
data = "kmeans_dense_{}.csv".format(d4p.my_procid())

# compute initial centroids & kmeans clustering
init = d4p.kmeans_init(10, method="plusPlusDense", distributed=True)
centroids = init.compute(data).centroids
result = d4p.kmeans(10, distributed=True).compute(data, centroids)

# shut down the distributed execution environment
d4p.daalfini()
```

**mpirun -n 4 python ./kmeans.py**



### **STRONG & WEAK SCALING VIA DAAL4PY**

Intel(R) Xeon(R) Gold 6148 CPU @ 2.40GHz, EIST/Turbo on






*On a 32-node cluster (1280 cores) daal4py computed linear regression of 2.15 TB of data in 1.18 seconds and 68.66 GB of data in less than 48 milliseconds.*



*On a 32-node cluster (1280 cores) daal4py computed K-Means (10 clusters) of 1.12 TB of data in 107.4 seconds and 35.76 GB of data in 4.8 seconds.*




### **Distributed K-Means using daal4py**

- 1) Performs a pixel-wise Vector Quantization (VQ) using K-Means
- 2) Implements the domain decomposition according to:
	- d4p.num\_procs()
	- d4p.my\_procid()
- 3) Uses the distributed algorithm from daal4py
	- d4p.kmeans\_init(n\_colors, method="plusPlusDense", distributed=True)
- 4) What is the meaning of d4p.daalinit() & d4p.daalfini()?
- 5) How does threading compare to multiprocessing in terms of performance?
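One possible way to derive each process's share of the image rows from the process count and the process id is sketched below. The function `local_rows` and its arguments are hypothetical; in the exercise the last two arguments would come from `d4p.num_procs()` and `d4p.my_procid()`. The notebook itself may decompose the data differently.

```python
def local_rows(n_rows, num_procs, my_procid):
    """Return the half-open row range [start, stop) owned by one process.

    Rows are split into contiguous blocks; the first (n_rows % num_procs)
    processes receive one extra row so that nothing is left over.
    """
    base, extra = divmod(n_rows, num_procs)
    start = my_procid * base + min(my_procid, extra)
    stop = start + base + (1 if my_procid < extra else 0)
    return start, stop


# Simulate 4 processes on a 10-row image (illustrative sizes).
n_rows = 10
bounds = [local_rows(n_rows, 4, rank) for rank in range(4)]
print(bounds)
```

Every row is covered exactly once, so the per-process results can later be combined without overlap.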




### **Distributed K-Means Summary**

- Each process (MPI rank) gets a different chunk of data
- Only process #0 reports results
- Inference uses the same routines as training, with the maximum number of iterations set to 0 and centroid assignment enabled
- There is no oversubscription since DAAL only sees the cores "owned" by the corresponding MPI rank
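Running K-Means with zero iterations and assignment enabled amounts to labelling each point with its nearest centroid, without moving the centroids. The plain-Python sketch below illustrates that step only; it is not daal4py's implementation, and the points and centroids are made up.

```python
def assign(points, centroids):
    """Return, for each point, the index of its nearest centroid."""
    def sq_dist(p, c):
        # squared Euclidean distance between two points
        return sum((pi - ci) ** 2 for pi, ci in zip(p, c))

    return [
        min(range(len(centroids)), key=lambda k: sq_dist(p, centroids[k]))
        for p in points
    ]


centroids = [(0.0, 0.0), (10.0, 10.0)]           # from a previous training run
points = [(1.0, 2.0), (9.0, 8.0), (4.0, 4.0)]    # new data to label
labels = assign(points, centroids)
print(labels)
```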






# DISTRIBUTING STRATEGY FOR DEEP LEARNING

### **Deep learning training procedure**

- **Forward propagation**: calculate loss function based on the input batch and current weights;
- **Backward propagation**: calculate error gradients w.r.t. weights for all layers (using chain rule);
- **Weights update**: use gradients to update weights; different algorithms exist, such as vanilla SGD, Momentum, Adam, etc.
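The three phases can be illustrated on the smallest possible "network", a single weight `w` in the model y = w * x, trained with vanilla SGD. The data, learning rate and iteration count are arbitrary choices for this sketch.

```python
def forward(w, xs, ys):
    """Forward propagation: mean squared error for the current weight."""
    return sum((w * x - y) ** 2 for x, y in zip(xs, ys)) / len(xs)


def backward(w, xs, ys):
    """Backward propagation: dLoss/dw obtained with the chain rule."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def sgd_update(w, grad, lr):
    """Weights update: one vanilla SGD step."""
    return w - lr * grad


xs, ys = [1.0, 2.0, 3.0], [2.0, 4.0, 6.0]   # batch generated by y = 2 * x
w, lr = 0.0, 0.05
losses = []
for _ in range(50):
    losses.append(forward(w, xs, ys))
    w = sgd_update(w, backward(w, xs, ys), lr)

print(w, losses[0], losses[-1])
```

The loss shrinks at every iteration and `w` approaches 2, the value used to generate the data.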




### **Parallelism options**



### Several options for parallelization



### **Neural network parallelism**



**DATA PARALLELISM**

Data is processed in increments of N (minibatches). The work on the minibatch samples is distributed among the available resources.

**MODEL PARALLELISM**

The work is divided according to the neurons in each layer. The sample minibatch is copied to all processors, each of which computes part of the DNN.

source: https://arxiv.org/pdf/1802.09941.pdf



## **Multi-node parallelization**

- **Data parallelism**:
	- Replicate the model across nodes;
	- Feed each node with its own batch of input data;
	- Communication for gradients is required to get their average across nodes;
	- Can be either
		- *AllReduce* pattern
		- *ReduceScatter* + *AllGather* patterns





### **Data parallelism**




### **Multi-node parallelization**

- **Model parallelism**:
	- Model is split across nodes;
	- Feed each node with the same batch of input data;
	- Communication for partial activations is required to gather the result;
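A minimal sketch of this idea for one dense layer: the neurons (rows of the weight matrix) are split across two simulated workers, each worker receives the same input, and the partial activations are gathered. The weights and input are arbitrary, and real frameworks may split layers in other ways.

```python
def matvec(weights, x):
    """Dense layer without bias: one output activation per neuron (row)."""
    return [sum(w * xi for w, xi in zip(row, x)) for row in weights]


weights = [[1.0, 0.0, 2.0],
           [0.0, 1.0, 1.0],
           [3.0, 1.0, 0.0],
           [1.0, 1.0, 1.0]]        # 4 neurons, 3 inputs
x = [1.0, 2.0, 3.0]                # the same input goes to every worker

# Split the model: worker 0 owns neurons 0-1, worker 1 owns neurons 2-3.
parts = [weights[0:2], weights[2:4]]
partial = [matvec(part, x) for part in parts]    # computed independently

# Gather the partial activations to obtain the full layer output.
gathered = [a for part in partial for a in part]
reference = matvec(weights, x)                   # unsplit layer
print(gathered, reference)
```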



### **Multi-node parallelization**

- **What parallelism flavor to use?**
	- Use model parallelism when the volume of gradients is much higher than the volume of activations, or when the model does not fit in memory;
	- Use data parallelism otherwise;
	- Parallelism choice affects activations/gradients ratio
		- Data parallelism at scale makes activations << weights
		- Model parallelism at scale makes weights << activations
	- There are also other parallelism flavors: pipelined, spatial, etc.



## **Intel® machine learning scaling library (MLSL)**

### **Distributed Deep Learning Requirements:**

- ✓ Compute/communication overlap
- ✓ Choosing optimal communication algorithm
- ✓ Prioritizing latency-bound communication
- ✓ Portable / efficient implementation
- ✓ Ease of integration with quantization algorithms
- ✓ Integration with Deep Learning Frameworks

Communication depends on the work partitioning strategy:

- Data parallelism = Allreduce (or) Reduce Scatter + Allgather
- Model parallelism = AlltoAll




## **Intel® machine learning scaling library (MLSL)**

<https://github.com/01org/MLSL/releases>

Some of the Intel MLSL features include:

- Built on top of MPI, transparently supports various interconnects: Intel® Omni-Path Architecture, InfiniBand\*, and Ethernet;
- Optimized to drive scalability of DL communication patterns
- Ability to trade off compute for communication performance, which is beneficial in communication-bound scenarios
- New domain-specific features are coming soon




### **Distributed TensorFlow\***



The parameter server model for distributed training jobs can be configured with different ratios of parameter servers to workers, each with different performance profiles.

Source: https://eng.uber.com/horovod/




### **Distributed TensorFlow\* with horovod**

### **ring-allreduce**



- Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet.
- The goal of Horovod is to make distributed Deep Learning fast and easy to use
- Horovod core principles are based on MPI concepts such as **size, rank, local rank, allreduce, allgather and broadcast**.
- Separates infrastructure from model development
- **Advantages**
	- Minimal code changes to run distributed TensorFlow
	- Network-optimal
	- No parameter server

Distributed training framework for TensorFlow

Source: https://eng.uber.com/horovod/

More info:<https://github.com/horovod/horovod/>
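The ring-allreduce pattern named above can be simulated in a single process. In this sketch every worker holds one number per chunk and there are as many chunks as workers; the function name and data are illustrative, and this is not Horovod's code. Each worker only ever sends to its right-hand neighbour: first a scatter-reduce phase, then an allgather phase, each taking n - 1 steps.

```python
def ring_allreduce(buffers):
    """Sum the buffers of n workers using the ring algorithm.

    buffers[r][c] is chunk c held by worker r, with len(buffers[r]) == n.
    Returns the buffers after the exchange; every worker holds the sums.
    """
    n = len(buffers)
    data = [list(b) for b in buffers]      # copy, inputs stay untouched

    # Phase 1 (scatter-reduce): after n - 1 steps worker r owns the
    # complete sum of chunk (r + 1) % n.
    for step in range(n - 1):
        # Collect all messages first so the sends happen "simultaneously".
        sent = [(r, (r - step) % n, data[r][(r - step) % n]) for r in range(n)]
        for r, chunk, value in sent:
            data[(r + 1) % n][chunk] += value

    # Phase 2 (allgather): circulate the completed chunks around the ring.
    for step in range(n - 1):
        sent = [(r, (r + 1 - step) % n, data[r][(r + 1 - step) % n])
                for r in range(n)]
        for r, chunk, value in sent:
            data[(r + 1) % n][chunk] = value

    return data


grads = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0], [100.0, 200.0, 300.0]]
result = ring_allreduce(grads)
print(result)
```

Dividing each entry by the number of workers would give the averaged gradients. Each worker sends 2 * (n - 1) chunk-sized messages regardless of the ring size.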




### **Distributed Training with horovod\* MPI Lib**

*[Figure: Node 1, Node 2, …, Node N connected through the Interconnect Fabric (Intel® OPA or Ethernet)]*

**Distributed Deep Learning Training Across Multiple Nodes**

- Each node runs multiple workers
- Uses an optimized MPI library for gradient updates over the network fabric
- Caffe: uses the optimized Intel® MPI ML Scaling Library (Intel® MLSL)
- TensorFlow\*: uses Uber Horovod with an MPI library

Intel Best Known Methods:<https://ai.intel.com/accelerating-deep-learning-training-inference-system-level-optimizations/>


### **Horovod: How to change the code**

- Add **import horovod.tensorflow as hvd** and run **hvd.init()** at the beginning of the program
- Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
- Wrap optimizer in **hvd.DistributedOptimizer**. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using **allreduce** or **allgather**, and then applies those averaged gradients.
- Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you're not using MonitoredTrainingSession, you can simply execute the **hvd.broadcast\_global\_variables** op after global variables have been initialized.
- Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing **checkpoint\_dir=None** to **tf.train.MonitoredTrainingSession,** if **hvd.rank() != 0**.



## **Horovod101 Quick start**

```python
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Scale the learning rate by the number of workers
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Broadcast initial variable states from rank 0 to all other processes
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None
```





### **Scaling Tensorflow\***

There is much more to consider when striving for peak performance in distributed deep learning training:

https://ai.intel.com/white-papers/best-known-methods-forscaling-deep-learning-with-tensorflow-on-intel-xeonprocessor-based-clusters/

*[Screenshot: table of contents of the Intel white paper "Best Practices for Scaling Deep Learning Training and Inference with TensorFlow\* On Intel® Xeon® Processor-Based HPC Infrastructures"]*

## **Intel® MLSL backend for Horovod**

Install procedure:

- Install the latest versions of Intel MLSL and Intel MPI;
- source <mlsl\_install>/intel64/bin/mlslvars.sh thread
- source <intel\_mpi\_2019>/intel64/bin/mpivars.sh release\_mt
- Download Horovod and build it from source code or
	- pip install horovod



## **Intel® MLSL backend for Horovod**

### Launch procedure:

- export MLSL\_LOG\_LEVEL=1
	- output from within MLSL
- export MLSL\_NUM\_SERVERS=X
	- X is the number of cores you'd like to dedicate for driving communication
- export MLSL\_SERVER\_AFFINITY=c1,c2,..,cX
	- Core IDs dedicated to MLSL servers (uses X 'last' cores by default)
- export HOROVOD\_MLSL\_BGT\_AFFINITY=c0
	- Affinity for the background thread of Horovod
- Adjust OpenMP settings to avoid intersection with c0,c1,..,cX





## **Tensorflow+Horovod/cnn\_mnist-hvd.ipynb**

### Delete the checkpoint if needed, otherwise TF won't train any further

- rm -rf checkpoints

### Let's start changing the number of MPI tasks. What performance difference would you expect?

- mpirun -prepend-rank -genv OMP\_NUM\_THREADS=2 -genv I\_MPI\_DEBUG=5 -n 2 python -u cnn\_mnist-hvd.py
- mpirun -prepend-rank -genv OMP\_NUM\_THREADS=2 -genv I\_MPI\_DEBUG=5 -n 4 python -u cnn\_mnist-hvd.py
- check the size of the dataset:
	- ls -lha ~/.keras/datasets/

### Intel Python and Optimized Tensorflow

- source activate hvd-impi
- pip show tensorflow | grep Location
	- useful for locating the TF installation to see which libraries are linked: ldd \$Location/tensorflow/libtensorflow...so
- rm -rf /tmp/\*
- export MKLDNN\_VERBOSE=1




## **Tensorflow+Horovod/cnn\_mnist-hvd.ipynb**

- 1) How do you initialize Horovod, and why is it necessary?
- 2) Why is it necessary to adapt the learning rate with larger batches?
- 3) How can you dynamically adapt the learning rate?
- 4) How do you identify the first rank (rank 0)?
- 5) Why is it necessary to adapt the number of training steps according to the number of workers / larger batches?
- 6) How can you dynamically adapt the number of training steps?
- 7) How does single-process performance compare with 2 ranks and with 4 ranks?



## **MNIST CNN HOROVOD DEMO SUMMARY**

- Horovod initializes the MPI communication underneath and therefore defines rank() and size()
- Reducing the time to train with multiple workers increases the effective batch size, so the learning rate needs to be scaled accordingly
- The same applies to the number of training steps
- 4 ranks can be faster since less threading efficiency is required in small convolutions
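The scaling of the learning rate and the step count can be written down in a few lines. The helper below is hypothetical and uses simple linear scaling; all numbers are made up. In a Horovod script `num_workers` would be `hvd.size()`.

```python
def scale_for_workers(base_lr, batch_per_worker, total_steps, num_workers):
    """Adapt hyperparameters to synchronous training on num_workers ranks."""
    return {
        # larger effective batch -> proportionally larger learning rate
        "learning_rate": base_lr * num_workers,
        # every step now consumes one batch per worker
        "effective_batch": batch_per_worker * num_workers,
        # same number of samples seen overall -> fewer steps per worker
        "steps_per_worker": total_steps // num_workers,
    }


# Illustrative values only.
cfg = scale_for_workers(base_lr=0.01, batch_per_worker=100,
                        total_steps=20000, num_workers=4)
print(cfg)
```

Linear scaling is a common starting point rather than a guarantee; very large worker counts may need additional tuning.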





### Legal Disclaimer & Optimization Notice

Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products. For more complete information visit [www.intel.com/benchmarks](http://www.intel.com/benchmarks).

INFORMATION IN THIS DOCUMENT IS PROVIDED "AS IS". NO LICENSE, EXPRESS OR IMPLIED, BY ESTOPPEL OR OTHERWISE, TO ANY INTELLECTUAL PROPERTY RIGHTS IS GRANTED BY THIS DOCUMENT. INTEL ASSUMES NO LIABILITY WHATSOEVER AND INTEL DISCLAIMS ANY EXPRESS OR IMPLIED WARRANTY, RELATING TO THIS INFORMATION INCLUDING LIABILITY OR WARRANTIES RELATING TO FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABILITY, OR INFRINGEMENT OF ANY PATENT, COPYRIGHT OR OTHER INTELLECTUAL PROPERTY **RIGHT** 

Copyright © 2019, Intel Corporation. All rights reserved. Intel, the Intel logo, Pentium, Xeon, Core, VTune, OpenVINO, Cilk, are trademarks of Intel Corporation or its subsidiaries in the U.S. and other countries.

### **[Optimization Notice](https://software.intel.com/en-us/articles/optimization-notice)**

Intel's compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

Notice revision #20110804
