{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Planning service revisited\n", "\n", "---\n", "\n", "- Website: https://discovery.gitlabpages.inria.fr/enoslib/index.html\n", "- Instant chat: https://framateam.org/enoslib\n", "- Source code: https://gitlab.inria.fr/discovery/enoslib\n", "\n", "---\n", "\n", "## Prerequisites\n", "\n", "- Make sure you've run the one-time setup for your environment.\n", "- Make sure you've completed the tutorial `07_fault_injection_on_processes`.\n" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Setup" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import signal\n", "import os\n", "from datetime import datetime, timedelta\n", "\n", "import enoslib as en\n", "\n", "en.init_logging()\n", "en.check()\n" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Reservation" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "CLUSTER = \"nova\"\n", "HERE = os.getcwd()\n", "# Claim the resources\n", "conf = (\n", "    en.G5kConf.from_settings(\n", "        job_name=\"fault-injection tutorial\",\n", "        job_type=[\"deploy\"],\n", "        env_name=\"debian11-nfs\",\n", "    )\n", "    .add_machine(roles=[\"server\"], cluster=CLUSTER, nodes=1)\n", "    .add_machine(\n", "        roles=[\"producer\"], cluster=CLUSTER, nodes=1\n", "    )  # all the producers run on the same machine\n", "    .add_machine(\n", "        roles=[\"consumer\"], cluster=CLUSTER, nodes=1\n", "    )  # all the consumers run on the same machine\n", ")\n", "\n", "provider = en.G5k(conf)\n", "\n", "roles, networks = provider.init()\n", "\n", "# Fill in network information from nodes\n", "roles = en.sync_info(roles, networks)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "roles" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### RabbitMQ configuration" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Common node configuration" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Each node needs the following minimal configuration:\n", "- Python and pip\n", "- procps (to use `kill`)\n", "- pika (the Python client used to connect to RabbitMQ)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Common configuration\n", "with en.actions(roles=roles) as p:\n", "    # Refresh the package index before installing anything\n", "    p.command(\"apt update\")\n", "\n", "    p.apt(task_name=\"Installing python\", name=\"python3\")\n", "    p.apt(task_name=\"Installing pip\", name=\"python3-pip\")\n", "    p.apt(task_name=\"Installing procps\", name=\"procps\")\n", "    p.pip(task_name=\"Installing pika\", name=\"pika\")\n", "\n", "    # Start from an empty working directory\n", "    p.file(path=\"/tmp/rabbitmq\", state=\"absent\")\n", "    p.file(path=\"/tmp/rabbitmq\", state=\"directory\")\n" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Server configuration" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We do not launch anything yet: we only set up the server node so that it accepts connections from all our producers and consumers. We also add a new administrator to access the management interface, because by default RabbitMQ only allows the default `guest` account to connect from localhost." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "username_monitoring = \"user\"\n", "password_monitoring = \"password\"\n", "\n", "# SETUP\n", "## Server configuration\n", "with en.actions(roles=roles[\"server\"]) as p:\n", "    # Setting up the RabbitMQ server\n", "    p.apt(task_name=\"Installing rabbitmq-server\", name=\"rabbitmq-server\")\n", "    p.command(\"rabbitmq-plugins enable rabbitmq_management\")\n", "    p.command(\"systemctl start rabbitmq-server\")\n", "    p.command(\"systemctl enable rabbitmq-server\")\n", "\n", "    # For the management interface, add a new administrator\n", "    p.command(f\"rabbitmqctl add_user {username_monitoring} {password_monitoring}\")\n", "    p.command(f\"rabbitmqctl set_user_tags {username_monitoring} administrator\")\n", "    p.command(f\"rabbitmqctl set_permissions {username_monitoring} .* .* .* -p '/'\")\n", "\n", "    # Let the default guest user connect remotely,\n", "    # so producers and consumers need no extra credentials\n", "    p.shell('echo \"loopback_users.guest = false\" | sudo tee -a /etc/rabbitmq/rabbitmq.conf')\n", "\n", "    p.command(\"systemctl restart rabbitmq-server\")" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Producers' node configuration" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The producers' node only needs a copy of the producer script, `producer.py`, taken from the notebook's working directory." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with en.actions(roles=roles[\"producer\"]) as p:\n", "    p.copy(\n", "        src=HERE + \"/producer.py\",\n", "        dest=\"/tmp/rabbitmq/producer.py\",\n", "        task_name=\"copying producer file\",\n", "    )" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Consumers' node configuration" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "The consumers' node only needs a copy of the consumer script, `consumer.py`, taken from the notebook's working directory." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with en.actions(roles=roles[\"consumer\"]) as p:\n", "    p.copy(\n", "        src=HERE + \"/consumer.py\",\n", "        dest=\"/tmp/rabbitmq/consumer.py\",\n", "        task_name=\"copying consumer file\",\n", "    )" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "#### Utility functions" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "These functions only exist to make this experiment easier to run and to read. They:\n", "- gather and display statistics about the current state of the experiment (timestamp, number of received messages, queue depth, number of consumers and producers)\n", "- clean the experiment: kill all running producers and consumers, remove all cron jobs, and purge the RabbitMQ queues" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import logging\n", "import json\n", "\n", "from typing import List, Tuple\n", "\n", "from enoslib.log import DisableLogging\n", "from enoslib.config import config_context\n", "from IPython.display import clear_output\n", "\n", "\n", "server = roles[\"server\"][0]\n", "ip_address_obj = server.filter_addresses(networks=networks[\"prod\"])[0]\n", "server_ip = ip_address_obj.ip.ip\n", "\n", "# list all producer IPs\n", "producer_ips: List[str] = []\n", "for p in roles[\"producer\"]:\n", "    ip_address_obj = p.filter_addresses(networks=networks[\"prod\"])[0]\n", "    producer_ips.append(str(ip_address_obj.ip.ip))\n", "\n", "# list all consumer IPs\n", "consumer_ips: List[str] = []\n", "for c in roles[\"consumer\"]:\n", "    ip_address_obj = c.filter_addresses(networks=networks[\"prod\"])[0]\n", "    consumer_ips.append(str(ip_address_obj.ip.ip))\n", "\n", "def _get_queues_info() -> Tuple[List, List]:\n", "    \"\"\"Return the queues and the connections reported by rabbitmqctl on the server.\"\"\"\n", "    with DisableLogging(level=logging.ERROR):\n", "        with config_context(ansible_stdout=\"noop\"):\n", "            result = en.run_command(\n", "                \"rabbitmqctl list_queues name messages --formatter json; echo '---'; \"\n", "                \"rabbitmqctl list_connections name --formatter json\",\n", "                task_name=\"Gathering statistics from the queues\",\n", "                roles=roles[\"server\"],\n", "                gather_facts=False,\n", "                on_error_continue=True,\n", "            )\n", "\n", "    queues_info: List = 
[]\n", "    r = result[0]\n", "    if r.status == \"FAILED\" or r.rc != 0:\n", "        # Return empty lists so that callers can still unpack the result\n", "        return [], []\n", "\n", "    queues_info, connections = r.stdout.split('---')\n", "\n", "    return (json.loads(queues_info), json.loads(connections))\n", "\n", "def get_queues_info_for(duration: int) -> pd.DataFrame:\n", "    \"\"\"Poll the server `duration` times and return the collected statistics.\"\"\"\n", "    results = {}\n", "    results[\"Time\"] = []\n", "    results[\"nb_received_messages\"] = []\n", "    results[\"queue_depth\"] = []\n", "    results[\"nb_consumers\"] = []\n", "    results[\"nb_producers\"] = []\n", "    for _ in range(duration):\n", "        time = datetime.now().strftime(\"%H:%M:%S\")\n", "        results[\"Time\"].append(time)\n", "\n", "        queues_info, connections = _get_queues_info()\n", "\n", "        queue_depth = 0\n", "        nb_consumers = 0\n", "        nb_recv_msg = 0\n", "        nb_producers = 0\n", "\n", "        for d in queues_info:\n", "            if d[\"name\"] == \"fault_injection\":\n", "                queue_depth = int(d[\"messages\"])\n", "            elif d[\"name\"] == \"received_messages\":\n", "                nb_recv_msg = int(d[\"messages\"])\n", "\n", "        # Each connection name starts with \"<client_ip>:<port>\"\n", "        for actor in connections:\n", "            actor_ip = actor[\"name\"].split(\":\")[0]\n", "\n", "            if actor_ip in producer_ips:\n", "                nb_producers += 1\n", "            elif actor_ip in consumer_ips:\n", "                nb_consumers += 1\n", "\n", "        results[\"queue_depth\"].append(queue_depth)\n", "        results[\"nb_consumers\"].append(nb_consumers)\n", "        results[\"nb_producers\"].append(nb_producers)\n", "        results[\"nb_received_messages\"].append(nb_recv_msg)\n", "\n", "        clear_output(wait=False)\n", "        print(\n", "            f\"Time : {time}\\n\"\n", "            f\"nb_received_messages : {nb_recv_msg}\\n\"\n", "            f\"queue_depth : {queue_depth}\\n\"\n", "            f\"nb_consumers: {nb_consumers}\\n\"\n", "            f\"nb_producers: {nb_producers}\\n\"\n", "        )\n", "\n", "    df = pd.DataFrame(data=results)\n", "\n", "    return df\n", "\n", "def clean():\n", "    \"\"\"\n", "    Kill all previously launched processes,\n", "    remove all cron jobs,\n", "    purge the queues.\n", "    \"\"\"\n", "    cleaning_registry = en.ProcessRegistry()\n", "    cleaning_registry.build(\n", "        \"{producer,consumer}\",\n", "        roles[\"consumer\"] + roles[\"producer\"],\n", "    )\n", "    cleaning_registry.kill(signal.SIGKILL)\n", "\n", "    with DisableLogging(level=logging.ERROR):\n", "        with config_context(ansible_stdout=\"noop\"):\n", "            # Purge both queues one after the other (';' rather than '&',\n", "            # so the command only returns once both purges are done)\n", "            en.run_command(\n", "                \"rabbitmqctl purge_queue fault_injection; \"\n", "                \"rabbitmqctl purge_queue received_messages\",\n", "                task_name=\"purging the queues\",\n", "                roles=roles[\"server\"],\n", "                on_error_continue=True,\n", "                gather_facts=False,\n", "            )\n", "\n", "            en.run_command(\n", "                \"crontab -r\",\n", "                task_name=\"purging crontab file\",\n", "                roles=roles[\"consumer\"] + roles[\"producer\"],\n", "                on_error_continue=True,\n", "                gather_facts=False,\n", "            )" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Examples" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### A first deployment\n", "\n", "Here, we are going to schedule:\n", "- the start of 5 RabbitMQ producers, one every 10 seconds, beginning 2 minutes after the start of the experiment\n", "- the start of 5 RabbitMQ consumers, one every 10 seconds, beginning 1 minute after the first producer\n", "- the kill of 3 RabbitMQ producers 4 minutes after the start of the experiment\n", "- the kill of all the RabbitMQ consumers 30 seconds after the producers are killed" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clean()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "planning = en.PlanningService()\n", "\n", "n = 5\n", "\n", "time_now = datetime.now()\n", "for idx in range(n):\n", "    planning.add_event(\n", "        en.StartEvent(\n", "            date=time_now + timedelta(minutes=2, seconds=idx * 10),\n", "            host=roles[\"producer\"][0],\n", "            cmd=f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", "            name=f\"producer_{idx}\",\n", "        )\n", "    )\n", "    planning.add_event(\n", "        en.StartEvent(\n", "            date=time_now + timedelta(minutes=3, seconds=idx * 10),\n", "            host=
roles[\"consumer\"][0],\n", "            cmd=f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", "            name=f\"consumer_{idx}\",\n", "        )\n", "    )\n", "\n", "for idx in range(3):\n", "    planning.add_event(\n", "        en.KillEvent(\n", "            date=time_now + timedelta(minutes=4),\n", "            host=roles[\"producer\"][0],\n", "            name=f\"producer_{idx}\",\n", "        )\n", "    )\n", "\n", "for idx in range(n):\n", "    planning.add_event(\n", "        en.KillEvent(\n", "            date=time_now + timedelta(minutes=4, seconds=30),\n", "            host=roles[\"consumer\"][0],\n", "            name=f\"consumer_{idx}\",\n", "        )\n", "    )\n", "\n", "planning.check()\n", "planning.deploy()" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Calling `planning.check()` before `planning.deploy()`, as above, gives a first assessment of the planning's consistency." ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "At any time, `planning.status()` gives an up-to-date state of all the processes." ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Let's observe the evolution of our events." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = get_queues_info_for(150)\n", "results" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results.plot(x=\"Time\", y=[\"nb_received_messages\", \"queue_depth\"], rot=45)\n", "results.plot(x=\"Time\", y=[\"nb_consumers\", \"nb_producers\"], rot=45)" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "We can also deactivate the planning with `planning.destroy()`: the events scheduled after the current time will then not be executed." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "planning.destroy()" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Overload example" ] },
{ "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "An overload can easily be set up with a service such as `PlanningService`.\n", "\n", "Let's see an example.\n", "\n", "We are going to schedule:\n", "- the launch of 20 RabbitMQ producers after 4 minutes\n", "- the launch of 100 RabbitMQ consumers after 5 minutes\n", "- the kill of 70 RabbitMQ consumers after 6 minutes" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "clean()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "planning = en.PlanningService()\n", "\n", "time_now = datetime.now()\n", "for idx in range(20):\n", "    planning.add_event(\n", "        en.StartEvent(\n", "            date=time_now + timedelta(minutes=4),\n", "            host=roles[\"producer\"][0],\n", "            cmd=f\"python3 /tmp/rabbitmq/producer.py {server_ip}\",\n", "            name=f\"producer_{idx}\",\n", "        )\n", "    )\n", "\n", "for idx in range(100):\n", "    planning.add_event(\n", "        en.StartEvent(\n", "            date=time_now + timedelta(minutes=5),\n", "            host=roles[\"consumer\"][0],\n", "            cmd=f\"python3 /tmp/rabbitmq/consumer.py {server_ip}\",\n", "            name=f\"consumer_{idx}\",\n", "        )\n", "    )\n", "\n", "for idx in range(70):\n", "    planning.add_event(\n", "        en.KillEvent(\n", "            date=time_now + timedelta(minutes=6),\n", "            host=roles[\"consumer\"][0],\n", "            name=f\"consumer_{idx}\",\n", "        )\n", "    )\n", "\n", "planning.check()\n", "planning.deploy()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = get_queues_info_for(150)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results.plot(x=\"Time\", 
y=[\"nb_received_messages\", \"queue_depth\"], rot=45)\n", "results.plot(x=\"Time\", y=[\"nb_consumers\", \"nb_producers\"], rot=45)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "planning.status()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "r = planning.status()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "planning.destroy()" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Cleaning" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "provider.destroy()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 4 }