Planning service revisited#



Prerequisites#

Make sure you’ve run the one-time setup for your environment.

Make sure you’ve completed the tutorial 07_fault_injection_on_processes.

Setup#

[ ]:
import signal
import os
from datetime import datetime, timedelta

import enoslib as en

en.init_logging()
en.check()

Reservation#

[ ]:
CLUSTER = "nova"
HERE = os.getcwd()
# claim the resources
conf = (
    en.G5kConf.from_settings(
        job_name="fault-injection tutorial",
        job_type=["deploy"],
        env_name="debian11-nfs"
    )
    .add_machine(roles=["server"], cluster=CLUSTER, nodes=1)
    .add_machine(
        roles=["producer"], cluster=CLUSTER, nodes=1
    )  # all the producers are running on the same machine
    .add_machine(
        roles=["consumer"], cluster=CLUSTER, nodes=1
    )  # all the consumers are running on the same machine
)

provider = en.G5k(conf)

roles, networks = provider.init()

# Fill in network information from nodes
roles = en.sync_info(roles, networks)
[ ]:
roles

Rabbitmq configuration#

Common node’s configuration#

Each node needs this minimal configuration:

- python and pip
- procps (to use kill)
- pika (for the rabbitmq connection)

[ ]:
# Common configuration
with en.actions(roles=roles) as p:
    # Refresh the package index before installing anything
    p.command("apt update")

    p.apt(task_name="Installing python", name="python3")

    p.apt(task_name="Installing pip", name="python3-pip")
    p.pip(task_name="Installing pika", name="pika")

    p.file(path="/tmp/rabbitmq", state="absent")
    p.file(path="/tmp/rabbitmq", state="directory")

Server configuration#

Here, we do not launch anything yet; we just set up the server node so that it accepts all our producer(s) and consumer(s). We also add a new administrator to get access to the management interface, since the default one is blocked for remote connections.

[ ]:
username_monitoring = "user"
password_monitoring = "password"

# SETUP
## Server configuration
with en.actions(roles=roles["server"]) as p:
    # Setting up the rabbitmq server
    p.apt(task_name="Installing rabbitmq-server", name="rabbitmq-server")
    p.command("rabbitmq-plugins enable rabbitmq_management")
    p.command("systemctl start rabbitmq-server")
    p.command("systemctl enable rabbitmq-server")

    # For the management interface, adding a new admin
    p.command(f"rabbitmqctl add_user {username_monitoring} {password_monitoring}")
    p.command(f"rabbitmqctl set_user_tags {username_monitoring} administrator")
    p.command(f"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'")

    # Allow all connections (no credentials needed for consumers and producers)
    p.shell('echo "loopback_users.guest = false" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')

    p.command("systemctl restart rabbitmq-server")

Producers’ node configuration#

The producers’ node only needs a copy of the producer script.

[ ]:
with en.actions(roles=roles["producer"]) as p:
    p.copy(
        src=HERE + "/producer.py",
        dest="/tmp/rabbitmq/producer.py",
        task_name="copying producer file",
    )

Consumers’ node configuration#

The consumers’ node only needs a copy of the consumer script.

[ ]:
with en.actions(roles=roles["consumer"]) as p:
    p.copy(
        src=HERE + "/consumer.py",
        dest="/tmp/rabbitmq/consumer.py",
        task_name="copying consumer file",
    )

Utility functions#

The only purpose of these functions is to make this experiment easier to run and to read. Their objectives are:

- to gather and show general statistics about the current state of the experiment (timestamp, number of received and processed messages, queue depth, number of consumer(s) and producer(s))
- to clear the experiment (kill all instances of producer(s)/consumer(s) if any, delete all output files if any, purge the rabbitmq queues)
- to launch all producer(s) and consumer(s) by specifying the number of each
- to reset the experiment by going back to its initial state (clean + launch)

[ ]:
import pandas as pd
import logging
import json

from typing import List

from enoslib.log import DisableLogging
from enoslib.config import config_context
from IPython.display import clear_output


server = roles["server"][0]
ip_address_obj = server.filter_addresses(networks=networks["prod"])[0]
server_ip = ip_address_obj.ip.ip

# list all producer(s) ip(s)
producer_ips : List[str] = []
for p in roles["producer"]:
    ip_address_obj = p.filter_addresses(networks=networks["prod"])[0]
    producer_ips.append(str(ip_address_obj.ip.ip))

# list all consumer(s) ip(s)
consumer_ips : List[str] = []
for c in roles["consumer"]:
    ip_address_obj = c.filter_addresses(networks=networks["prod"])[0]
    consumer_ips.append(str(ip_address_obj.ip.ip))

def _get_queues_info() -> tuple:
    with DisableLogging(level=logging.ERROR):
        with config_context(ansible_stdout="noop"):
            result = en.run_command(
                "rabbitmqctl list_queues name messages --formatter json; echo '---'; "
                "rabbitmqctl list_connections name --formatter json",
                task_name="Gathering statistics from the queues",
                roles=roles["server"],
                gather_facts=False,
                on_error_continue=True,
            )

    r = result[0]
    if r.status == "FAILED" or r.rc != 0:
        # Return two empty lists so that callers can still unpack the result
        return ([], [])

    queues_info, connections = r.stdout.split('---')

    return (json.loads(queues_info), json.loads(connections))

def get_queues_info_for(duration: int) -> pd.DataFrame:
    """Poll the server `duration` times and return one row of statistics per poll."""
    results = {}
    results["Time"] = []
    results["nb_received_messages"] = []
    results["queue_depth"] = []
    results["nb_consumers"] = []
    results["nb_producers"] = []
    for _ in range(duration):
        time = str(datetime.now().strftime("%H:%M:%S"))
        results["Time"].append(time)

        queues_info, connections = _get_queues_info()

        queue_depth = 0
        nb_consumers = 0
        nb_recv_msg = 0
        nb_producers = 0

        for d in queues_info:
            if d["name"] == "fault_injection":
                queue_depth = int(d["messages"])
            elif d["name"] == "received_messages":
                nb_recv_msg = int(d["messages"])

        for actor in connections:
            actor_ip = actor["name"].split(":")[0]

            if actor_ip in producer_ips:
                nb_producers +=1
            elif actor_ip in consumer_ips:
                nb_consumers +=1

        results["queue_depth"].append(queue_depth)
        results["nb_consumers"].append(nb_consumers)
        results["nb_producers"].append(nb_producers)
        results["nb_received_messages"].append(nb_recv_msg)

        clear_output(wait=False)
        print(
            f"Time : {time}\n"
            f"nb_received_messages : {nb_recv_msg}\n"
            f"queue_depth : {queue_depth}\n"
            f"nb_consumers: {nb_consumers}\n"
            f"nb_producers: {nb_producers}\n"
        )

    df = pd.DataFrame(data=results)

    return df

def clean():
    """
    Kill all previously launched processes,
    remove all cron jobs,
    purge the queues.
    """
    cleaning_registry = en.ProcessRegistry()
    cleaning_registry.build(
        "{producer,consumer}",
        roles["consumer"] + roles["producer"],
    )
    cleaning_registry.kill(signal.SIGKILL)

    with DisableLogging(level=logging.ERROR):
        with config_context(ansible_stdout="noop"):
            en.run_command(
                "rabbitmqctl purge_queue fault_injection & "\
                "rabbitmqctl purge_queue received_messages & ",
                task_name="purging the queue",
                roles=roles["server"],
                on_error_continue=True,
                gather_facts=False,
            )

            en.run_command(
                "crontab -r",
                task_name="purging crontab file",
                roles=roles["consumer"] + roles["producer"],
                on_error_continue=True,
                gather_facts=False,
            )
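
To make the parsing step of the statistics helper easier to follow, here is a minimal standalone sketch that works on a hard-coded string instead of a live server. The sample output, the IP addresses and the parse_stats name are assumptions made for illustration; only the '---' separator, the two queue names and the "take what precedes the first colon" rule come from the functions above.

```python
import json

# Hypothetical sample of what the combined command could print:
# two JSON arrays separated by the '---' marker.
sample_stdout = (
    '[{"name": "fault_injection", "messages": 12},'
    ' {"name": "received_messages", "messages": 30}]\n'
    "---\n"
    '[{"name": "10.0.0.2:40000 -> 10.0.0.1:5672"},'
    ' {"name": "10.0.0.2:40002 -> 10.0.0.1:5672"},'
    ' {"name": "10.0.0.3:51000 -> 10.0.0.1:5672"}]\n'
)


def parse_stats(stdout, producer_ips, consumer_ips):
    """Return (queue_depth, nb_received, nb_producers, nb_consumers)."""
    queues_raw, connections_raw = stdout.split("---")

    # Map each queue name to its number of messages
    depths = {q["name"]: int(q["messages"]) for q in json.loads(queues_raw)}

    nb_producers = 0
    nb_consumers = 0
    for conn in json.loads(connections_raw):
        # The client IP is whatever precedes the first ':' in the name
        client_ip = conn["name"].split(":")[0]
        if client_ip in producer_ips:
            nb_producers += 1
        elif client_ip in consumer_ips:
            nb_consumers += 1

    return (
        depths.get("fault_injection", 0),
        depths.get("received_messages", 0),
        nb_producers,
        nb_consumers,
    )


stats = parse_stats(sample_stdout, ["10.0.0.2"], ["10.0.0.3"])
print(stats)
```

Since all producers share one machine and all consumers share another, counting connections per client IP is enough to tell the two groups apart in this setup.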

Examples#

A first deployment#

Here, we are going to schedule, relative to the start of the experiment:

- the start of 5 rabbitmq producers, one every 10 seconds, beginning after 2 minutes
- the start of 5 rabbitmq consumers, one every 10 seconds, beginning after 3 minutes (1 minute after the first producer)
- the kill of 3 rabbitmq producers after 4 minutes
- the kill of all the rabbitmq consumers 30 seconds after the producers are killed
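
Before touching any node, it can help to lay this schedule out as plain data. The sketch below uses only the standard library and hypothetical (offset, action, name) tuples; it mirrors the offsets used in the planning cell of this example but does not call EnOSlib.

```python
from datetime import timedelta

n = 5
timeline = []

# One producer and one consumer every 10 seconds
for idx in range(n):
    timeline.append((timedelta(minutes=2, seconds=idx * 10), "start", f"producer_{idx}"))
    timeline.append((timedelta(minutes=3, seconds=idx * 10), "start", f"consumer_{idx}"))

# Three producers are killed at once
for idx in range(3):
    timeline.append((timedelta(minutes=4), "kill", f"producer_{idx}"))

# All consumers are killed 30 seconds later
for idx in range(n):
    timeline.append((timedelta(minutes=4, seconds=30), "kill", f"consumer_{idx}"))

# Sort by offset only; the sort is stable, so ties keep insertion order
timeline.sort(key=lambda event: event[0])

for offset, action, name in timeline:
    print(f"+{offset}  {action:5}  {name}")
```

Adding each offset to a reference datetime gives the absolute dates that the events expect.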

[ ]:
clean()
[ ]:
planning = en.PlanningService()

n = 5

time_now = datetime.now()
for idx in range(n):
    planning.add_event(
        en.StartEvent(
            date = time_now + timedelta(minutes = 2, seconds = (idx * 10)),
            host = roles["producer"][0],
            cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            name = f"producer_{idx}",
        )
    )
    planning.add_event(
        en.StartEvent(
            date = time_now + timedelta(minutes = 3, seconds = (idx * 10)),
            host = roles["consumer"][0],
            cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            name = f"consumer_{idx}",
        )
    )

for idx in range(3):
    planning.add_event(
        en.KillEvent(
            date = time_now + timedelta(minutes = 4),
            host = roles["producer"][0],
            name = f"producer_{idx}",
        )
    )

for idx in range(n):
    planning.add_event(
        en.KillEvent(
            date = time_now + timedelta(minutes = 4, seconds = 30),
            host = roles["consumer"][0],
            name = f"consumer_{idx}",
        )
    )
planning.deploy()

planning.check() gives a first opinion on the consistency of the planning.
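
The exact rules applied by the service are not described here. As an illustration of the kind of rule a consistency check could enforce, the sketch below flags kill events that target a process that is never started, or that is started later than the kill. The find_inconsistencies function and the tuple format are hypothetical and are not EnOSlib's implementation.

```python
from datetime import timedelta


def find_inconsistencies(events):
    """events: list of (offset, action, name) with action in {"start", "kill"}."""
    # Remember when each named process is started
    starts = {}
    for offset, action, name in events:
        if action == "start":
            starts[name] = offset

    problems = []
    for offset, action, name in events:
        if action != "kill":
            continue
        if name not in starts:
            problems.append(f"{name}: killed but never started")
        elif offset <= starts[name]:
            problems.append(f"{name}: killed at or before its start")
    return problems


events = [
    (timedelta(minutes=2), "start", "producer_0"),
    (timedelta(minutes=4), "kill", "producer_0"),
    (timedelta(minutes=1), "kill", "consumer_0"),   # too early
    (timedelta(minutes=3), "start", "consumer_0"),
    (timedelta(minutes=5), "kill", "consumer_9"),   # unknown process
]

problems = find_inconsistencies(events)
for problem in problems:
    print(problem)
```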

planning.status() gives an up-to-date state of all the processes.

Let’s observe the evolution of our events.

[ ]:
results = get_queues_info_for(150)
results
[ ]:
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)

We can also deactivate the planning: every event scheduled after the current time will no longer be executed.

[ ]:
planning.destroy()

Overload example#

Setting up an overload is easy with a service such as PlanningService.

Let’s see an example of it.

We are going to schedule:

- the launch of 20 rabbitmq producers after 4 minutes
- the launch of 100 rabbitmq consumers after 5 minutes
- the kill of 70 rabbitmq consumers after 6 minutes
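
To know what the plots should roughly look like, we can compute the expected number of live processes at a few instants from the schedule alone. This is a minimal sketch with hypothetical (offset, action, name) tuples and a hypothetical expected_alive helper; it assumes every event takes effect exactly at its scheduled time, which real processes will only approximate.

```python
from datetime import timedelta

events = (
    [(timedelta(minutes=4), "start", f"producer_{i}") for i in range(20)]
    + [(timedelta(minutes=5), "start", f"consumer_{i}") for i in range(100)]
    + [(timedelta(minutes=6), "kill", f"consumer_{i}") for i in range(70)]
)


def expected_alive(events, at, prefix):
    """Count processes whose name starts with `prefix` that are alive at offset `at`."""
    alive = set()
    # Replay the events in chronological order up to `at`
    for offset, action, name in sorted(events, key=lambda event: event[0]):
        if offset > at or not name.startswith(prefix):
            continue
        if action == "start":
            alive.add(name)
        else:
            alive.discard(name)
    return len(alive)


for minutes in (3, 4.5, 5.5, 6.5):
    at = timedelta(minutes=minutes)
    print(
        f"+{at}: "
        f"{expected_alive(events, at, 'producer')} producers, "
        f"{expected_alive(events, at, 'consumer')} consumers"
    )
```

After the kill events, 30 consumers remain for 20 producers, so the measured nb_consumers curve should settle around that value.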

[ ]:
clean()
[ ]:
planning = en.PlanningService()

time_now = datetime.now()
for idx in range(20):
    planning.add_event(
        en.StartEvent(
            date = time_now + timedelta(minutes = 4),
            host = roles["producer"][0],
            cmd = f"python3 /tmp/rabbitmq/producer.py {server_ip}",
            name = f"producer_{idx}",
        )
    )
for idx in range(100):
    planning.add_event(
        en.StartEvent(
            date = time_now + timedelta(minutes = 5),
            host = roles["consumer"][0],
            cmd = f"python3 /tmp/rabbitmq/consumer.py {server_ip}",
            name = f"consumer_{idx}",
        )
    )

for idx in range(70):
    planning.add_event(
        en.KillEvent(
            date = time_now + timedelta(minutes = 6),
            host = roles["consumer"][0],
            name = f"consumer_{idx}",
        )
    )

planning.check()
planning.deploy()
[ ]:
results = get_queues_info_for(150)
[ ]:
results.plot(x="Time", y=["nb_received_messages", "queue_depth"], rot=45)
results.plot(x="Time", y=["nb_consumers", "nb_producers"], rot=45)
[ ]:
planning.status()
[ ]:
r = planning.status()
[ ]:
planning.destroy()

Cleaning#

[ ]:
provider.destroy()