From b3c08a1f568d58fd608ea8319913d01d6d88f08a Mon Sep 17 00:00:00 2001 From: mbonniot Date: Wed, 8 Apr 2026 09:51:27 +0200 Subject: [PATCH] [GUI, SYSTEM] Create tests scenarios and mockups for haproxy, logfwd & clustering --- .../gui/tests/test_haproxy_conf_generation.py | 795 ++++++++++++++++++ vulture_os/gui/tests/test_logfwd_logom.py | 732 ++++++++++++++++ .../system/tests/test_cluster_messagequeue.py | 738 ++++++++++++++++ 3 files changed, 2265 insertions(+) create mode 100644 vulture_os/gui/tests/test_haproxy_conf_generation.py create mode 100644 vulture_os/gui/tests/test_logfwd_logom.py create mode 100644 vulture_os/system/tests/test_cluster_messagequeue.py diff --git a/vulture_os/gui/tests/test_haproxy_conf_generation.py b/vulture_os/gui/tests/test_haproxy_conf_generation.py new file mode 100644 index 000000000..a6e18faa2 --- /dev/null +++ b/vulture_os/gui/tests/test_haproxy_conf_generation.py @@ -0,0 +1,795 @@ +#!/home/vlt-os/env/bin/python +"""This file is part of Vulture OS. + +Vulture OS is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +""" + +__author__ = "VultureProject contributors" +__credits__ = [] +__license__ = "GPLv3" +__version__ = "4.0.0" +__maintainer__ = "Vulture OS" +__email__ = "contact@vultureproject.org" +__doc__ = "Tests for HAProxy frontend configuration generation (services/frontend/models.py)" + + +from django.test import TestCase +from unittest.mock import patch, MagicMock + +from system.cluster.models import Node, NetworkAddress, NetworkInterfaceCard, NetworkAddressNIC +from system.tenants.models import Tenants +from services.frontend.models import Frontend, Listener + + +# ─── Helpers ───────────────────────────────────────────────────────────────── + +def _make_config_mock(): + """Return a minimal Config-like mock usable as Cluster.get_global_config()""" + cfg = MagicMock() + cfg.to_dict.return_value = { + "cluster_api_key": "test-key", + "oauth2_header_name": "X-Vlt-Token", + "portal_cookie_name": "vlt", + "public_token": "", + "redis_password": "", + } + return cfg + + + +class FrontendConfTestBase(TestCase): + """ + Base class: creates the minimum DB objects required by generate_conf() + and patches every external dependency that would require a full + FreeBSD/MongoDB cluster. + """ + + TEST_CASE_NAME = f"{__name__}" + + # Patches actifs pour la suite + PATCHES = [ + # Cluster.get_global_config() -> ne pas toucher la DB Config + "services.frontend.models.Cluster.get_global_config", + # FilterPolicy.objects.filter() -> darwin, non teste ici + "services.frontend.models.FilterPolicy.objects.filter", + # Supprime le bruit de logs pendant les tests + "services.frontend.models.logger", + ] + + def setUp(self): + # Patches + self.mock_get_global_config = patch( + "services.frontend.models.Cluster.get_global_config", + return_value=_make_config_mock() + ).start() + self.mock_filter_policy = patch( + "services.frontend.models.FilterPolicy.objects.filter", + return_value=[] + ).start() + patch("services.frontend.models.logger").start() + self.addCleanup(patch.stopall) + + # Objets DB + self.tenants = Tenants.objects.create(name=f"test_tenants_{self.TEST_CASE_NAME}") + + def _make_frontend(self, **kwargs) -> Frontend: + """ + Construit et sauvegarde un Frontend avec des valeurs sures par defaut. + Toutes les relations M2M sont vides (listener_list=[], header_list=[]). + """ + defaults = dict( + name=f"fe_{id(kwargs)}", + mode="http", + enabled=True, + tenants_config=self.tenants, + enable_logging=False, + https_redirect=False, + custom_haproxy_conf="", + enable_cache=False, + enable_compression=False, + healthcheck_service=False, + timeout_client=60, + timeout_keep_alive=500, + log_level="warning", + log_condition="", + ) + defaults.update(kwargs) + fe = Frontend.objects.create(**defaults) + return fe + + def _generate(self, frontend: Frontend, listener_list=None, header_list=None) -> str: + """ + Appelle generate_conf() en injectant des listes vides pour eviter les + requetes DB sur les relations non testees + """ + return frontend.generate_conf( + listener_list=listener_list if listener_list is not None else [], + header_list=header_list if header_list is not None else [], + ) + + + +class TestHasHaproxyConf(FrontendConfTestBase): + """ + has_haproxy_conf controle si generate_conf() produit du contenu ou "". + C'est la premiere couche de defense contre les mauvaises generations. + """ + + def test_mode_http_has_haproxy_conf(self): + fe = self._make_frontend(mode="http") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_tcp_has_haproxy_conf(self): + fe = self._make_frontend(mode="tcp") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_log_tcp_has_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="tcp") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_log_tcp_udp_has_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="tcp,udp") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_log_relp_has_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="relp") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_log_file_no_haproxy_conf(self): + """Un listener FILE rsyslog-only : pas de conf HAProxy""" + fe = self._make_frontend(mode="log", listening_mode="file") + self.assertFalse(fe.has_haproxy_conf) + + def test_mode_log_udp_no_haproxy_conf(self): + """UDP est gere par rsyslog directement""" + fe = self._make_frontend(mode="log", listening_mode="udp") + self.assertFalse(fe.has_haproxy_conf) + + def test_mode_log_redis_no_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="redis") + self.assertFalse(fe.has_haproxy_conf) + + def test_mode_log_kafka_no_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="kafka") + self.assertFalse(fe.has_haproxy_conf) + + def test_mode_log_api_no_haproxy_conf(self): + fe = self._make_frontend(mode="log", listening_mode="api") + self.assertFalse(fe.has_haproxy_conf) + + def test_mode_log_file_generates_empty_string(self): + """generate_conf() doit retourner "" si has_haproxy_conf est False""" + fe = self._make_frontend(mode="log", listening_mode="file") + self.assertEqual(self._generate(fe), "") + + def test_mode_filebeat_with_ip_placeholder_has_haproxy_conf(self): + fe = self._make_frontend(mode="filebeat", filebeat_config="host: %ip%:514") + self.assertTrue(fe.has_haproxy_conf) + + def test_mode_filebeat_without_ip_placeholder_no_haproxy_conf(self): + fe = self._make_frontend(mode="filebeat", filebeat_config="host: 127.0.0.1:514") + self.assertFalse(fe.has_haproxy_conf) + + + +class TestFrontendConfStructure(FrontendConfTestBase): + """Verifie la structure de base du bloc genere""" + + def test_http_mode_uses_frontend_keyword(self): + fe = self._make_frontend(mode="http", name="test-http-fe") + conf = self._generate(fe) + self.assertIn("frontend test-http-fe", conf) + self.assertNotIn("listen test-http-fe", conf) + + def test_tcp_mode_uses_frontend_keyword(self): + fe = self._make_frontend(mode="tcp", name="test-tcp-fe") + conf = self._generate(fe) + self.assertIn("frontend test-tcp-fe", conf) + + def test_log_mode_tcp_uses_listen_keyword(self): + """En mode log+tcp, HAProxy utilise un bloc 'listen' (pas 'frontend')""" + fe = self._make_frontend(mode="log", listening_mode="tcp", name="test-log-fe") + conf = self._generate(fe) + self.assertIn("listen test-log-fe", conf) + self.assertNotIn("frontend test-log-fe", conf) + + def test_enabled_frontend_has_enabled_directive(self): + fe = self._make_frontend(mode="http", enabled=True) + conf = self._generate(fe) + self.assertIn("enabled", conf) + self.assertNotIn("disabled", conf) + + def test_disabled_frontend_has_disabled_directive(self): + fe = self._make_frontend(mode="http", enabled=False) + conf = self._generate(fe) + self.assertIn("disabled", conf) + + def test_http_mode_directive_in_conf(self): + fe = self._make_frontend(mode="http") + conf = self._generate(fe) + self.assertIn("mode http", conf) + + def test_tcp_mode_directive_in_conf(self): + fe = self._make_frontend(mode="tcp") + conf = self._generate(fe) + self.assertIn("mode tcp", conf) + + def test_log_mode_tcp_uses_mode_tcp_in_conf(self): + fe = self._make_frontend(mode="log", listening_mode="tcp") + conf = self._generate(fe) + self.assertIn("mode tcp", conf) + + + +class TestFrontendTimeouts(FrontendConfTestBase): + """ + Timeouts + """ + def test_timeout_client_appears_in_conf(self): + fe = self._make_frontend(mode="http", timeout_client=90) + conf = self._generate(fe) + self.assertIn("timeout client 90s", conf) + + def test_timeout_keep_alive_appears_in_http_conf(self): + fe = self._make_frontend(mode="http", timeout_keep_alive=1000) + conf = self._generate(fe) + self.assertIn("timeout http-keep-alive 1000", conf) + + def test_timeout_keep_alive_absent_in_tcp_conf(self): + """Le timeout http-keep-alive n'a pas de sens en mode TCP""" + fe = self._make_frontend(mode="tcp", timeout_keep_alive=1000) + conf = self._generate(fe) + self.assertNotIn("timeout http-keep-alive", conf) + + def test_timeout_server_60s_present_in_log_mode(self): + """En mode log, HAProxy doit avoir un timeout server fixe""" + fe = self._make_frontend(mode="log", listening_mode="tcp") + conf = self._generate(fe) + self.assertIn("timeout server 60s", conf) + + + +class TestFrontendHttpsRedirect(FrontendConfTestBase): + """ + Bug historique : https_redirect n'etait rendu que dans le bloc {% if conf.workflows %}. + """ + + def test_https_redirect_true_adds_redirect_scheme(self): + fe = self._make_frontend(mode="http", https_redirect=True) + conf = self._generate(fe) + self.assertIn("redirect scheme https code 301", conf) + self.assertIn("!{ ssl_fc }", conf) + + def test_https_redirect_false_no_redirect(self): + fe = self._make_frontend(mode="http", https_redirect=False) + conf = self._generate(fe) + self.assertNotIn("redirect scheme https", conf) + + def test_https_redirect_irrelevant_in_tcp_mode(self): + """Le template ne genere pas de redirect en mode TCP""" + fe = self._make_frontend(mode="tcp", https_redirect=True) + conf = self._generate(fe) + self.assertNotIn("redirect scheme https", conf) + + + +class TestCustomHaproxyConf(FrontendConfTestBase): + """ + custom_haproxy_conf doit etre injecte verbatim. + Cas exotique : injection vide ne doit pas laisser d'artefact. + """ + + def test_custom_directives_injected_verbatim(self): + custom = "option http-server-close\nretries 3" + fe = self._make_frontend(mode="http", custom_haproxy_conf=custom) + conf = self._generate(fe) + self.assertIn("option http-server-close", conf) + self.assertIn("retries 3", conf) + + def test_single_custom_directive(self): + fe = self._make_frontend(mode="http", custom_haproxy_conf="option forwardfor") + conf = self._generate(fe) + self.assertIn("option forwardfor", conf) + + def test_empty_custom_conf_no_artifact(self): + """Pas de bloc vide parasite quand custom_haproxy_conf est ''""" + fe = self._make_frontend(mode="http", custom_haproxy_conf="") + conf = self._generate(fe) + # Pas trois newlines consecutifs (signe d'un bloc conditionnel mal rendu) + self.assertNotIn("\n\n\n", conf) + + def test_custom_conf_with_special_chars(self): + """Les caracteres speciaux ne doivent pas casser le rendu Jinja""" + custom = 'http-request set-header X-Real-IP "%[src]"' + fe = self._make_frontend(mode="http", custom_haproxy_conf=custom) + conf = self._generate(fe) + self.assertIn("X-Real-IP", conf) + + def test_custom_conf_multiline_preserved(self): + custom = "option1\noption2\noption3" + fe = self._make_frontend(mode="http", custom_haproxy_conf=custom) + conf = self._generate(fe) + self.assertIn("option1", conf) + self.assertIn("option2", conf) + self.assertIn("option3", conf) + + + +class TestFrontendLogging(FrontendConfTestBase): + """ + Logging + """ + def test_http_logging_enabled_adds_option_httplog(self): + fe = self._make_frontend(mode="http", enable_logging=True, log_level="info") + conf = self._generate(fe) + self.assertIn("option httplog", conf) + + def test_tcp_logging_enabled_adds_option_tcplog(self): + fe = self._make_frontend(mode="tcp", enable_logging=True) + conf = self._generate(fe) + self.assertIn("option tcplog", conf) + + def test_logging_disabled_adds_no_log(self): + fe = self._make_frontend(mode="http", enable_logging=False) + conf = self._generate(fe) + self.assertIn("no log", conf) + self.assertNotIn("option httplog", conf) + + def test_http_logging_includes_unix_socket_path(self): + """Le socket Unix de communication HAProxy -> rsyslog doit apparaitre""" + fe = self._make_frontend(mode="http", enable_logging=True) + conf = self._generate(fe) + expected_socket = fe.get_unix_socket() + self.assertIn(expected_socket, conf) + + def test_http_logging_includes_log_level(self): + fe = self._make_frontend(mode="http", enable_logging=True, log_level="debug") + conf = self._generate(fe) + self.assertIn("debug", conf) + + def test_http_logging_captures_user_agent(self): + """En mode HTTP avec logging, HAProxy doit capturer l'User-Agent""" + fe = self._make_frontend(mode="http", enable_logging=True) + conf = self._generate(fe) + self.assertIn("capture request header User-Agent", conf) + + def test_http_logging_log_format_is_json(self): + """Le log-format doit etre du JSON (pour etre consomme par rsyslog)""" + fe = self._make_frontend(mode="http", enable_logging=True) + conf = self._generate(fe) + self.assertIn('log-format "{ ', conf) + + def test_http_log_format_has_http_specific_fields(self): + """En HTTP, des champs specifiques HTTP doivent etre dans le log-format""" + fe = self._make_frontend(mode="http", enable_logging=True) + conf = self._generate(fe) + self.assertIn("http_method", conf) + self.assertIn("http_path", conf) + + def test_tcp_log_format_no_http_specific_fields(self): + """En TCP, les champs HTTP ne doivent PAS apparaitre dans le log-format""" + fe = self._make_frontend(mode="tcp", enable_logging=True) + conf = self._generate(fe) + self.assertNotIn("http_method", conf) + + + +class TestFrontendCache(FrontendConfTestBase): + """ + Cache HTTP + """ + + def test_cache_enabled_creates_cache_section(self): + fe = self._make_frontend( + mode="http", + enable_cache=True, + cache_total_max_size=16, + cache_max_age=120, + ) + conf = self._generate(fe) + self.assertIn(f"cache cache_{fe.id}", conf) + self.assertIn("total-max-size 16", conf) + self.assertIn("max-age 120", conf) + + def test_cache_enabled_adds_filter_and_http_directives(self): + fe = self._make_frontend(mode="http", enable_cache=True) + conf = self._generate(fe) + self.assertIn(f"filter cache cache_{fe.id}", conf) + self.assertIn(f"http-request cache-use cache_{fe.id}", conf) + self.assertIn(f"http-response cache-store cache_{fe.id}", conf) + + def test_cache_disabled_no_cache_section(self): + fe = self._make_frontend(mode="http", enable_cache=False) + conf = self._generate(fe) + self.assertNotIn("cache cache_", conf) + self.assertNotIn("filter cache", conf) + + def test_cache_only_in_http_mode(self): + """Le cache HAProxy n'existe qu'en mode HTTP""" + fe = self._make_frontend(mode="tcp", enable_cache=True) + conf = self._generate(fe) + # En mode TCP, le bloc cache ne doit pas apparaitre + self.assertNotIn("filter cache", conf) + + + +class TestFrontendCompression(FrontendConfTestBase): + """Compression HTTP""" + + def test_compression_enabled_adds_filter(self): + fe = self._make_frontend( + mode="http", + enable_compression=True, + compression_algos="gzip", + compression_mime_types="text/html,text/plain", + ) + conf = self._generate(fe) + self.assertIn("filter compression", conf) + self.assertIn("compression algo gzip", conf) + self.assertIn("compression type text/html,text/plain", conf) + + def test_compression_disabled_no_filter(self): + fe = self._make_frontend(mode="http", enable_compression=False) + conf = self._generate(fe) + self.assertNotIn("filter compression", conf) + + def test_compression_multiple_algos(self): + fe = self._make_frontend( + mode="http", + enable_compression=True, + compression_algos="gzip deflate", + ) + conf = self._generate(fe) + self.assertIn("compression algo gzip deflate", conf) + + def test_cache_and_compression_adds_htx(self): + """Cache + compression ensemble -> HAProxy HTX proxy doit etre active""" + fe = self._make_frontend( + mode="http", + enable_cache=True, + enable_compression=True, + ) + conf = self._generate(fe) + self.assertIn("option http-use-htx", conf) + + + +class TestFrontendLogModeHealthcheck(FrontendConfTestBase): + """Mode LOG + healthcheck_service """ + + def test_healthcheck_service_enabled_adds_tcp_check(self): + fe = self._make_frontend( + mode="log", + listening_mode="tcp", + healthcheck_service=True, + ) + conf = self._generate(fe) + self.assertIn("option tcp-check", conf) + self.assertIn("tcp-check connect linger", conf) + self.assertIn("tcp-request connection reject if { nbsrv() lt 1 }", conf) + + def test_healthcheck_service_disabled_no_tcp_check(self): + fe = self._make_frontend( + mode="log", + listening_mode="tcp", + healthcheck_service=False, + ) + conf = self._generate(fe) + self.assertNotIn("option tcp-check", conf) + + def test_healthcheck_adds_check_on_server_lines(self): + """Avec healthcheck, les lignes 'server' doivent inclure les directives check""" + from unittest.mock import MagicMock + listener_mock = MagicMock() + listener_mock.generate_conf.return_value = "bind 127.0.0.5:10001" + listener_mock.generate_server_conf.return_value = "127.0.0.4:10001" + + fe = self._make_frontend( + mode="log", + listening_mode="tcp", + healthcheck_service=True, + ) + conf = self._generate(fe, listener_list=[listener_mock]) + self.assertIn("check inter 5s", conf) + self.assertIn("observe layer4", conf) + + + +class TestFrontendXForwardedProto(FrontendConfTestBase): + """ + X-Forwarded-Proto (present par defaut en HTTP) + """ + + def test_http_mode_adds_x_forwarded_proto(self): + """En mode HTTP, HAProxy doit injecter X-Forwarded-Proto""" + fe = self._make_frontend(mode="http") + conf = self._generate(fe) + self.assertIn("X-Forwarded-Proto", conf) + self.assertIn("ssl_fc", conf) + + def test_tcp_mode_no_x_forwarded_proto(self): + fe = self._make_frontend(mode="tcp") + conf = self._generate(fe) + self.assertNotIn("X-Forwarded-Proto", conf) + + + +class TestFrontendWithListeners(FrontendConfTestBase): + """ + Teste generate_conf() quand des listeners reels sont injectes. + On mocke generate_conf() du listener pour rester unitaire. + """ + + def _make_listener_mock(self, addr_port="127.0.0.5:443", server_conf="127.0.0.4:10001"): + m = MagicMock() + m.generate_conf.return_value = f"bind {addr_port}" + m.generate_server_conf.return_value = server_conf + return m + + def test_single_listener_bind_appears_in_conf(self): + listener = self._make_listener_mock("127.0.0.5:443") + fe = self._make_frontend(mode="http") + conf = self._generate(fe, listener_list=[listener]) + self.assertIn("bind 127.0.0.5:443", conf) + + def test_multiple_listeners_all_appear(self): + l1 = self._make_listener_mock("127.0.0.5:80") + l2 = self._make_listener_mock("127.0.0.5:443") + fe = self._make_frontend(mode="http") + conf = self._generate(fe, listener_list=[l1, l2]) + self.assertIn("bind 127.0.0.5:80", conf) + self.assertIn("bind 127.0.0.5:443", conf) + + def test_no_listener_no_bind_line(self): + fe = self._make_frontend(mode="http") + conf = self._generate(fe, listener_list=[]) + self.assertNotIn("bind ", conf) + + def test_log_mode_generates_server_lines_from_listeners(self): + """En mode log, les listeners deviennent des lignes 'server'""" + listener = self._make_listener_mock(server_conf="127.0.0.4:10001") + fe = self._make_frontend(mode="log", listening_mode="tcp", name="log-fe") + conf = self._generate(fe, listener_list=[listener]) + self.assertIn("server server_log-fe-1 127.0.0.4:10001", conf) + + def test_log_mode_multiple_listeners_multiple_server_lines(self): + l1 = self._make_listener_mock(server_conf="127.0.0.4:10001") + l2 = self._make_listener_mock(server_conf="127.0.0.4:10002") + fe = self._make_frontend(mode="log", listening_mode="tcp", name="multi-log-fe") + conf = self._generate(fe, listener_list=[l1, l2]) + self.assertIn("server server_multi-log-fe-1 127.0.0.4:10001", conf) + self.assertIn("server server_multi-log-fe-2 127.0.0.4:10002", conf) + + + +class TestFrontendWithWorkflows(FrontendConfTestBase): + """ + Verifie la generation de la section WORKFLOWS dans la conf HAProxy + On patche workflow_set pour ne pas creer de vrais objets Workflow/Backend + """ + + def _make_workflow_mock(self, wf_id=1, name="test-wf", fqdn="app.example.com", + public_dir="/", mode="http", backend_name="my-backend", + timeout_connect=2000, timeout_server=30, + enable_cors_policy=False, cors_allowed_methods=None, + cors_allowed_origins="*", cors_allowed_headers="*", + cors_max_age=86400): + return { + "id": wf_id, + "name": name, + "fqdn": fqdn, + "public_dir": public_dir, + "mode": mode, + "backend_name": backend_name, + "timeout_connect": timeout_connect, + "timeout_server": timeout_server, + "enable_cors_policy": enable_cors_policy, + "cors_allowed_methods": cors_allowed_methods or ["GET", "POST"], + "cors_allowed_origins": cors_allowed_origins, + "cors_allowed_headers": cors_allowed_headers, + "cors_max_age": cors_max_age, + } + + def _generate_with_workflows(self, workflows, mode="http", **fe_kwargs): + fe = self._make_frontend(mode=mode, **fe_kwargs) + # Patch workflow_set pour retourner nos mocks + with patch.object(fe, "workflow_set") as mock_ws: + mock_ws.filter.return_value = [ + type("W", (), w)() for w in workflows + ] + return fe.generate_conf(listener_list=[], header_list=[]) + + def test_workflow_creates_acl_for_fqdn(self): + wf = self._make_workflow_mock(wf_id=42, fqdn="api.example.com", public_dir="/api") + conf = self._generate_with_workflows([wf]) + self.assertIn("acl workflow_42_host hdr(host) api.example.com", conf) + + def test_workflow_creates_acl_for_path(self): + wf = self._make_workflow_mock(wf_id=42, fqdn="app.example.com", public_dir="/admin") + conf = self._generate_with_workflows([wf]) + self.assertIn("acl workflow_42_dir path -i -m beg /admin", conf) + + def test_workflow_creates_use_backend_directive(self): + wf = self._make_workflow_mock(wf_id=99, fqdn="app.example.com", public_dir="/") + conf = self._generate_with_workflows([wf]) + self.assertIn("use_backend Workflow_99", conf) + + def test_workflow_creates_backend_section(self): + wf = self._make_workflow_mock(wf_id=7, backend_name="srv-backend", + timeout_connect=3000, timeout_server=60) + conf = self._generate_with_workflows([wf]) + self.assertIn("backend Workflow_7", conf) + self.assertIn("timeout connect 3000ms", conf) + self.assertIn("timeout server 60s", conf) + + def test_https_redirect_inside_workflows_block(self): + """https_redirect=True doit generer la redirection dans le bloc workflows""" + wf = self._make_workflow_mock(wf_id=1) + conf = self._generate_with_workflows([wf], https_redirect=True) + self.assertIn("redirect scheme https code 301", conf) + + def test_multiple_workflows_ordered_by_path_depth(self): + """ + Les workflows doivent etre tries par profondeur de chemin decroissante + pour que les regles plus specifiques soient evaluees en premier. + """ + wf_root = self._make_workflow_mock(wf_id=1, public_dir="/") + wf_deep = self._make_workflow_mock(wf_id=2, public_dir="/api/v1/users") + wf_mid = self._make_workflow_mock(wf_id=3, public_dir="/api") + conf = self._generate_with_workflows([wf_root, wf_deep, wf_mid]) + pos_root = conf.find("use_backend Workflow_1") + pos_mid = conf.find("use_backend Workflow_3") + pos_deep = conf.find("use_backend Workflow_2") + self.assertLess(pos_deep, pos_mid) + self.assertLess(pos_mid, pos_root) + + def test_cors_policy_enabled_adds_lua_cors(self): + wf = self._make_workflow_mock( + wf_id=5, + enable_cors_policy=True, + cors_allowed_methods=["GET", "POST", "OPTIONS"], + ) + conf = self._generate_with_workflows([wf]) + self.assertIn("http-request lua.cors", conf) + self.assertIn("http-response lua.cors", conf) + + def test_cors_policy_disabled_no_lua_cors(self): + wf = self._make_workflow_mock(wf_id=5, enable_cors_policy=False) + conf = self._generate_with_workflows([wf]) + self.assertNotIn("lua.cors", conf) + + + +class TestListenerGenerateConf(TestCase): + """ + Teste Listener.generate_conf() independamment du Frontend. + Cette methode produit la directive 'bind' d'HAProxy. + """ + + TEST_CASE_NAME = f"{__name__}_listener" + + def setUp(self): + self.node = Node.objects.create( + name=f"node_{self.TEST_CASE_NAME}", + management_ip="10.0.0.1", + internet_ip="1.2.3.4", + backends_outgoing_ip="10.0.0.1", + logom_outgoing_ip="10.0.0.1", + ) + self.nic = NetworkInterfaceCard.objects.create( + dev="vtnet0", + node=self.node, + ) + self.netaddr_v4 = NetworkAddress.objects.create( + name=f"addr_v4_{self.TEST_CASE_NAME}", + type="system", + ip="192.168.1.10", + prefix_or_netmask="24", + ) + NetworkAddressNIC.objects.create( + nic=self.nic, + network_address=self.netaddr_v4, + ) + self.tenants = Tenants.objects.create(name=f"tenants_{self.TEST_CASE_NAME}") + self.frontend = Frontend.objects.create( + name=f"fe_{self.TEST_CASE_NAME}", + mode="http", + tenants_config=self.tenants, + ) + + def test_listener_generate_conf_starts_with_bind(self): + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=80, + frontend=self.frontend, + ) + conf = listener.generate_conf() + self.assertTrue(conf.startswith("bind ")) + + def test_listener_generate_conf_uses_haproxy_jail_address_v4(self): + """ + HAProxy ecoute sur l'adresse de la jail HAProxy (127.0.0.5), pas + sur l'adresse reseau reelle - c'est le fonctionnement par conception. + """ + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=8080, + frontend=self.frontend, + ) + conf = listener.generate_conf() + # L'adresse HAProxy jail inet est 127.0.0.5 + self.assertIn("127.0.0.5", conf) + + def test_listener_generate_conf_includes_rsyslog_port(self): + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=443, + frontend=self.frontend, + ) + conf = listener.generate_conf() + self.assertIn(str(listener.rsyslog_port), conf) + + def test_listener_no_tls_profiles_no_ssl_directive(self): + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=80, + frontend=self.frontend, + ) + conf = listener.generate_conf() + self.assertFalse(listener.is_tls) + self.assertNotIn("ssl", conf) + + def test_listener_is_tls_false_without_profiles(self): + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=443, + frontend=self.frontend, + ) + self.assertFalse(listener.is_tls) + + def test_listener_generate_rsyslog_conf_uses_rsyslog_jail_address(self): + """ + generate_rsyslog_conf() doit utiliser l'adresse rsyslog jail (127.0.0.4). + """ + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=514, + frontend=self.frontend, + ) + conf = listener.generate_rsyslog_conf() + self.assertIn("127.0.0.4", conf) + self.assertIn(str(listener.rsyslog_port), conf) + + def test_listener_rsyslog_port_auto_increments(self): + """Chaque nouveau Listener doit obtenir un rsyslog_port unique""" + l1 = Listener.objects.create( + network_address=self.netaddr_v4, + port=80, + frontend=self.frontend, + ) + netaddr2 = NetworkAddress.objects.create( + name=f"addr_v4_2_{self.TEST_CASE_NAME}", + type="system", + ip="192.168.1.11", + prefix_or_netmask="24", + ) + l2 = Listener.objects.create( + network_address=netaddr2, + port=443, + frontend=self.frontend, + ) + self.assertNotEqual(l1.rsyslog_port, l2.rsyslog_port) + self.assertEqual(l2.rsyslog_port, l1.rsyslog_port + 1) + + def test_generate_server_conf_uses_rsyslog_jail_and_port(self): + listener = Listener.objects.create( + network_address=self.netaddr_v4, + port=514, + frontend=self.frontend, + ) + conf = listener.generate_server_conf() + self.assertIn("127.0.0.4", conf) + self.assertIn(str(listener.rsyslog_port), conf) diff --git a/vulture_os/gui/tests/test_logfwd_logom.py b/vulture_os/gui/tests/test_logfwd_logom.py new file mode 100644 index 000000000..c578eec66 --- /dev/null +++ b/vulture_os/gui/tests/test_logfwd_logom.py @@ -0,0 +1,732 @@ +#!/home/vlt-os/env/bin/python +"""This file is part of Vulture OS. + +Vulture OS is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +""" + +__author__ = "VultureProject contributors" +__credits__ = [] +__license__ = "GPLv3" +__version__ = "4.0.0" +__maintainer__ = "Vulture OS" +__email__ = "contact@vultureproject.org" + + +from django.test import TestCase +from unittest.mock import patch, MagicMock + +from applications.logfwd.models import LogOM, LogOMFWD, LogOMFile, LogOMElasticSearch, LogOMHIREDIS +from system.tenants.models import Tenants +from services.frontend.models import Frontend + + +# Utils +# def _make_tenants(suffix="default"): +# return Tenants.objects.get_or_create(name=f"tenants_logfwd_{suffix}")[0] + +# def _make_frontend(name, log_condition="", tenants=None, **kwargs): +# t = tenants or _make_tenants(name) +# return Frontend.objects.create( +# name=name, +# mode="log", +# listening_mode="file", +# tenants_config=t, +# log_condition=log_condition, +# **kwargs, +# ) + + +class TestLogOMFWDConfGeneration(TestCase): + TEST_CASE_NAME = f"{__name__}_fwd" + + def setUp(self): + patch("applications.logfwd.models.logger").start() + self.addCleanup(patch.stopall) + + def _fwd(self, **kwargs) -> LogOMFWD: + defaults = dict( + name=f"fwd_{id(kwargs)}", + target="10.0.0.10", + port=514, + protocol="tcp", + zip_level=0, + enabled=True, + ) + defaults.update(kwargs) + return LogOMFWD.objects.create(**defaults) + + def _conf(self, logom: LogOMFWD, ruleset="test_ruleset") -> str: + return LogOM.generate_conf(logom, ruleset, frontend="test_frontend") + + # Protocoles + def test_tcp_protocol_in_conf(self): + fwd = self._fwd(protocol="tcp") + conf = self._conf(fwd) + self.assertIn('Protocol="tcp"', conf) + + def test_udp_protocol_in_conf(self): + fwd = self._fwd(protocol="udp") + conf = self._conf(fwd) + self.assertIn('Protocol="udp"', conf) + + def test_target_appears_in_conf(self): + fwd = self._fwd(target="192.168.42.100") + conf = self._conf(fwd) + self.assertIn('Target="192.168.42.100"', conf) + + def test_port_appears_in_conf(self): + fwd = self._fwd(port=6514) + conf = self._conf(fwd) + self.assertIn('Port="6514"', conf) + + def test_action_type_is_omfwd(self): + fwd = self._fwd() + conf = self._conf(fwd) + self.assertIn('type="omfwd"', conf) + + + # Compression + def test_zip_level_zero_renders_as_zero(self): + fwd = self._fwd(zip_level=0) + conf = self._conf(fwd) + self.assertIn('ZipLevel="0"', conf) + + def test_zip_level_nonzero_renders_correctly(self): + fwd = self._fwd(zip_level=6) + conf = self._conf(fwd) + self.assertIn('ZipLevel="6"', conf) + + def test_zip_level_max_9(self): + fwd = self._fwd(zip_level=9) + conf = self._conf(fwd) + self.assertIn('ZipLevel="9"', conf) + + + # Rate limiting + def test_no_ratelimit_no_interval_directive(self): + fwd = self._fwd(ratelimit_interval=None, ratelimit_burst=None) + conf = self._conf(fwd) + self.assertNotIn("RateLimit.Interval", conf) + self.assertNotIn("RateLimit.Burst", conf) + + def test_ratelimit_interval_appears_when_set(self): + fwd = self._fwd(ratelimit_interval=60, ratelimit_burst=1000) + conf = self._conf(fwd) + self.assertIn('RateLimit.Interval="60"', conf) + self.assertIn('RateLimit.Burst="1000"', conf) + + # Send as raw + def test_send_as_raw_uses_raw_message_template(self): + fwd = self._fwd(send_as_raw=True) + conf = self._conf(fwd) + self.assertIn('Template="raw_message"', conf) + + def test_send_as_raw_false_uses_ruleset_template(self): + fwd = self._fwd(send_as_raw=False) + conf = self._conf(fwd, ruleset="my_ruleset") + self.assertIn('Template="my_ruleset"', conf) + + + # Queue + def test_queue_size_in_conf(self): + fwd = self._fwd(queue_size=5000) + conf = self._conf(fwd) + self.assertIn('queue.size="5000"', conf) + + def test_dequeue_size_in_conf(self): + fwd = self._fwd(dequeue_size=150) + conf = self._conf(fwd) + self.assertIn('queue.dequeuebatchsize="150"', conf) + + # Retry + DA queue + def test_no_retry_no_resume_retry_count(self): + fwd = self._fwd(enable_retry=False) + conf = self._conf(fwd) + self.assertNotIn("action.ResumeRetryCount", conf) + + def test_retry_enabled_adds_resume_retry_count(self): + fwd = self._fwd(enable_retry=True) + conf = self._conf(fwd) + self.assertIn('action.ResumeRetryCount = "-1"', conf) + + def test_disk_assist_requires_retry(self): + fwd = self._fwd(enable_retry=False, enable_disk_assist=True) + conf = self._conf(fwd) + self.assertNotIn("queue.highWatermark", conf) + + def test_disk_assist_with_retry_adds_watermarks(self): + fwd = self._fwd( + enable_retry=True, + enable_disk_assist=True, + high_watermark=9000, + low_watermark=7000, + spool_directory="/var/spool/rsyslog", + ) + conf = self._conf(fwd) + self.assertIn('queue.highWatermark="9000"', conf) + self.assertIn('queue.lowWatermark="7000"', conf) + self.assertIn('queue.spoolDirectory="/var/spool/rsyslog"', conf) + self.assertIn('queue.saveOnShutdown="on"', conf) + + def test_output_name_contains_frontend_name(self): + fwd = self._fwd(name="my-forwarder") + conf = self._conf(fwd, ruleset="r") + self.assertIn("my-forwarder_test_frontend", conf) + + + +class TestLogOMFileConfGeneration(TestCase): + """ + Template dynamique dans le nom de fichier via Django Template + """ + def setUp(self): + patch("applications.logfwd.models.logger").start() + patch("applications.logfwd.models.Frontend.objects.filter", return_value=[]).start() + self.addCleanup(patch.stopall) + + def _omfile(self, **kwargs) -> LogOMFile: + defaults = dict( + name=f"file_{id(kwargs)}", + file="/var/log/vulture/{{ruleset}}.log", + flush_interval=1, + async_writing=True, + retention_time=30, + rotation_period="daily", + enabled=True, + ) + defaults.update(kwargs) + return LogOMFile.objects.create(**defaults) + + def _conf(self, logom: LogOMFile, ruleset="haproxy") -> str: + return LogOM.generate_conf(logom, ruleset, frontend="fe") + + def test_action_type_is_omfile(self): + f = self._omfile() + conf = self._conf(f) + self.assertIn('type="omfile"', conf) + + def test_async_writing_on(self): + f = self._omfile(async_writing=True) + conf = self._conf(f) + self.assertIn("asyncWriting", conf) + + def test_async_writing_off(self): + f = self._omfile(async_writing=False) + conf = self._conf(f) + # "off" doit apparaître pour asyncWriting + self.assertIn('"off"', conf) + + def test_template_id_is_sha256_of_name_and_ruleset(self): + import hashlib + f = self._omfile(name="deterministic-file") + ruleset = "my_ruleset" + expected = hashlib.sha256( + (ruleset + "deterministic-file").encode("utf-8") + ).hexdigest() + self.assertEqual(f.template_id(ruleset=ruleset), expected) + + def test_ruleset_interpolated_in_filename(self): + f = self._omfile(file="/var/log/{{ruleset}}.log") + conf = self._conf(f, ruleset="nginx") + self.assertIn("/var/log/nginx.log", conf) + + def test_get_rsyslog_template_empty_when_no_frontends(self): + f = self._omfile(name="orphan-file") + self.assertEqual(f.get_rsyslog_template(), "") + + def test_flush_interval_in_conf(self): + f = self._omfile(flush_interval=5) + conf = self._conf(f) + self.assertIn('"5"', conf) + + + +class TestLogOMHIREDISConfGeneration(TestCase): + """ + LogOMHIREDIS (omhiredis) -> om_hiredis.tpl + Modes : queue, set, publish, stream. Cle dynamique. TLS. + """ + + def setUp(self): + patch("applications.logfwd.models.logger").start() + patch("applications.logfwd.models.Frontend.objects.filter", return_value=[]).start() + self.addCleanup(patch.stopall) + + def _redis(self, **kwargs) -> LogOMHIREDIS: + defaults = dict( + name=f"redis_{id(kwargs)}", + target="10.0.0.3", + port=6379, + mode="queue", + key="vulture-logs", + dynamic_key=False, + pwd=None, + enabled=True, + ) + defaults.update(kwargs) + return LogOMHIREDIS.objects.create(**defaults) + + def _conf(self, logom: LogOMHIREDIS, ruleset="haproxy") -> str: + return LogOM.generate_conf(logom, ruleset, frontend="fe") + + def test_action_type_is_omhiredis(self): + r = self._redis() + conf = self._conf(r) + self.assertIn('type="omhiredis"', conf) + + def test_server_and_port_in_conf(self): + r = self._redis(target="172.16.0.1", port=6380) + conf = self._conf(r) + self.assertIn('server="172.16.0.1"', conf) + self.assertIn('serverport="6380"', conf) + + def test_queue_mode_in_conf(self): + r = self._redis(mode="queue") + conf = self._conf(r) + self.assertIn('mode="queue"', conf) + + def test_set_mode_in_conf(self): + r = self._redis(mode="set") + conf = self._conf(r) + self.assertIn('mode="set"', conf) + + def test_publish_mode_in_conf(self): + r = self._redis(mode="publish") + conf = self._conf(r) + self.assertIn('mode="publish"', conf) + + def test_static_key_uses_key_directive(self): + r = self._redis(key="my-static-key", dynamic_key=False) + conf = self._conf(r) + self.assertIn('key="my-static-key"', conf) + self.assertNotIn("DynaKey", conf) + + def test_dynamic_key_uses_dynakey_and_template_id(self): + r = self._redis(key="{{ruleset}}-logs", dynamic_key=True) + conf = self._conf(r, ruleset="nginx") + self.assertIn('DynaKey="on"', conf) + # La cle dans la conf est le template_id, pas la valeur brute + self.assertIn(r.template_id(), conf) + + def test_password_included_when_set(self): + r = self._redis(pwd="s3cr3t!") + conf = self._conf(r) + self.assertIn('ServerPassword="s3cr3t!"', conf) + + def test_no_password_no_server_password_directive(self): + r = self._redis(pwd=None) + conf = self._conf(r) + self.assertNotIn("ServerPassword", conf) + + def test_queue_mode_rpush_option(self): + r = self._redis(mode="queue", use_rpush=True) + conf = self._conf(r) + self.assertIn('Userpush="on"', conf) + + def test_queue_mode_lpush_default(self): + r = self._redis(mode="queue", use_rpush=False) + conf = self._conf(r) + self.assertIn('Userpush="off"', conf) + + def test_set_mode_with_expire_key(self): + r = self._redis(mode="set", expire_key=3600) + conf = self._conf(r) + self.assertIn('Expiration="3600"', conf) + + def test_get_rsyslog_template_with_dynamic_key(self): + r = self._redis(name="dyn-redis", dynamic_key=True, key="{{ruleset}}") + # Simule un frontend associe + with patch("applications.logfwd.models.Frontend.objects.filter") as mock_filter: + mock_qs = MagicMock() + mock_qs.exists.return_value = True + mock_filter.return_value = mock_qs + tpl = r.get_rsyslog_template() + self.assertIn(r.template_id(), tpl) + self.assertIn("template(", tpl) + + def test_get_rsyslog_template_empty_without_dynamic_key(self): + r = self._redis(dynamic_key=False) + tpl = r.get_rsyslog_template() + self.assertEqual(tpl, "") + + +# ───────────────────────────────────────────────────────────────────────────── + +class TestLogOMElasticSearchConfGeneration(TestCase): + """ + LogOMElasticSearch (omelasticsearch) -> om_elasticsearch.tpl + Serveurs multiples, index pattern, auth, TLS. + """ + + def setUp(self): + patch("applications.logfwd.models.logger").start() + self.addCleanup(patch.stopall) + + def _els(self, **kwargs) -> LogOMElasticSearch: + defaults = dict( + name=f"els_{id(kwargs)}", + servers='["https://els-1:9200"]', + index_pattern=f"mylog-{id(kwargs)}-%$!timestamp:1:10%", + uid=None, + pwd=None, + enabled=True, + ) + defaults.update(kwargs) + return LogOMElasticSearch.objects.create(**defaults) + + def _conf(self, logom: LogOMElasticSearch, ruleset="haproxy") -> str: + return LogOM.generate_conf(logom, ruleset, frontend="fe") + + def test_action_type_is_omelasticsearch(self): + e = self._els() + conf = self._conf(e) + self.assertIn('type="omelasticsearch"', conf) + + def test_single_server_in_conf(self): + e = self._els(servers='["https://els-prod:9200"]') + conf = self._conf(e) + self.assertIn("els-prod", conf) + + def test_index_pattern_in_conf(self): + e = self._els(index_pattern="vulture-logs-%$!timestamp:1:10%") + conf = self._conf(e) + self.assertIn("vulture-logs", conf) + + def test_uid_and_pwd_when_set(self): + e = self._els(uid="elastic", pwd="changeme") + conf = self._conf(e) + self.assertIn("elastic", conf) + self.assertIn("changeme", conf) + + def test_no_uid_no_auth_directives(self): + e = self._els(uid=None, pwd=None) + conf = self._conf(e) + # Les directives d'auth ne doivent pas apparaitre + self.assertNotIn("uid=", conf) + + def test_template_property_returns_correct_template(self): + e = self._els() + self.assertEqual(e.template, "om_elasticsearch.tpl") + + def test_template_id_is_sha256_of_name(self): + import hashlib + e = self._els(name="stable-els-name") + expected = hashlib.sha256("stable-els-name".encode("utf-8")).hexdigest() + self.assertEqual(e.template_id(), expected) + + +class TestLogOMRenameLogConditionPropagation(TestCase): + """ + Bug historique documente dans le changelog : + quand un LogOM est renomme via logfwd_edit(), + tous les Frontends qui l'utilisent dans log_condition doivent etre mis à jour + + Pattern dans le code (logfwd/views.py) : + frontend.log_condition = frontend.log_condition.replace( + f"{{{{{log_om_old_name}}}}}", f"{{{{{log_om.name}}}}}" + ) + + Ces tests valident la logique de remplacement directement + sur les instances de modeles + """ + + TEST_CASE_NAME = f"{__name__}_rename" + + def setUp(self): + patch("applications.logfwd.models.logger").start() + patch("services.frontend.models.logger").start() + self.addCleanup(patch.stopall) + self.tenants = Tenants.objects.create(name=f"t_{self.TEST_CASE_NAME}") + + def _fe(self, name, log_condition="", **kwargs): + return Frontend.objects.create( + name=name, + mode="log", + listening_mode="file", + tenants_config=self.tenants, + log_condition=log_condition, + **kwargs, + ) + + def _simulate_rename(self, old_name: str, new_name: str): + """ + Reproduit exactement la logique de propagation de logfwd_edit() : + pour chaque frontend qui reference old_name dans log_condition, + applique le replace + """ + from services.frontend.models import Frontend as FE + frontends = FE.objects.filter(log_condition__contains=f"{{{{{old_name}}}}}") + for fe in frontends: + fe.log_condition = fe.log_condition.replace( + f"{{{{{old_name}}}}}", + f"{{{{{new_name}}}}}", + ) + fe.save() + + + # Cas nominaux + def test_rename_updates_single_frontend(self): + fe = self._fe("fe-a", log_condition="if {{old-fwd}} then action") + self._simulate_rename("old-fwd", "new-fwd") + fe.refresh_from_db() + self.assertNotIn("{{old-fwd}}", fe.log_condition) + self.assertIn("{{new-fwd}}", fe.log_condition) + + def test_rename_updates_all_referencing_frontends(self): + """Plusieurs frontends referencant le meme forwarder doivent tous etre mis a jour""" + frontends = [ + self._fe(f"fe-multi-{i}", log_condition=f"{{{{shared-fwd}}}} action{i}") + for i in range(5) + ] + self._simulate_rename("shared-fwd", "renamed-fwd") + for fe in frontends: + fe.refresh_from_db() + self.assertNotIn("{{shared-fwd}}", fe.log_condition) + self.assertIn("{{renamed-fwd}}", fe.log_condition) + + def test_rename_does_not_affect_unrelated_frontends(self): + """Un frontend sans reference a LogOM renomme ne doit pas changer.""" + unrelated = self._fe("fe-unrelated", log_condition="{{other-fwd}} something") + self._simulate_rename("target-fwd", "new-fwd") + unrelated.refresh_from_db() + self.assertEqual(unrelated.log_condition, "{{other-fwd}} something") + + def test_no_rename_no_change(self): + """old_name == new_name -> aucune modification.""" + original = "if {{stable-fwd}} then action" + fe = self._fe("fe-stable", log_condition=original) + self._simulate_rename("stable-fwd", "stable-fwd") + fe.refresh_from_db() + self.assertEqual(fe.log_condition, original) + + # Cas exotiques + def test_multiple_occurrences_in_same_condition_all_replaced(self): + """Si le nom apparait plusieurs fois dans log_condition, tous sont remplaces""" + fe = self._fe( + "fe-multi-ref", + log_condition="{{multi}} OR {{multi}} as backup" + ) + self._simulate_rename("multi", "replaced") + fe.refresh_from_db() + self.assertNotIn("{{multi}}", fe.log_condition) + self.assertEqual(fe.log_condition.count("{{replaced}}"), 2) + + def test_rename_with_hyphens_in_name(self): + fe = self._fe("fe-hyphens", log_condition="filter {{fwd-v2-prod}} active") + self._simulate_rename("fwd-v2-prod", "fwd-v3-prod") + fe.refresh_from_db() + self.assertIn("{{fwd-v3-prod}}", fe.log_condition) + + def test_rename_with_underscores_in_name(self): + fe = self._fe("fe-underscores", log_condition="{{fwd_prod_01}} forward") + self._simulate_rename("fwd_prod_01", "fwd_prod_02") + fe.refresh_from_db() + self.assertIn("{{fwd_prod_02}}", fe.log_condition) + + def test_rename_with_numbers_in_name(self): + fe = self._fe("fe-numbers", log_condition="{{logom123}} action") + self._simulate_rename("logom123", "logom456") + fe.refresh_from_db() + self.assertIn("{{logom456}}", fe.log_condition) + + def test_rename_does_not_match_partial_names(self): + """ + '{{fwd}}' ne doit PAS etre remplace si on renomme '{{fwd-extended}}' + Les doubles accolades delimitent exactement le nom + """ + fe = self._fe("fe-partial", log_condition="{{fwd}} is not {{fwd-extended}}") + self._simulate_rename("fwd-extended", "fwd-renamed") + fe.refresh_from_db() + # {{fwd}} ne doit pas avoir change + self.assertIn("{{fwd}}", fe.log_condition) + self.assertIn("{{fwd-renamed}}", fe.log_condition) + + def test_rename_multiline_log_condition(self): + """log_condition peut etre multiligne""" + condition = "if {{alpha-fwd}} then\n call action1\nif {{alpha-fwd}} then\n call action2" + fe = self._fe("fe-multiline", log_condition=condition) + self._simulate_rename("alpha-fwd", "beta-fwd") + fe.refresh_from_db() + self.assertNotIn("{{alpha-fwd}}", fe.log_condition) + self.assertEqual(fe.log_condition.count("{{beta-fwd}}"), 2) + + def test_empty_log_condition_not_affected(self): + """Un log_condition vide ne doit pas lever d'exception""" + fe = self._fe("fe-empty-cond", log_condition="") + self._simulate_rename("any-fwd", "new-fwd") + fe.refresh_from_db() + self.assertEqual(fe.log_condition, "") + + def test_log_condition_with_no_pattern_not_affected(self): + """log_condition sans aucun {{}} ne doit pas être modifie""" + original = "plain text condition no forwarder" + fe = self._fe("fe-no-pattern", log_condition=original) + self._simulate_rename("some-fwd", "other-fwd") + fe.refresh_from_db() + self.assertEqual(fe.log_condition, original) + + def test_log_condition_with_multiple_different_forwarders(self): + """Seul le forwarder renomme doit etre modifie, pas les autres""" + fe = self._fe( + "fe-mixed", + log_condition="{{fwd-a}} and {{fwd-b}} and {{fwd-a}}" + ) + self._simulate_rename("fwd-a", "fwd-alpha") + fe.refresh_from_db() + self.assertNotIn("{{fwd-a}}", fe.log_condition) + self.assertIn("{{fwd-b}}", fe.log_condition) # inchange + self.assertEqual(fe.log_condition.count("{{fwd-alpha}}"), 2) + + + +class TestRenderLogCondition(TestCase): + """ + Frontend.render_log_condition() resout les tokens {{name}} en conf rsyslog reelle + C'est la methode qui construit la section rsyslog a partir du log_condition du frontend + """ + + TEST_CASE_NAME = f"{__name__}_render" + + def setUp(self): + patch("applications.logfwd.models.logger").start() + patch("services.frontend.models.logger").start() + patch("services.frontend.models.Cluster.get_current_node").start() + patch("services.frontend.models.Cluster.get_global_config").start() + self.addCleanup(patch.stopall) + self.tenants = Tenants.objects.create(name=f"t_{self.TEST_CASE_NAME}") + + def _fe(self, name, log_condition="", **kwargs): + return Frontend.objects.create( + name=name, + mode="log", + listening_mode="file", + tenants_config=self.tenants, + log_condition=log_condition, + enable_logging=True, + **kwargs, + ) + + def test_empty_log_condition_returns_newline(self): + """log_condition="" -> resultat vide (juste un newline final)""" + fe = self._fe("fe-empty-render") + result = fe.render_log_condition() + self.assertEqual(result.strip(), "") + + def test_log_condition_without_double_braces_passed_through(self): + """Lignes sans {{ }} sont ignorees (ne contiennent pas de forwarder)""" + fe = self._fe("fe-no-braces", log_condition="plain log condition\nno forwarder") + result = fe.render_log_condition() + # Pas d'erreur, resultat passe tel quel + self.assertIsInstance(result, str) + + def test_log_condition_with_enabled_forwarder_resolves(self): + """ + {{fwd-name}} doit etre remplace par la conf rsyslog du forwarder correspondant + """ + fwd = LogOMFWD.objects.create( + name=f"render-test-fwd-{self.TEST_CASE_NAME}", + target="10.0.0.99", + port=514, + protocol="tcp", + enabled=True, + ) + fe = self._fe( + "fe-render-resolved", + log_condition=f"if 1==1 then {{{{{fwd.name}}}}}", + ) + fe.log_forwarders.add(fwd) + + result = fe.render_log_condition() + # La conf doit contenir quelque chose de rsyslog (type omfwd) + self.assertIn('type="omfwd"', result) + + def test_log_condition_disabled_forwarder_excluded(self): + """ + Un LogOM desactive (enabled=False) ne doit pas etre rendu + dans le log_condition final + """ + fwd = LogOMFWD.objects.create( + name=f"disabled-fwd-{self.TEST_CASE_NAME}", + target="10.0.0.50", + port=514, + protocol="tcp", + enabled=False, + ) + fe = self._fe( + "fe-disabled-fwd", + log_condition=f"if 1==1 then {{{{{fwd.name}}}}}", + ) + fe.log_forwarders.add(fwd) + + result = fe.render_log_condition() + # Le forwarder desactive ne doit pas generer d'action omfwd + self.assertNotIn('type="omfwd"', result) + + + +class TestLogOMSelectMethods(TestCase): + """ + LogOM.select_log_om() et select_log_om_by_name() doivent retrouver + la sous-classe a partir de l'ID ou du nom de la base abstraite + """ + + def setUp(self): + patch("applications.logfwd.models.logger").start() + self.addCleanup(patch.stopall) + + def test_select_log_om_by_id_returns_correct_subclass_fwd(self): + fwd = LogOMFWD.objects.create( + name="sel-fwd", target="1.2.3.4", port=514, protocol="tcp" + ) + result = LogOM().select_log_om(fwd.pk) + self.assertIsInstance(result, LogOMFWD) + self.assertEqual(result.pk, fwd.pk) + + def test_select_log_om_by_id_returns_correct_subclass_file(self): + f = LogOMFile.objects.create( + name="sel-file", file="/var/log/test.log" + ) + result = LogOM().select_log_om(f.pk) + self.assertIsInstance(result, LogOMFile) + self.assertEqual(result.pk, f.pk) + + def test_select_log_om_by_id_returns_correct_subclass_hiredis(self): + r = LogOMHIREDIS.objects.create( + name="sel-redis", target="127.0.0.1", port=6379, mode="queue", key="k" + ) + result = LogOM().select_log_om(r.pk) + self.assertIsInstance(result, LogOMHIREDIS) + + def test_select_log_om_nonexistent_raises(self): + from django.core.exceptions import ObjectDoesNotExist + with self.assertRaises(ObjectDoesNotExist): + LogOM().select_log_om(99999999) + + def test_select_log_om_by_name_returns_correct_instance(self): + _ = LogOMFWD.objects.create( + name="by-name-fwd", target="1.2.3.4", port=514, protocol="tcp" + ) + result = LogOM().select_log_om_by_name("by-name-fwd") + self.assertIsInstance(result, LogOMFWD) + self.assertEqual(result.name, "by-name-fwd") + + def test_select_log_om_by_name_nonexistent_raises(self): + from django.core.exceptions import ObjectDoesNotExist + with self.assertRaises(ObjectDoesNotExist): + LogOM().select_log_om_by_name("this-name-does-not-exist-xyz") + + def test_template_id_is_sha256_of_name(self): + import hashlib + fwd = LogOMFWD.objects.create( + name="deterministic", target="1.2.3.4", port=514, protocol="tcp" + ) + expected = hashlib.sha256(b"deterministic").hexdigest() + self.assertEqual(fwd.template_id(), expected) + + def test_template_id_different_names_different_hashes(self): + fwd1 = LogOMFWD.objects.create(name="name-a", target="1.2.3.4", port=514, protocol="tcp") + fwd2 = LogOMFWD.objects.create(name="name-b", target="1.2.3.4", port=515, protocol="tcp") + self.assertNotEqual(fwd1.template_id(), fwd2.template_id()) diff --git a/vulture_os/system/tests/test_cluster_messagequeue.py b/vulture_os/system/tests/test_cluster_messagequeue.py new file mode 100644 index 000000000..492d2d2f3 --- /dev/null +++ b/vulture_os/system/tests/test_cluster_messagequeue.py @@ -0,0 +1,738 @@ +#!/home/vlt-os/env/bin/python +"""This file is part of Vulture OS. + +Vulture OS is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. +""" + +__author__ = "VultureProject contributors" +__credits__ = [] +__license__ = "GPLv3" +__version__ = "4.0.0" +__maintainer__ = "Vulture OS" +__email__ = "contact@vultureproject.org" + + +from django.test import TestCase, override_settings +from django.utils import timezone +from unittest.mock import patch +from datetime import timedelta +from time import sleep + +from system.cluster.models import Cluster, Node, MessageQueue + + + +def _make_node(name, management_ip="10.0.0.1", internet_ip="1.2.3.4", + backends_outgoing_ip="10.0.0.1", logom_outgoing_ip="10.0.0.1") -> Node: + return Node.objects.create( + name=name, + management_ip=management_ip, + internet_ip=internet_ip, + backends_outgoing_ip=backends_outgoing_ip, + logom_outgoing_ip=logom_outgoing_ip, + ) + + +def _make_pending_node(name) -> Node: + """Un noeud 'en attente de bootstrap' : management_ip vide -> ignore par api_request""" + return Node.objects.create( + name=name, + management_ip="", + internet_ip="1.2.3.4", + backends_outgoing_ip="1.2.3.4", + logom_outgoing_ip="1.2.3.4", + ) + + +ACTION_RSYSLOG = "services.rsyslogd.rsyslog.restart_service" +ACTION_HAPROXY = "services.haproxy.haproxy.reload_service" +ACTION_PF = "services.pf.pf.reload_service" +ACTION_BUILD = "services.haproxy.haproxy.build_conf" + + + +class TestClusterApiRequest(TestCase): + """ + Cluster.api_request() dispatche une action vers tous les noeuds (ou un seul). + Verifie la creation de MessageQueue, les retours de statut, et le filtrage + des noeuds pending (management_ip=''). + """ + + TEST_CASE_NAME = f"{__name__}_cluster" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + self.node1 = _make_node(f"node1_{self.TEST_CASE_NAME}", "10.0.0.1") + self.node2 = _make_node(f"node2_{self.TEST_CASE_NAME}", "10.0.0.2") + + def tearDown(self): + MessageQueue.objects.all().delete() + + # Dispatch broadcast + def test_broadcast_creates_one_mq_per_node(self): + Cluster.api_request(ACTION_RSYSLOG) + self.assertEqual(MessageQueue.objects.count(), 2) + + def test_broadcast_targets_both_nodes(self): + Cluster.api_request(ACTION_RSYSLOG) + nodes_called = set(MessageQueue.objects.values_list("node_id", flat=True)) + self.assertIn(self.node1.pk, nodes_called) + self.assertIn(self.node2.pk, nodes_called) + + def test_returns_status_true_on_success(self): + result = Cluster.api_request(ACTION_RSYSLOG) + self.assertTrue(result["status"]) + + def test_returns_instances_list(self): + result = Cluster.api_request(ACTION_RSYSLOG) + self.assertIn("instances", result) + self.assertEqual(len(result["instances"]), 2) + + def test_action_and_config_stored_in_mq(self): + Cluster.api_request(ACTION_RSYSLOG, config="some_config_value") + mq = MessageQueue.objects.filter(node=self.node1).first() + self.assertEqual(mq.action, ACTION_RSYSLOG) + self.assertEqual(mq.config, "some_config_value") + + def test_new_mq_has_status_new(self): + Cluster.api_request(ACTION_RSYSLOG) + for mq in MessageQueue.objects.all(): + self.assertEqual(mq.status, MessageQueue.MessageQueueStatus.NEW) + + # Dispatch cible (node=...) + def test_targeted_to_single_node_creates_one_mq(self): + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + self.assertEqual(MessageQueue.objects.count(), 1) + self.assertEqual(MessageQueue.objects.first().node, self.node1) + + def test_targeted_does_not_create_mq_for_other_node(self): + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + self.assertEqual(MessageQueue.objects.filter(node=self.node2).count(), 0) + + # Noeud pending (management_ip='') + def test_pending_node_excluded_from_broadcast(self): + """ + Un noeud avec management_ip='' est en cours de bootstrap. + Il ne doit JAMAIS recevoir d'api_request. + """ + pending = _make_pending_node(f"pending_{self.TEST_CASE_NAME}") + Cluster.api_request(ACTION_RSYSLOG) + mq_nodes = set(MessageQueue.objects.values_list("node__name", flat=True)) + self.assertNotIn(pending.name, mq_nodes) + # Les deux noeuds normaux sont bien la + self.assertIn(self.node1.name, mq_nodes) + self.assertIn(self.node2.name, mq_nodes) + + def test_three_pending_nodes_broadcast_only_hits_normal_nodes(self): + for i in range(3): + _make_pending_node(f"pending{i}_{self.TEST_CASE_NAME}") + Cluster.api_request(ACTION_HAPROXY) + # Seulement les 2 noeuds normaux doivent avoir un MQ + self.assertEqual(MessageQueue.objects.count(), 2) + + # Deduplication (update_or_create) + def test_same_action_same_node_deduplicates_mq(self): + """ + Deux appels successifs avec la meme action sur le meme noeud en statut NEW + doivent fusionner en un seul MessageQueue (update_or_create). + """ + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + self.assertEqual(MessageQueue.objects.filter( + node=self.node1, action=ACTION_RSYSLOG + ).count(), 1) + + def test_completed_mq_creates_new_one(self): + """ + Un MQ en statut DONE doit creer un nouveau MQ (pas de deduplication) + """ + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + mq = MessageQueue.objects.get(node=self.node1) + mq.status = MessageQueue.MessageQueueStatus.DONE + mq.save() + + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + self.assertEqual(MessageQueue.objects.filter( + node=self.node1, action=ACTION_RSYSLOG + ).count(), 2) + + def test_failed_mq_creates_new_one(self): + """Un MQ FAILURE doit generer un nouveau MQ pour retry""" + Cluster.api_request(ACTION_HAPROXY, node=self.node1) + mq = MessageQueue.objects.get(node=self.node1) + mq.status = MessageQueue.MessageQueueStatus.FAILURE + mq.save() + + Cluster.api_request(ACTION_HAPROXY, node=self.node1) + self.assertEqual(MessageQueue.objects.filter( + node=self.node1, action=ACTION_HAPROXY + ).count(), 2) + + def test_running_mq_deduplicates(self): + """Un MQ RUNNING en cours d'execution ne doit PAS etre duplique""" + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + mq = MessageQueue.objects.get(node=self.node1) + mq.status = MessageQueue.MessageQueueStatus.RUNNING + mq.save() + + # En statut RUNNING, le MQ n'est plus NEW -> update_or_create cree un nouveau + # (comportement reel du code : only NEW est deduplique) + count_before = MessageQueue.objects.filter(node=self.node1).count() + Cluster.api_request(ACTION_RSYSLOG, node=self.node1) + count_after = MessageQueue.objects.filter(node=self.node1).count() + # Le comportement attendu est qu'un nouveau MQ est cree + self.assertGreaterEqual(count_after, count_before) + + + +class TestNodeApiRequest(TestCase): + """ + Node.api_request() delegue a Cluster.api_request() avec node=self. + Verifie notamment le scheduling via run_delay et get_pending_messages(). + """ + + TEST_CASE_NAME = f"{__name__}_node" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + self.node1 = _make_node(f"n1_{self.TEST_CASE_NAME}", "10.0.0.1") + self.node2 = _make_node(f"n2_{self.TEST_CASE_NAME}", "10.0.0.2") + + def tearDown(self): + MessageQueue.objects.all().delete() + + # Delegation de base + def test_node_api_request_creates_mq_for_correct_node(self): + self.node1.api_request(ACTION_RSYSLOG) + mq = MessageQueue.objects.get() + self.assertEqual(mq.node, self.node1) + + def test_node_api_request_does_not_create_mq_for_other_node(self): + self.node1.api_request(ACTION_HAPROXY) + self.assertEqual(MessageQueue.objects.filter(node=self.node2).count(), 0) + + def test_node_api_request_with_config(self): + self.node1.api_request(ACTION_BUILD, config="42") + mq = MessageQueue.objects.get() + self.assertEqual(mq.config, "42") + + def test_node_api_request_returns_status_true(self): + result = self.node1.api_request(ACTION_RSYSLOG) + self.assertTrue(result["status"]) + + # run_delay : scheduling dans le futur + def test_run_delay_schedules_mq_in_future(self): + self.node1.api_request(ACTION_RSYSLOG, run_delay=30) + mq = MessageQueue.objects.get() + self.assertGreater(mq.run_at, timezone.now()) + self.assertAlmostEqual( + mq.run_at, + timezone.now() + timedelta(seconds=30), + delta=timedelta(seconds=2) + ) + + def test_run_delay_zero_schedules_immediately(self): + self.node1.api_request(ACTION_RSYSLOG, run_delay=0) + mq = MessageQueue.objects.get() + self.assertAlmostEqual(mq.run_at, timezone.now(), delta=timedelta(seconds=2)) + + def test_delayed_mq_not_in_get_pending_messages(self): + """ + Un MQ planifie dans le futur ne doit pas apparaitre dans + get_pending_messages() (qui ne retourne que les MQ prets) + """ + self.node1.api_request(ACTION_RSYSLOG, run_delay=300) + pending = list(self.node1.get_pending_messages()) + self.assertEqual(len(pending), 0) + + def test_immediate_mq_in_get_pending_messages(self): + self.node1.api_request(ACTION_RSYSLOG, run_delay=0) + pending = list(self.node1.get_pending_messages()) + self.assertEqual(len(pending), 1) + + def test_delayed_then_updated_updates_run_at(self): + """ + Rescheduler un MQ existant (meme action, meme noeud) met a jour run_at. + """ + creation_time = timezone.now() + self.node1.api_request(ACTION_RSYSLOG, run_delay=60) + sleep(1) # Pour que modified soit different + self.node1.api_request(ACTION_RSYSLOG, run_delay=60) + + mq = MessageQueue.objects.get(node=self.node1, action=ACTION_RSYSLOG) + # date_add ne doit pas changer (c'est la creation initiale) + self.assertAlmostEqual(mq.date_add, creation_time, delta=timedelta(seconds=2)) + # run_at doit etre recalcule par rapport au dernier appel + self.assertAlmostEqual( + mq.run_at, + timezone.now() + timedelta(seconds=60), + delta=timedelta(seconds=2) + ) + + # get_pending_messages() avec count + def test_get_pending_messages_returns_all_without_count(self): + self.node1.api_request(ACTION_RSYSLOG) + self.node1.api_request(ACTION_HAPROXY) + self.node1.api_request(ACTION_PF) + pending = list(self.node1.get_pending_messages()) + self.assertEqual(len(pending), 3) + + def test_get_pending_messages_with_count_limits_results(self): + self.node1.api_request(ACTION_RSYSLOG) + self.node1.api_request(ACTION_HAPROXY) + self.node1.api_request(ACTION_PF) + pending = list(self.node1.get_pending_messages(count=2)) + self.assertEqual(len(pending), 2) + + def test_get_pending_messages_empty_when_no_mq(self): + self.assertEqual(list(self.node1.get_pending_messages()), []) + + def test_get_pending_messages_only_for_this_node(self): + self.node1.api_request(ACTION_RSYSLOG) + self.node2.api_request(ACTION_HAPROXY) + pending_n1 = list(self.node1.get_pending_messages()) + self.assertEqual(len(pending_n1), 1) + self.assertEqual(pending_n1[0].node, self.node1) + + def test_get_pending_messages_ordered_by_run_at(self): + """get_pending_messages() doit retourner les MQ dans l'ordre d'execution""" + r1 = self.node1.api_request(ACTION_RSYSLOG) + mq1 = r1["instances"][0] + r2 = self.node1.api_request(ACTION_HAPROXY) + mq2 = r2["instances"][0] + pending = list(self.node1.get_pending_messages()) + # Le premier cree doit etre le premier dans la liste + self.assertEqual(pending[0], mq1) + self.assertEqual(pending[1], mq2) + + + +class TestClusterGetCurrentNode(TestCase): + + TEST_CASE_NAME = f"{__name__}_current_node" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + + def test_get_current_node_returns_node_matching_hostname(self): + node = _make_node(f"my-hostname_{self.TEST_CASE_NAME}") + with override_settings(HOSTNAME=node.name): + result = Cluster.get_current_node() + self.assertEqual(result, node) + + def test_get_current_node_returns_false_when_not_found(self): + with override_settings(HOSTNAME="nonexistent-hostname-xyz"): + result = Cluster.get_current_node() + self.assertFalse(result) + + def test_is_node_bootstrapped_true_when_node_exists(self): + node = _make_node(f"bootstrapped_{self.TEST_CASE_NAME}") + with override_settings(HOSTNAME=node.name): + result = Cluster.is_node_bootstrapped() + self.assertTrue(result) + + def test_is_node_bootstrapped_false_when_node_missing(self): + with override_settings(HOSTNAME="ghost-node-xyz"): + result = Cluster.is_node_bootstrapped() + self.assertFalse(result) + + +class TestMessageQueueModel(TestCase): + """ + MessageQueue : comportements du modele + """ + + TEST_CASE_NAME = f"{__name__}_mq" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + self.node = _make_node(f"mq-node_{self.TEST_CASE_NAME}") + + def tearDown(self): + MessageQueue.objects.all().delete() + + def test_mq_default_status_is_new(self): + result = self.node.api_request(ACTION_RSYSLOG) + mq = result["instances"][0] + self.assertEqual(mq.status, MessageQueue.MessageQueueStatus.NEW) + + def test_mq_save_updates_modified_timestamp(self): + self.node.api_request(ACTION_RSYSLOG) + mq = MessageQueue.objects.get() + original_modified = mq.modified + sleep(1) + mq.result = "done" + mq.save() + mq.refresh_from_db() + self.assertGreater(mq.modified, original_modified) + + def test_mq_to_template_contains_required_fields(self): + self.node.api_request(ACTION_RSYSLOG) + mq = MessageQueue.objects.get() + template = mq.to_template() + for field in ["date_add", "node", "status", "action", "config", "result", "modified", "run_at"]: + self.assertIn(field, template, f"Champ manquant dans to_template(): {field}") + + def test_mq_to_template_action_is_shortened(self): + """to_template() raccourcit l'action a ses 2 derniers segments""" + self.node.api_request("services.rsyslogd.rsyslog.restart_service") + mq = MessageQueue.objects.get() + template = mq.to_template() + self.assertEqual(template["action"], "rsyslog : restart_service") + + def test_mq_config_stored_as_string(self): + self.node.api_request(ACTION_BUILD, config="frontend_pk=42") + mq = MessageQueue.objects.get() + self.assertIsInstance(mq.config, str) + self.assertEqual(mq.config, "frontend_pk=42") + + def test_mq_internal_flag_defaults_to_false(self): + self.node.api_request(ACTION_RSYSLOG) + mq = MessageQueue.objects.get() + self.assertFalse(mq.internal) + + def test_mq_internal_flag_can_be_set_true(self): + self.node.api_request(ACTION_RSYSLOG, internal=True) + mq = MessageQueue.objects.get() + self.assertTrue(mq.internal) + + def test_status_choices_are_valid(self): + valid_statuses = { + MessageQueue.MessageQueueStatus.NEW, + MessageQueue.MessageQueueStatus.RUNNING, + MessageQueue.MessageQueueStatus.DONE, + MessageQueue.MessageQueueStatus.FAILURE, + } + self.assertEqual(valid_statuses, {"new", "running", "done", "failure"}) + + + + +class TestFrontendReloadConf(TestCase): + """ + Frontend.reload_conf() orchestre les appels api_request() vers les noeuds + concernes; on verifie quelles actions sont dispatche et dans quel ordre + """ + + TEST_CASE_NAME = f"{__name__}_reload" + + def setUp(self): + patch("system.cluster.models.logger").start() + patch("services.frontend.models.logger").start() + self.addCleanup(patch.stopall) + + from system.cluster.models import NetworkAddress, NetworkInterfaceCard, NetworkAddressNIC + from system.tenants.models import Tenants + from services.frontend.models import Frontend, Listener + + self.node = _make_node(f"reload-node_{self.TEST_CASE_NAME}") + self.nic = NetworkInterfaceCard.objects.create(dev="vtnet0", node=self.node) + self.netaddr = NetworkAddress.objects.create( + name=f"addr_{self.TEST_CASE_NAME}", + type="system", + ip="192.168.1.1", + prefix_or_netmask="24", + ) + NetworkAddressNIC.objects.create(nic=self.nic, network_address=self.netaddr) + self.tenants = Tenants.objects.create(name=f"t_reload_{self.TEST_CASE_NAME}") + + self.frontend_http = Frontend.objects.create( + name=f"fe_http_{self.TEST_CASE_NAME}", + mode="http", + enabled=True, + tenants_config=self.tenants, + ) + self.listener = Listener.objects.create( + network_address=self.netaddr, + port=443, + frontend=self.frontend_http, + ) + + def tearDown(self): + MessageQueue.objects.all().delete() + + def _get_actions(self, node=None) -> list: + """Retourne la liste des actions MQ creees pour un noeud donne""" + qs = MessageQueue.objects.all() + if node: + qs = qs.filter(node=node) + return list(qs.order_by("date_add").values_list("action", flat=True)) + + # Mode HTTP enabled + def test_http_enabled_dispatches_build_conf(self): + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.haproxy.haproxy.build_conf", actions) + + def test_http_enabled_dispatches_haproxy_reload(self): + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.haproxy.haproxy.reload_service", actions) + + def test_http_enabled_dispatches_pf_gen_config(self): + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.pf.pf.gen_config", actions) + + def test_http_enabled_dispatches_pf_reload(self): + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.pf.pf.reload_service", actions) + + def test_http_enabled_no_rsyslog_build_without_logging(self): + """Sans enable_logging, rsyslog ne doit pas etre rebuild pour un frontend HTTP""" + self.frontend_http.enable_logging = False + self.frontend_http.save() + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertNotIn("services.rsyslogd.rsyslog.build_conf", actions) + + def test_http_with_logging_dispatches_rsyslog_build(self): + self.frontend_http.enable_logging = True + self.frontend_http.save() + self.frontend_http.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.rsyslogd.rsyslog.build_conf", actions) + + # Mode HTTP disabled + def test_http_disabled_dispatches_delete_conf_not_build(self): + from services.frontend.models import Frontend + fe_disabled = Frontend.objects.create( + name=f"fe_disabled_{self.TEST_CASE_NAME}", + mode="http", + enabled=False, + tenants_config=self.tenants, + ) + from services.frontend.models import Listener + from system.cluster.models import NetworkAddress, NetworkAddressNIC + netaddr2 = NetworkAddress.objects.create( + name=f"addr2_{self.TEST_CASE_NAME}", + type="system", + ip="192.168.1.2", + prefix_or_netmask="24", + ) + NetworkAddressNIC.objects.create(nic=self.nic, network_address=netaddr2) + Listener.objects.create( + network_address=netaddr2, + port=80, + frontend=fe_disabled, + ) + + fe_disabled.reload_conf() + actions = self._get_actions(self.node) + + self.assertIn("services.haproxy.haproxy.delete_conf", actions) + self.assertNotIn("services.haproxy.haproxy.build_conf", actions) + + # Statut frontend apres reload_conf() + def test_reload_conf_sets_status_waiting_on_node(self): + self.frontend_http.reload_conf() + self.frontend_http.refresh_from_db() + node_name = self.node.name + self.assertEqual( + self.frontend_http.status.get(node_name), + "WAITING" + ) + + # Mode LOG -> rsyslog uniquement + def test_log_mode_file_dispatches_rsyslog_build(self): + from services.frontend.models import Frontend + fe_log = Frontend.objects.create( + name=f"fe_log_{self.TEST_CASE_NAME}", + mode="log", + listening_mode="file", + enabled=True, + node=self.node, + tenants_config=self.tenants, + ) + MessageQueue.objects.all().delete() + fe_log.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.rsyslogd.rsyslog.build_conf", actions) + # Pas de HAProxy pour un frontend LOG fichier + self.assertNotIn("services.haproxy.haproxy.build_conf", actions) + + def test_log_mode_file_disabled_dispatches_rsyslog_delete(self): + from services.frontend.models import Frontend + fe_log = Frontend.objects.create( + name=f"fe_log_dis_{self.TEST_CASE_NAME}", + mode="log", + listening_mode="file", + enabled=False, + node=self.node, + tenants_config=self.tenants, + ) + MessageQueue.objects.all().delete() + fe_log.reload_conf() + actions = self._get_actions(self.node) + self.assertIn("services.rsyslogd.rsyslog.delete_conf", actions) + + def test_log_mode_api_disabled_does_not_delete_rsyslog(self): + """ + Bug historique : un frontend LOG/API desactive ne doit PAS supprimer + sa conf rsyslog (il continue a faire tourner le collecteur). + """ + from services.frontend.models import Frontend + fe_api = Frontend.objects.create( + name=f"fe_api_{self.TEST_CASE_NAME}", + mode="log", + listening_mode="api", + enabled=False, + node=self.node, + tenants_config=self.tenants, + ) + MessageQueue.objects.all().delete() + fe_api.reload_conf() + actions = self._get_actions(self.node) + self.assertNotIn("services.rsyslogd.rsyslog.delete_conf", actions) + + +class TestClusterAwaitApiRequest(TestCase): + """ + Await API request (integration avec MessageQueue.await_result) + """ + + TEST_CASE_NAME = f"{__name__}_await" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + self.node1 = _make_node(f"await_n1_{self.TEST_CASE_NAME}") + self.node2 = _make_node(f"await_n2_{self.TEST_CASE_NAME}") + + def tearDown(self): + MessageQueue.objects.all().delete() + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_calls_await_result_for_each_node(self, mock_await): + mock_await.return_value = (True, "Success") + Cluster.await_api_request(ACTION_RSYSLOG, interval=1, tries=2) + self.assertEqual(mock_await.call_count, 2) + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_passes_interval_and_tries(self, mock_await): + mock_await.return_value = (True, "ok") + Cluster.await_api_request(ACTION_RSYSLOG, interval=3, tries=5) + mock_await.assert_called_with(3, 5) + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_returns_false_on_any_node_failure(self, mock_await): + mock_await.side_effect = [(True, "ok"), (False, "error")] + status, results = Cluster.await_api_request(ACTION_RSYSLOG) + self.assertFalse(status) + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_returns_true_when_all_succeed(self, mock_await): + mock_await.return_value = (True, "Success") + status, results = Cluster.await_api_request(ACTION_RSYSLOG) + self.assertTrue(status) + self.assertEqual(len(results), 2) + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_handles_timeout_exception(self, mock_await): + from system.cluster.models import APISyncResultTimeOutException + mock_await.side_effect = APISyncResultTimeOutException + status, _ = Cluster.await_api_request(ACTION_RSYSLOG, interval=1, tries=1) + self.assertFalse(status) + + @patch("system.cluster.models.MessageQueue.await_result") + def test_await_targeted_node_calls_await_once(self, mock_await): + mock_await.return_value = (True, "ok") + Cluster.await_api_request(ACTION_RSYSLOG, node=self.node1) + self.assertEqual(mock_await.call_count, 1) + + + +class TestClusterMultiNodeScenarios(TestCase): + """ + Scenarios realistes simulant ce qui se passe lors d'operations + de configuration sur un cluster a plusieurs noeuds. + """ + + TEST_CASE_NAME = f"{__name__}_integration" + + def setUp(self): + patch("system.cluster.models.logger").start() + self.addCleanup(patch.stopall) + self.nodes = [ + _make_node(f"cluster_node_{i}_{self.TEST_CASE_NAME}", f"10.0.0.{i}") + for i in range(1, 4) + ] + + def tearDown(self): + MessageQueue.objects.all().delete() + + def test_full_service_restart_sequence(self): + """ + Sequence realiste : build_conf -> reload_haproxy -> reload_pf + Ces 3 actions doivent etre dans le MQ pour chaque noeud. + """ + Cluster.api_request(ACTION_BUILD) + Cluster.api_request(ACTION_HAPROXY) + Cluster.api_request(ACTION_PF) + + for node in self.nodes: + node_actions = set( + MessageQueue.objects.filter(node=node).values_list("action", flat=True) + ) + self.assertIn(ACTION_BUILD, node_actions) + self.assertIn(ACTION_HAPROXY, node_actions) + self.assertIn(ACTION_PF, node_actions) + + def test_adding_pending_node_does_not_receive_previous_actions(self): + """ + Un nouveau noeud ajoute apres des api_request existants ne recoit + pas retroactivement les actions deja dispatchees + """ + Cluster.api_request(ACTION_RSYSLOG) + initial_count = MessageQueue.objects.count() + + _ = _make_pending_node(f"late_node_{self.TEST_CASE_NAME}") + # Les MQ existants ne changent pas + self.assertEqual(MessageQueue.objects.count(), initial_count) + + def test_cluster_handles_mixed_node_states(self): + """ + Cluster avec noeuds normaux + pending : seuls les normaux recoivent les actions. + """ + pending1 = _make_pending_node(f"pend1_{self.TEST_CASE_NAME}") + pending2 = _make_pending_node(f"pend2_{self.TEST_CASE_NAME}") + + Cluster.api_request(ACTION_HAPROXY) + + # Exactement 3 noeuds normaux doivent avoir un MQ + self.assertEqual(MessageQueue.objects.count(), 3) + pending_node_ids = {pending1.pk, pending2.pk} + for mq in MessageQueue.objects.all(): + self.assertNotIn(mq.node_id, pending_node_ids) + + def test_sequential_actions_maintain_order(self): + """ + Les actions dispatchees dans l'ordre doivent etre recuperables dans l'ordre + """ + actions_ordered = [ACTION_BUILD, ACTION_HAPROXY, ACTION_PF] + for action in actions_ordered: + self.nodes[0].api_request(action) + + pending = list(self.nodes[0].get_pending_messages()) + retrieved_actions = [mq.action for mq in pending] + self.assertEqual(retrieved_actions, actions_ordered) + + def test_different_configs_for_same_action_create_separate_mqs(self): + """ + La meme action avec des configs differentes doit creer des MQ separes + (config fait partie de la cle d'unicite). + """ + self.nodes[0].api_request(ACTION_BUILD, config="frontend_id=1") + self.nodes[0].api_request(ACTION_BUILD, config="frontend_id=2") + count = MessageQueue.objects.filter( + node=self.nodes[0], action=ACTION_BUILD + ).count() + self.assertEqual(count, 2)