<center><img src="../../fig/ICHEC_Logo.jpg" alt="Drawing" style="width: 500px;"/>

# <center>Dask</center>
******
***




- Dask allows us to create multiple tasks with a minimum of effort.

- There are different aspects to Dask, of which we will cover just a few.

- Dask is integrated with different projects such as pandas, numpy and scipy.

- This allows, for example, scipy functions to be parallelised with minimal changes to the Python code.

- Some of these projects already have their own multithreaded functions, so Dask is not always necessary.

## Dask Distributed

- The first step is to create a set of workers with a scheduler.

- Here we are creating 2 worker processes with 2 threads each.

- Each worker is limited to 1GB of memory (`memory_limit` applies per worker, not to the cluster as a whole).

In [None]:
import dask
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')

client

- This creates a LocalCluster in Dask.

- This means that all the workers will be on a single node.

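- You can see there is a link to the dashboard, which is served by the scheduler.

- Unfortunately, if you click on the link you cannot access the page, because the dashboard runs on the remote machine.

- There are a couple of ways around this: either create another tunnel to port 8787, the dashboard's default port,

- or install jupyter-server-proxy so that the dashboard can be reached through the Jupyter server.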
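As a standalone sketch (assuming a fresh session), the `Client(n_workers=..., ...)` call above can also be written by creating the `LocalCluster` explicitly and connecting a `Client` to it. The `processes=False` argument is an assumption made only to keep the example light: the workers then run as threads in the current process rather than as separate processes. Asking the scheduler about each worker shows that `memory_limit` is recorded per worker.

```python
from dask.distributed import Client, LocalCluster

# 2 workers, 2 threads each, and a 1GB memory limit for EACH worker.
# processes=False (an assumption for this sketch) runs the workers as threads in this process.
with LocalCluster(n_workers=2, threads_per_worker=2, memory_limit="1GB",
                  processes=False) as cluster, Client(cluster) as client:
    client.wait_for_workers(2)  # make sure both workers have registered
    # scheduler_info()["workers"] maps each worker address to a dict describing that worker
    workers = client.scheduler_info()["workers"]
    for address, info in workers.items():
        print(address, "threads:", info["nthreads"], "memory limit (bytes):", info["memory_limit"])
```

Each worker should report its own 1,000,000,000-byte limit, so the two workers together may use up to 2GB.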

In [None]:
client.close()

In [None]:
import numpy as np

- Here we have a simple example where we estimate $\pi$.

- This version uses numpy functions to perform the calculations.

- Note there are no explicit loops.

- The code is serial.
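The estimate rests on an area ratio. A point drawn uniformly from the unit square lands inside the quarter circle $x^2 + y^2 < 1$ with probability equal to that quarter circle's area, so 4 times the fraction of such points estimates $\pi$. The number of points $N$ follows from the requested size $S$ in bytes, since each point is two 8-byte `float64` values:

$$
P\left(x^2 + y^2 < 1\right) = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{N_{\text{inside}}}{N},
\qquad
N = \frac{S}{8 \times 2}, \quad \text{e.g. } \frac{10000}{16} = 625.
$$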

In [None]:
def calculate_pi(size_in_bytes):
    
    """Calculate pi using a Monte Carlo method."""
    
    # 8 bytes per float64 value and 2 coordinates (x, y) per point
    rand_array_shape = (int(size_in_bytes / 8 / 2), 2)
    
    # 2D random array with positions (x, y)
    xy = np.random.uniform(low=0.0, high=1.0, size=rand_array_shape)
    
    # check if position (x, y) is in unit circle
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1

    # pi is the fraction of points in circle x 4
    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {pi}")
    print(f"   pi error: {abs(pi - np.pi)}\n")
    
    return pi

In [None]:
%time calculate_pi(10000)

- Without changing the function, we can wrap it with `dask.delayed` so that Dask runs it as a task on the workers.

- In this case we have two workers.

In [None]:
client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')

In [None]:
dask_calpi = dask.delayed(calculate_pi)(10000)

In [None]:
%time dask.compute(dask_calpi)

In [None]:
client.restart()
client.close()

- But wait, it is taking longer with Dask!

- You can change the size of the problem but still find that Dask is slower.

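- This is because we are using Dask incorrectly: a single call of the function is a single task, so there is nothing to run in parallel and we only pay the overhead of sending the work to a worker.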

- We can visualise how the tasks are decomposed.

In [None]:
dask.visualize(dask_calpi)
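Counting the tasks in the graph tells the same story as the picture. In this sketch `slow_square` is a hypothetical stand-in for `calculate_pi`, and `__dask_graph__()` is the Dask collection-protocol method that returns a collection's task graph.

```python
import dask

def slow_square(x):
    """Hypothetical stand-in for calculate_pi: one self-contained piece of work."""
    return x * x

# A single delayed call wraps the whole function call as ONE task: nothing to parallelise.
single = dask.delayed(slow_square)(10)
n_tasks_single = len(dict(single.__dask_graph__()))

# Several independent calls give several tasks that the scheduler can run at the same time.
many = [dask.delayed(slow_square)(i) for i in range(5)]
n_tasks_many = sum(len(dict(d.__dask_graph__())) for d in many)

print(n_tasks_single, n_tasks_many)
```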

- We can only get task parallelism if we call the function more than once.

- Below is an example of this.

In [None]:
results = []
for i in range(5):
    dask_calpi = dask.delayed(calculate_pi)(10000*(i+1))
    results.append(dask_calpi)
    
dask.visualize(results)

# To run all the tasks, use:
#dask.compute(*results)
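Here is a minimal sketch of actually computing several independent $\pi$ estimates and combining them. `estimate_pi` is a hypothetical, non-printing variant of `calculate_pi`; giving each task its own seed (an assumption of this sketch) keeps the samples independent. `scheduler="threads"` selects Dask's local threaded scheduler so the example runs without a cluster; with a distributed `Client` active you could omit it.

```python
import dask
import numpy as np

def estimate_pi(n_points, seed):
    """Hypothetical variant of calculate_pi that returns the estimate without printing."""
    rng = np.random.default_rng(seed)              # independent generator per task
    xy = rng.uniform(0.0, 1.0, size=(n_points, 2))
    inside = (xy ** 2).sum(axis=1) < 1             # points inside the unit circle
    return 4 * inside.mean()

# Five independent tasks; nothing runs yet.
tasks = [dask.delayed(estimate_pi)(100_000, seed) for seed in range(5)]

# dask.compute runs all the tasks (in parallel where possible) and returns a tuple of results.
estimates = dask.compute(*tasks, scheduler="threads")

# Every task used the same number of points, so the mean equals the pooled estimate.
combined = float(np.mean(estimates))
print(estimates, combined)
```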

## [Exercise 1](exercises/06-Dask_Array_Exercise1.ipynb)

# Dask Array

- The previous example shows that parallelisation comes with a downside: memory.

- If we have 1 task that needs 10GB of memory, running 10 simultaneously will need 100GB in total.

- Each Kay node has 180GB of memory; what if we need more than that?

- Dask Array splits a larger array into smaller chunks.

- We can work on larger than memory arrays but still use all of the cores.

- You can think of the Dask array as a set of smaller numpy arrays.

<img src="../../fig/notebooks/dask-array-black-text.svg">
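A short sketch of what "a set of smaller numpy arrays" means in practice: creating a Dask array only records how to build each chunk, so we can inspect the total size and the size of one chunk without allocating anything. The shape and chunk size are arbitrary choices for illustration.

```python
import dask.array as da

# A 10,000 x 10,000 float64 array split into 1,000 x 1,000 chunks.
# Nothing is allocated here: Dask only records how to build each chunk.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))

total_gb = x.nbytes / 1e9                # size of the whole array if it were materialised
chunk_mb = x.blocks[0, 0].nbytes / 1e6   # size of one chunk, i.e. one numpy array
print(x.numblocks, total_gb, "GB in total,", chunk_mb, "MB per chunk")
```

Each task only ever needs its own 8MB chunk in memory, rather than the full 0.8GB array.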

- Below we have some simple examples of using Dask arrays.

- Again we need to set up a LocalCluster.

In [None]:
client = Client(n_workers=2, threads_per_worker=1, memory_limit='1GB')
client

In [None]:
import dask.array as da

*These examples courtesy of Dask contributor James Bourbeau*

In [None]:
a_np = np.arange(1, 50, 3)   # Numbers from 1 up to (but not including) 50 with a stride of 3
print(a_np)
a_np.shape

In [None]:
a_da = da.arange(1, 50, 3, chunks=5)
a_da

- Notice that the 1D array of 17 elements is split into 4 chunks with a maximum size of 5 elements.

- Notice also that Dask has automatically set up a task for each chunk.

In [None]:
print(a_da.dtype)
print(a_da.shape)

In [None]:
print(a_da.chunks)
print(a_da.chunksize)

In [None]:
a_da.visualize()

- Four tasks have been created, one for each of the subarrays.

- Now take an operation where the tasks are independent, such as squaring every element.

In [None]:
(a_da ** 2).visualize()

- Up to now we have only been setting up the task graph, i.e. how the workload is split.

- To actually perform the set of tasks we need to *compute* the result.

In [None]:
(a_da ** 2).compute()

- You can see that the parallelisation is very simple.

- The result is returned as a numpy array, as the next cell confirms.

- Many other numpy operations can be performed on Dask arrays as well.

In [None]:
type((a_da ** 2).compute())
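To check that the chunked computation gives exactly what numpy would, here is a small self-contained sketch that repeats the arrays from above and compares the results directly:

```python
import numpy as np
import dask.array as da

a_np = np.arange(1, 50, 3)
a_da = da.arange(1, 50, 3, chunks=5)

# compute() runs one task per chunk and gathers the pieces into one ordinary numpy array
squared = (a_da ** 2).compute()
print(type(squared), squared)
print(a_da.chunks)  # chunk sizes along the single axis
```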

Dask arrays support a large portion of the NumPy interface:

- Arithmetic and scalar mathematics: `+`, `*`, `exp`, `log`, ...

- Reductions along axes: `sum()`, `mean()`, `std()`, `sum(axis=0)`, ...

- Tensor contractions / dot products / matrix multiply: `tensordot`

- Axis reordering / transpose: `transpose`

- Slicing: `x[:100, 500:100:-2]`

- Fancy indexing along single axes with lists or numpy arrays: `x[:, [10, 1, 5]]`

- Array protocols like `__array__` and `__array_ufunc__`

- Some linear algebra: `svd`, `qr`, `solve`, `solve_triangular`, `lstsq`, ...

- ...

See the [Dask array API docs](http://docs.dask.org/en/latest/array-api.html) for full details about what portion of the NumPy API is implemented for Dask arrays.
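A quick way to see several of the listed operations in action is to apply them to a small Dask array and compare each result with the equivalent numpy expression. The array, its 2 x 3 chunking and the particular indices are arbitrary choices for this sketch.

```python
import numpy as np
import dask.array as da

# A small numpy array and a Dask array built from it, split into 2 x 3 chunks
x_np = np.arange(24, dtype=float).reshape(4, 6)
x = da.from_array(x_np, chunks=(2, 3))

# Pairs of (lazy Dask expression, eager numpy expression) for operations from the list above
checks = {
    "arithmetic and exp": (da.exp(x) * 2 + 1, np.exp(x_np) * 2 + 1),
    "reduction along an axis": (x.sum(axis=0), x_np.sum(axis=0)),
    "tensordot": (da.tensordot(x, x.T, axes=1), np.tensordot(x_np, x_np.T, axes=1)),
    "transpose": (x.transpose(), x_np.transpose()),
    "slicing": (x[:3, 5:1:-2], x_np[:3, 5:1:-2]),
    "fancy indexing": (x[:, [4, 0, 2]], x_np[:, [4, 0, 2]]),
}

# compute() each Dask result and compare it with numpy
agreement = {name: bool(np.allclose(lazy.compute(), eager))
             for name, (lazy, eager) in checks.items()}
print(agreement)
```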

### Blocked Algorithms

- Dask arrays are implemented using _blocked algorithms_. 

- These algorithms break up a computation on a large array into many computations on smaller pieces of the array. 

- This minimizes the memory load (amount of RAM) of computations and allows for working with larger-than-memory datasets in parallel.

- Dask supports a large portion of the numpy functions.
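The idea behind a blocked algorithm can be sketched in plain numpy. This is an illustration of the technique, not Dask's own code: a sum is split into independent per-block sums (each of which could be a task) followed by a combine step.

```python
import numpy as np

def blocked_sum(arr, chunk_size):
    """Illustrative blocked algorithm: sum a 1D array block by block, then combine."""
    partial_sums = []
    for start in range(0, arr.shape[0], chunk_size):
        # In an out-of-core setting, only this block would need to be loaded into memory.
        block = arr[start:start + chunk_size]
        partial_sums.append(block.sum())   # independent per-block work (one task each)
    return sum(partial_sums), partial_sums  # combine step

values = np.arange(20)
total, partials = blocked_sum(values, chunk_size=5)
print(partials, total)
```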

In [None]:
x = da.random.random(20, chunks=5)
x

In [None]:
result = x.sum()
result

- Here we have a reduction operation.

- Each task sums its own chunk, and these partial sums are then combined into a single result.

In [None]:
result.visualize()

In [None]:
result.compute()
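The two stages of this reduction can be made explicit with `map_blocks`, which applies a function to every chunk as its own task. This is a sketch for illustration: a deterministic `da.arange` replaces the random array so the partial sums are easy to check by hand.

```python
import numpy as np
import dask.array as da

x = da.arange(20, chunks=5)   # 4 chunks: 0..4, 5..9, 10..14, 15..19

# Stage 1: one task per chunk, each returning a length-1 array holding that chunk's sum.
partials = x.map_blocks(lambda block: np.array([block.sum()]), chunks=(1,), dtype=x.dtype)

# Stage 2: combine the partial results into the final answer.
partial_values = partials.compute()
total = partials.sum().compute()
print(partial_values, total)
```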

- Here we have a much more complicated operation.

- Because we add the matrix to its transpose, and the chunks of `x` and `x.T` do not line up, there are many more tasks.

- Again, this is handled seamlessly.

In [None]:
x = da.random.random(size=(15, 15), chunks=(10, 5))
x

In [None]:
x.chunks

In [None]:
result = (x + x.T).sum()
result

In [None]:
result.visualize()

In [None]:
result.compute()
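The extra tasks come from chunk alignment. A sketch using `da.ones` (so the answer is easy to check) prints the chunk layouts: transposing swaps the layout, so before adding, Dask cuts both operands into a common, finer set of chunks that both can be split into.

```python
import dask.array as da

x = da.ones((15, 15), chunks=(10, 5))
xt = x.T
y = x + xt

print(x.chunks)   # rows split 10 + 5, columns split 5 + 5 + 5
print(xt.chunks)  # the transpose has the swapped layout
print(y.chunks)   # common chunking that both operands are rechunked to
total = y.sum().compute()
print(total)
```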

Because each task only needs its own chunks in memory, the same approach lets us perform computations on larger-than-memory arrays!

In [None]:
client.restart()
client.close()

## Calculate Pi

- Going back to the previous example, calculating $\pi$.

- We could use dask arrays rather than numpy.

- We can parallelise the function itself and tackle larger problems.

In [None]:
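def dask_calculate_pi(size_in_bytes, nchunks):
    
    """Calculate pi using a Monte Carlo method on a chunked Dask array."""
    
    # 8 bytes per float64 value and 2 coordinates (x, y) per point
    rand_array_shape = (size_in_bytes // 8 // 2, 2)
    # split the points (rows) into nchunks pieces; keep both coordinates in the same chunk
    chunk_size = max(1, rand_array_shape[0] // nchunks)
    
    # 2D random array with positions (x, y)
    xy = da.random.uniform(low=0.0, high=1.0, size=rand_array_shape, chunks=(chunk_size, 2))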
    print(xy)
    
    # check if position (x, y) is in unit circle
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1

    # pi is the fraction of points in circle x 4
    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size
    
    result = pi.compute()

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {result}")
    print(f"   pi error: {abs(result - np.pi)}\n")
    
    return result

In [None]:
client = Client(n_workers=5,threads_per_worker=1,memory_limit='1GB')
client

In [None]:
%time dask_calculate_pi(10000000,20)
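The chunk arithmetic for this call can be checked without computing anything. This sketch repeats the shape calculation from `dask_calculate_pi` for `size_in_bytes=10000000` and `nchunks=20` and inspects the lazy array:

```python
import dask.array as da

size_in_bytes, nchunks = 10_000_000, 20

# Same arithmetic as dask_calculate_pi: 8 bytes per float64, 2 coordinates per point
n_points = size_in_bytes // 8 // 2     # 625,000 points
chunk_size = n_points // nchunks       # 31,250 points per chunk

xy = da.random.uniform(low=0.0, high=1.0, size=(n_points, 2), chunks=(chunk_size, 2))
print(xy.numblocks, xy.blocks[0, 0].nbytes / 1e6, "MB per chunk")
```

Each of the 20 tasks therefore works on about 0.5MB, so far larger problems fit by increasing `nchunks` along with `size_in_bytes`.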

In [None]:
client.close()

## [Exercise 2](exercises/06-Dask_Array_Exercise2.ipynb)

# Summary

- Dask (distributed) allows task parallelisation of problems.

- The changes to the code base are minimal, but you do need to identify the tasks.

- A scheduler is created to allocate the tasks to the workers.

- Dask Array creates a set of tasks automatically through operations on arrays.

- Dask Array is built on top of Dask.

- Tasks are created by splitting one or more larger arrays into chunks, which means that larger-than-memory problems can be handled.

# Links

__[Dask Home Page](http://dask.org)__

__[Dask Distributed](https://distributed.dask.org/en/stable/)__

__[Dask Array](https://docs.dask.org/en/stable/array.html)__