Planning service revisited#
Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html
Instant chat: https://framateam.org/enoslib
Source code: https://gitlab.inria.fr/discovery/enoslib
Prerequisites#
⚠️ Make sure you’ve run the one-time setup for your environment
⚠️ Make sure you’re running this notebook under the right kernel
[ ]:
import enoslib as en
en.check()
Setup#
[ ]:
import signal
import os
from datetime import datetime, timedelta
import enoslib as en
en.init_logging()
en.check()
Reservation#
[ ]:
CLUSTER = "nova"
HERE = os.getcwd()
# claim the resources
conf = (
    en.G5kConf.from_settings(
        job_name="fault-injection tutorial",
        job_type=["deploy"],
        env_name="debian11-nfs",
    )
    .add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
    .add_machine(
        roles=["producer"], cluster=CLUSTER, nodes=1
    )  # all the producers are running on the same machine
    .add_machine(
        roles=["consumer"], cluster=CLUSTER, nodes=1
    )  # all the consumers are running on the same machine
)
provider = en.G5k(conf)
roles, networks = provider.init()
# Fill in network information from nodes
roles = en.sync_info(roles, networks)
[ ]:
roles
Rabbitmq configuration#
Common node’s configuration#
Each node must have this minimal configuration:
- python and pip
- procps (to use kill)
- pika (for the rabbitmq connection)
[ ]:
# Common configuration
with en.actions(roles=roles) as p:
    p.apt(task_name="Installing python", name="python3", update_cache=True)
    p.apt(task_name="Installing pip", name="python3-pip")
    p.pip(task_name="Installing pika", name="pika")
    p.file(path="/tmp/rabbitmq", state="absent")
    p.file(path="/tmp/rabbitmq", state="directory")
Server configuration#
Here, we do not launch anything yet; we just set up the server node so that it accepts all our producer(s) and consumer(s). We also add a new administrator in order to access the management interface, since the default guest account is blocked for remote connections by the default configuration.
[ ]:
username_monitoring = "user"
password_monitoring = "password"
# SETUP
## Server configuration
with en.actions(roles=roles["server"]) as p:
    # Set up the rabbitmq server
    p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
    p.command("rabbitmq-plugins enable rabbitmq_management")
    p.command("systemctl start rabbitmq-server")
    p.command("systemctl enable rabbitmq-server")
    # For the management interface, add a new admin
    p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
    p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
    p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")
    # Allow all connections (no credentials needed for consumers and producers)
    p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')
    p.command("systemctl restart rabbitmq-server")
Producers’ node configuration#
The producers’ node only needs its specific script copied onto it.
[ ]:
with en.actions(roles=roles["producer"]) as p:
    p.copy(
        src=HERE + "/producer.py",
        dest="/tmp/rabbitmq/producer.py",
        task_name="copying producer file",
    )
Consumers’ node configuration#
The consumers’ node only needs its specific script copied onto it.
[ ]:
with en.actions(roles=roles["consumer"]) as p:
    p.copy(
        src=HERE + "/consumer.py",
        dest="/tmp/rabbitmq/consumer.py",
        task_name="copying consumer file",
    )
Utility functions#
The only purpose of these functions is to make this experiment easier to run and to read. Their objectives are:
- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))
- to clean the experiment (kill all running instances of producer(s)/consumer(s), delete all output files, purge the rabbitmq queues)
- to launch all producer(s) and consumer(s) by specifying the number of each
- to reset the experiment by going back to its initial state (clean + launch); a sketch of these last two helpers follows the code cell below
[ ]:
import pandas as pd
import logging
import json
from typing import List, Optional, Tuple

from enoslib.log import DisableLogging
from enoslib.config import config_context
from IPython.display import clear_output

server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
server_ip = ip_address_obj.ip.ip

# list all producer ip(s)
producer_ips: List[str] = []
for p in roles["producer"]:
    ip_address_obj = p.filter_addresses(networks=networks["prod"])[0]
    producer_ips.append(str(ip_address_obj.ip.ip))

# list all consumer ip(s)
consumer_ips: List[str] = []
for c in roles["consumer"]:
    ip_address_obj = c.filter_addresses(networks=networks["prod"])[0]
    consumer_ips.append(str(ip_address_obj.ip.ip))
def _get_queues_info() -> Optional[Tuple[List, List]]:
    with DisableLogging(level=logging.ERROR):
        with config_context(ansible_stdout="noop"):
            result = en.run_command(
                "rabbitmqctl list_queues name messages --formatter json; echo '---'; "
                "rabbitmqctl list_connections name --formatter json",
                task_name="Gathering statistics from the queues",
                roles=roles["server"],
                gather_facts=False,
                on_error_continue=True,
            )
    r = result[0]
    if r.status == "FAILED" or r.rc != 0:
        return None
    queues_info, connections = r.stdout.split("---")
    return (json.loads(queues_info), json.loads(connections))
def get_queues_info_for(duration: int) -> pd.DataFrame:
    results = {
        "Time": [],
        "nb_received_messages": [],
        "queue_depth": [],
        "nb_consumers": [],
        "nb_producers": [],
    }
    # one sample per iteration (each rabbitmqctl round-trip takes a few seconds)
    for _ in range(duration):
        info = _get_queues_info()
        if info is None:
            # the server did not answer, skip this sample
            continue
        queues_info, connections = info
        time = datetime.now().strftime("%H:%M:%S")
        queue_depth = 0
        nb_consumers = 0
        nb_recv_msg = 0
        nb_producers = 0
        for d in queues_info:
            if d["name"] == "fault_injection":
                queue_depth = int(d["messages"])
            elif d["name"] == "received_messages":
                nb_recv_msg = int(d["messages"])
        for actor in connections:
            actor_ip = actor["name"].split(":")[0]
            if actor_ip in producer_ips:
                nb_producers += 1
            elif actor_ip in consumer_ips:
                nb_consumers += 1
        results["Time"].append(time)
        results["queue_depth"].append(queue_depth)
        results["nb_consumers"].append(nb_consumers)
        results["nb_producers"].append(nb_producers)
        results["nb_received_messages"].append(nb_recv_msg)
        clear_output(wait=False)
        print(
            f"Time : {time}\n"
            f"nb_received_messages : {nb_recv_msg}\n"
            f"queue_depth : {queue_depth}\n"
            f"nb_consumers: {nb_consumers}\n"
            f"nb_producers: {nb_producers}\n"
        )
    return pd.DataFrame(data=results)
def clean():
    """
    Kill all previously launched processes,
    remove all cronjobs,
    purge the queues.
    """
    cleaning_registry = en.ProcessRegistry()
    cleaning_registry.build(
        "{producer,consumer}",
        roles["consumer"] + roles["producer"],
    )
    cleaning_registry.kill(signal.SIGKILL)
    with DisableLogging(level=logging.ERROR):
        with config_context(ansible_stdout="noop"):
            en.run_command(
                "rabbitmqctl purge_queue fault_injection; "
                "rabbitmqctl purge_queue received_messages",
                task_name="purging the queues",
                roles=roles["server"],
                on_error_continue=True,
                gather_facts=False,
            )
            en.run_command(
                "crontab -r",
                task_name="purging crontab file",
                roles=roles["consumer"] + roles["producer"],
                on_error_continue=True,
                gather_facts=False,
            )
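The launch and reset helpers mentioned above are not spelled out in this notebook. Here is a minimal sketch; the names launch and reset are ours, and the body only reuses the PlanningService API demonstrated in the examples below.
[ ]:
# Minimal sketch of the launch/reset helpers described above
# (hypothetical names; reuses only the PlanningService API shown below)
def launch(nb_producers: int, nb_consumers: int) -> en.PlanningService:
    planning = en.PlanningService()
    start = datetime.now() + timedelta(seconds=10)
    for idx in range(nb_producers):
        planning.add_event(
            en.StartEvent(
                date=start,
                host=roles["producer"][0],
                cmd=f"python3 /tmp/rabbitmq/producer.py {server_ip}",
                name=f"producer_{idx}",
            )
        )
    for idx in range(nb_consumers):
        planning.add_event(
            en.StartEvent(
                date=start,
                host=roles["consumer"][0],
                cmd=f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
                name=f"consumer_{idx}",
            )
        )
    planning.deploy()
    return planning

def reset(nb_producers: int, nb_consumers: int) -> en.PlanningService:
    # back to the initial state: clean everything, then launch again
    clean()
    return launch(nb_producers, nb_consumers)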
Examples#
A first deployment#
Here, we are going to schedule:
- the start of 5 rabbitmq producers, one every 10 seconds, beginning 2 minutes from now
- the start of 5 rabbitmq consumers, one every 10 seconds, beginning 3 minutes from now (shortly after the last producer has started)
- the kill of 3 rabbitmq producers 4 minutes after the start of the experiment
- the kill of all the rabbitmq consumers 30 seconds after the producers are killed
[ ]:
clean()
[ ]:
planning = en.PlanningService()
n = 5
time_now = datetime.now()

for idx in range(n):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=2, seconds=idx * 10),
            host=roles["producer"][0],
            cmd=f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            name=f"producer_{idx}",
        )
    )
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=3, seconds=idx * 10),
            host=roles["consumer"][0],
            cmd=f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            name=f"consumer_{idx}",
        )
    )

for idx in range(3):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=4),
            host=roles["producer"][0],
            name=f"producer_{idx}",
        )
    )

for idx in range(n):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=4, seconds=30),
            host=roles["consumer"][0],
            name=f"consumer_{idx}",
        )
    )

planning.deploy()
We can get a first check of the planning’s consistency:
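[ ]:
planning.check()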
We can also get an up-to-date state of all the processes:
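[ ]:
planning.status()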
Let’s observe the evolution of our events.
[ ]:
results = get_queues_info_for(150)
results
[ ]:
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
We can also deactivate the planning: all upcoming events (relative to the current time) will not be executed.
[ ]:
planning.destroy()
Overload example#
An overload can easily be set up using a service such as PlanningService. Let’s see an example.
We are going to schedule:
- the launch of 20 rabbitmq producers after 4 minutes
- the launch of 100 rabbitmq consumers after 5 minutes
- the kill of 70 rabbitmq consumers after 6 minutes
[ ]:
clean()
[ ]:
planning = en.PlanningService()
time_now = datetime.now()

for idx in range(20):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=4),
            host=roles["producer"][0],
            cmd=f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            name=f"producer_{idx}",
        )
    )

for idx in range(100):
    planning.add_event(
        en.StartEvent(
            date=time_now + timedelta(minutes=5),
            host=roles["consumer"][0],
            cmd=f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            name=f"consumer_{idx}",
        )
    )

for idx in range(70):
    planning.add_event(
        en.KillEvent(
            date=time_now + timedelta(minutes=6),
            host=roles["consumer"][0],
            name=f"consumer_{idx}",
        )
    )

planning.check()
planning.deploy()
[ ]:
results = get_queues_info_for(150)
[ ]:
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
[ ]:
planning.status()
[ ]:
r = planning.status()
[ ]:
planning.destroy()
Cleaning#
[ ]:
provider.destroy()