Dask deployment on Grid’5000#

This notebook deploys a Dask cluster on Grid’5000 and launches a simple computation.

Requirements:

- A conda[1] environment set up on the Grid’5000 frontend, with dask and EnOSlib installed.
- The same environment can be used to run this notebook from your local machine.

Initial imports#

[ ]:
from enoslib import *
import logging

# get some logs
logging.basicConfig(level=logging.INFO)

Get some resources on Grid’5000#

This will reserve two nodes, where the Dask cluster will be deployed later.

[ ]:
prod = G5kNetworkConf(id="prod", roles=["network"], type="prod", site="rennes")
conf = (
    G5kConf.from_settings(job_name="dask", job_type=[])
    .add_machine(roles=["scheduler"], cluster="parapide", nodes=1, primary_network=prod)
    .add_machine(roles=["worker"], cluster="parapide", nodes=1, primary_network=prod)
    .add_network_conf(prod)
).finalize()
provider = G5k(conf)
roles, _ = provider.init()

Deploy Dask on the nodes#

This assumes that the conda environment (dask-base) is configured in your home directory in /home/<user>/miniconda3.

If the installation path differs, you can specify it using the conda_prefix parameter.

[ ]:
username = g5k_api_utils.get_api_username()
dask = Dask("dask-base", scheduler=roles["scheduler"][0], workers=roles["worker"], run_as=username)
dask.deploy()
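
If conda lives somewhere other than /home/<user>/miniconda3, the path can be built once and passed through conda_prefix. The snippet below is only a sketch: the helper conda_prefix_for, the user name alice and the miniforge3 directory are hypothetical placeholders, and the Dask call is left commented out because it needs the reserved roles from the previous cells.

```python
from pathlib import PurePosixPath


def conda_prefix_for(username, install_dir="miniconda3"):
    """Return the conda installation path under the user's home directory.

    Hypothetical helper: it only mirrors the /home/<user>/miniconda3 layout
    assumed by this notebook.
    """
    return str(PurePosixPath("/home") / username / install_dir)


# Example with placeholder values: a conda installed in ~/miniforge3
dask_kwargs = {"conda_prefix": conda_prefix_for("alice", "miniforge3")}
print(dask_kwargs)

# The extra keyword would then be forwarded to the service, for instance:
# dask = Dask("dask-base", scheduler=roles["scheduler"][0],
#             workers=roles["worker"], run_as=username, **dask_kwargs)
```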

Using Dask#

Here is a simple computation made of three tasks, where add depends on the results of inc and dec. The cells below first build the task graph lazily, then create the tunnels needed to access the Dask dashboard and the scheduler.

[ ]:
from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
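
At this point nothing has run yet: delayed only records the task graph. To get a rough idea of what to expect once it is computed, the sketch below works out the result and the wall-clock time from the sleep durations used in inc, dec and add. It uses only the standard library and assumes that inc and dec can run concurrently on the cluster and that scheduling and network overhead are negligible; the names durations, deps and finish_time are illustrative.

```python
# Sleep durations (seconds) taken from inc, dec and add above
durations = {"inc": 5, "dec": 3, "add": 7}

# Dependencies of each task: add needs the results of inc and dec
deps = {"inc": [], "dec": [], "add": ["inc", "dec"]}


def finish_time(task):
    """Earliest finish time of a task if independent tasks run in parallel."""
    start = max((finish_time(d) for d in deps[task]), default=0)
    return start + durations[task]


sequential_seconds = sum(durations.values())   # one task after the other
parallel_seconds = finish_time("add")          # inc and dec overlap

# Value the graph should produce: add(inc(1), dec(2))
expected_result = (1 + 1) + (2 - 1)

print(f"sequential: {sequential_seconds}s, parallel: {parallel_seconds}s")
print(f"expected result: {expected_result}")
```

Under these assumptions the computation should take about 12 seconds instead of 15, since add can only start once the slower of inc and dec has finished.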

Launch the computation#

In the meantime, you can check the web dashboard. Its connection URL is printed by the cell below.

[ ]:
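from dask.distributed import Client

# Tunnel to the dashboard (port 8787); it stays open until stopped explicitly
addr, port, tunnel = G5kTunnel(dask.scheduler.address, 8787).start()
print(f"dashboard: http://{addr}:{port}")
# Tunnel to the scheduler (port 8786), used only inside the with block
with G5kTunnel(dask.scheduler.address, 8786) as (addr, port, _):
    print(f"Scheduler address: {addr}:{port}")
    # The newly created client becomes the default one used by compute()
    client = Client(f"tcp://{addr}:{port}")
    # launch a computation
    print(f"result={total.compute()}")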


[ ]:
# Stop the tunnel to the dashboard and destroy the Dask cluster.
if tunnel is not None:
    tunnel.stop(force=True)
dask.destroy()