Dask deployment on Grid’5000
This notebook deploys a Dask cluster on Grid’5000 and launches a simple computation on it.
Requirements:
- A conda[1] environment on the Grid’5000 frontend with Dask and EnOSlib installed.
- The same environment can be used to run this notebook from your local machine.
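Before running the notebook, you can check that the active environment provides the required packages. The sketch below is an illustration, not part of EnOSlib: the helper `missing_packages` is hypothetical, and it assumes the import names `dask`, `distributed` (the package behind `dask.distributed`) and `enoslib`.

```python
import importlib.util


# Hypothetical helper: list the top-level packages that cannot be found
# in the current environment, without importing them.
def missing_packages(names):
    return [name for name in names if importlib.util.find_spec(name) is None]


# Import names assumed for this notebook's requirements.
required = ["dask", "distributed", "enoslib"]
missing = missing_packages(required)
if missing:
    print("Missing packages:", ", ".join(missing))
else:
    print("All required packages are available.")
```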
Initial imports
[ ]:
from enoslib import *
import logging
# get some logs
logging.basicConfig(level=logging.INFO)
Get some resources on Grid’5000
This reserves two nodes of the parapide cluster in Rennes, one for the Dask scheduler and one for a Dask worker; the Dask cluster is deployed on them in the next step.
[ ]:
prod = G5kNetworkConf(id="prod", roles=["network"], type="prod", site="rennes")
conf = (
    G5kConf.from_settings(job_name="dask", job_type=[])
    .add_machine(roles=["scheduler"], cluster="parapide", nodes=1, primary_network=prod)
    .add_machine(roles=["worker"], cluster="parapide", nodes=1, primary_network=prod)
    .add_network_conf(prod)
).finalize()
provider = G5k(conf)
roles, _ = provider.init()
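`provider.init()` returns the reserved hosts grouped by role. As an illustration only, the sketch below models them as a plain dictionary of lists with made-up host names (EnOSlib's real `roles` object holds host objects rather than strings); it shows why the deployment step takes `roles["scheduler"][0]` as the single scheduler but passes the whole `roles["worker"]` list.

```python
# Stand-in for EnOSlib's roles: role name -> list of reserved hosts.
# The host names are hypothetical placeholders.
example_roles = {
    "scheduler": ["parapide-1.rennes.grid5000.fr"],
    "worker": ["parapide-2.rennes.grid5000.fr"],
}

# Exactly one scheduler: take the first (and only) host of that role.
scheduler = example_roles["scheduler"][0]
# Any number of workers: keep the whole list.
workers = example_roles["worker"]

print("scheduler:", scheduler)
print("workers:", workers)
```

Requesting more worker nodes (for example `nodes=3` in the second `add_machine` call) would only lengthen the `worker` list; the scheduler selection stays the same.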
Deploy Dask on the nodes
This assumes that the conda environment (dask-base) is installed in your home directory, under /home/<user>/miniconda3. If the installation path differs, you can specify it using the conda_prefix parameter.
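By default, conda stores a named environment under `<prefix>/envs/<name>`, so you can check on the frontend that `dask-base` exists where the deployment expects it before calling `deploy()`. The helper `conda_env_python` and the username `alice` below are hypothetical, and the sketch assumes a Unix-style layout where the environment's interpreter is `bin/python`.

```python
from pathlib import Path


# Hypothetical helper: path of the Python interpreter of a named conda env.
def conda_env_python(conda_prefix, env_name):
    return Path(conda_prefix) / "envs" / env_name / "bin" / "python"


# Default location assumed by this notebook: /home/<user>/miniconda3
user = "alice"  # hypothetical Grid'5000 username
python = conda_env_python(f"/home/{user}/miniconda3", "dask-base")
print(python, "exists" if python.exists() else "is missing")
```

If the environment lives elsewhere, the same prefix you pass here is the one to give to `conda_prefix`.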
[ ]:
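# Dask processes run as your Grid'5000 user, whose home holds the conda env
username = g5k_api_utils.get_api_username()
# One scheduler on the "scheduler" node, one worker per "worker" node
dask = Dask("dask-base", scheduler=roles["scheduler"][0], workers=roles["worker"], run_as=username)
dask.deploy()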
Using Dask
Here we go with a simple computation of three tasks: inc and dec are independent, and add depends on both of their results. The cells below build this task graph, then create the tunnels needed to access the Dask dashboard and the scheduler.
[ ]:
from dask import delayed
import time
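
# Each function sleeps to simulate a slow task
def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

# delayed() only records the calls; nothing runs until .compute()
x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)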
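To make the dependency structure concrete, the sketch below writes the same three tasks in a simplified form of Dask's dictionary-based graph specification (each key maps to a `(function, *arguments)` tuple, and an argument that names another key stands for that task's result) and evaluates it with a tiny recursive scheduler. It is a plain-Python illustration, not how Dask actually runs the graph; the durations are copied from the sleeps above, and the sleeps themselves are omitted.

```python
# Simplified Dask-style graph: key -> (function, *args); string args that
# name another key are replaced by that task's result.
graph = {
    "x": (lambda v: v + 1, 1),                # inc(1)
    "y": (lambda v: v - 1, 2),                # dec(2)
    "total": (lambda a, b: a + b, "x", "y"),  # add(x, y)
}

# Simulated run time of each task, in seconds.
durations = {"x": 5, "y": 3, "total": 7}


def is_task(arg):
    return isinstance(arg, str) and arg in graph


def evaluate(key, cache=None):
    """Compute `key`, evaluating each dependency once and caching it."""
    if cache is None:
        cache = {}
    if key not in cache:
        func, *args = graph[key]
        values = [evaluate(a, cache) if is_task(a) else a for a in args]
        cache[key] = func(*values)
    return cache[key]


def finish_time(key):
    """Earliest finish time of `key` if independent tasks run in parallel."""
    _, *args = graph[key]
    start = max((finish_time(a) for a in args if is_task(a)), default=0)
    return start + durations[key]


print(evaluate("total"))        # 3
print(sum(durations.values()))  # 15
print(finish_time("total"))     # 12
```

Run one task at a time, the graph takes 5 + 3 + 7 = 15 s. If the worker runs inc and dec concurrently (assuming it has at least two threads), only the critical path matters: max(5, 3) + 7 = 12 s.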
Launch the computation
In the meantime, you can follow the computation on the Dask web dashboard; its URL is printed when the cell runs.
[ ]:
from dask.distributed import Client
# Tunnel to the dashboard (port 8787); stays open after this cell
addr, port, tunnel = G5kTunnel(dask.scheduler.address, 8787).start()
print(f"dashboard: http://{addr}:{port}")
# Tunnel to the scheduler (port 8786); closed when the with-block exits
with G5kTunnel(dask.scheduler.address, 8786) as (addr, port, _):
    print(f"Scheduler address: {addr}:{port}")
    client = Client(f"tcp://{addr}:{port}")
    # launch the computation on the cluster through this client
    print(f"result={total.compute()}")
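The two tunnels have different lifetimes: the dashboard tunnel is started explicitly and stays open after the cell finishes (which is why the cleanup cell stops `tunnel`), while the scheduler tunnel only exists inside the `with` block. The hypothetical `FakeTunnel` class below is not EnOSlib's `G5kTunnel`; it only mimics the two usage patterns seen above (`start()` returning `(addr, port, tunnel)`, and a context manager yielding the same tuple), with fake local ports, to show when each tunnel is closed.

```python
import itertools

# Fake local ports handed out by the stand-in tunnels.
_next_port = itertools.count(50000)


class FakeTunnel:
    """Hypothetical stand-in that only tracks whether it is open."""

    def __init__(self, remote_host, remote_port):
        self.remote = (remote_host, remote_port)
        self.is_open = False

    def start(self):
        # Open the tunnel and return (local address, local port, tunnel).
        self.is_open = True
        return "127.0.0.1", next(_next_port), self

    def stop(self, force=False):
        # `force` is accepted only to mirror the call in the cleanup cell.
        self.is_open = False

    # Context-manager protocol: open on entry, always close on exit.
    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc, tb):
        self.stop()
        return False  # do not swallow exceptions


# Explicit start(): stays open until stop() is called.
_, _, dashboard = FakeTunnel("scheduler-node", 8787).start()

# with-block: closed as soon as the block ends.
with FakeTunnel("scheduler-node", 8786) as (addr, port, scheduler):
    print(f"scheduler reachable at tcp://{addr}:{port}")
    inside = scheduler.is_open

print(dashboard.is_open, inside, scheduler.is_open)  # True True False
```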
[ ]:
# Stop the tunnel to the dashboard, then tear down the Dask cluster.
if tunnel is not None:
    tunnel.stop(force=True)
dask.destroy()
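If a cell fails midway, the dashboard tunnel and the Dask cluster can be left running. A common Python pattern is to put teardown code in a `finally` clause so that it runs whether or not the computation raises. The sketch below uses hypothetical stand-ins (`Resource`, `run_experiment`) instead of the real tunnel and deployment; with the real objects, the `finally` body would hold the two cleanup calls above.

```python
class Resource:
    """Hypothetical stand-in that records when it is released."""

    def __init__(self, name, log):
        self.name, self.log = name, log

    def release(self):
        self.log.append(self.name)


def run_experiment(compute, log):
    tunnel = Resource("dashboard tunnel", log)
    cluster = Resource("dask cluster", log)
    try:
        return compute()
    finally:
        # Runs on success and on failure: stop the tunnel, then the cluster.
        tunnel.release()
        cluster.release()


log = []
print(run_experiment(lambda: 3, log), log)

failed_log = []
try:
    run_experiment(lambda: 1 / 0, failed_log)
except ZeroDivisionError:
    print("computation failed, cleaned up:", failed_log)
```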