diff --git a/scripts/route_check.py b/scripts/route_check.py index c867421c2c4..9549dfe250d 100755 --- a/scripts/route_check.py +++ b/scripts/route_check.py @@ -47,7 +47,6 @@ import traceback import subprocess import concurrent.futures -import ijson from ipaddress import ip_network from swsscommon import swsscommon @@ -391,10 +390,30 @@ def is_suppress_fib_pending_enabled(namespace): def fetch_routes(ipv6=False, namespace=multi_asic.DEFAULT_NAMESPACE): """ - Fetch routes using the given command. - Uses ijson for streaming JSON parsing to handle large route tables efficiently. - Parses each prefix entry as soon as it becomes available instead of waiting for the entire JSON. - """ + Fetch non-offloaded FRR routes by streaming vtysh TEXT output line-by-line. + + FRR zebra_vty.c assembles each route line as three fixed-position characters: + [0] protocol char : 'B' for BGP (zebra_route_char) + [1] selected char : '>' if ZEBRA_FLAG_SELECTED, else ' ' + [2] status char : single char from re_status_output_char(): + 'q' ROUTE_ENTRY_QUEUED (suppress-fib-pending) + 'o' ZEBRA_FLAG_OFFLOAD_FAILED + 't' ZEBRA_FLAG_TRAPPED + '*' nexthop active / installed + ' ' none of the above + Examples: + B>* 10.0.0.0/24 selected, offloaded (healthy) + B>q 10.0.0.0/24 selected, queued (suppress-fib-pending) + B>o 10.0.0.0/24 selected, offload failed + The status char is always at position [2], never before '>'. + Only lines with 'q' or 'o' at position [2] are collected; on a healthy + device the output is scanned and discarded without any allocations. + + Returns lists of plain prefix strings. The protocol is always 'bgp' for + these routes; callers that need it (e.g. mitigate_installed_not_offloaded_frr_routes) + hardcode that value rather than carrying it in the list entries. + """ + import re missing_routes = [] failing_routes = [] @@ -402,59 +421,37 @@ def fetch_routes(ipv6=False, namespace=multi_asic.DEFAULT_NAMESPACE): if namespace is not multi_asic.DEFAULT_NAMESPACE: asic_id = ['-n', str(multi_asic.get_asic_id_from_name(namespace))] - if ipv6: - cmd = ["sudo", "vtysh"] + asic_id + ["-c", "show ipv6 route json"] - else: - cmd = ["sudo", "vtysh"] + asic_id + ["-c", "show ip route json"] - - def process_route_entry(prefix, route_entry): - """Process a single route entry and add to missing_routes if needed.""" - if route_entry.get('protocol') in ('connected', 'kernel', 'static'): - return - if route_entry.get('vrfName') != 'default': - return - # skip if this bgp source prefix is not selected as best - if not route_entry.get('selected', False): - return - if not route_entry.get('offloaded', False): - missing_routes.append(prefix) - if route_entry.get('failed', False): - failing_routes.append(prefix) + af = 'ipv6' if ipv6 else 'ip' + cmd = ["sudo", "vtysh"] + asic_id + ["-c", f"show {af} route"] - try: - with subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=0) as proc: - try: - # Use ijson to parse the JSON stream incrementally - # kvitems('') iterates over key-value pairs at the root level - # This gives us each prefix and its route entries as they become available - for prefix, route_entries in ijson.kvitems(proc.stdout, ''): - # Process each route entry for this prefix - if isinstance(route_entries, list): - for route_entry in route_entries: - process_route_entry(prefix, route_entry) - else: - # Handle case where route_entries is not a list (shouldn't happen with valid FRR output) - print_message(syslog.LOG_WARNING, f"Unexpected route entry format for prefix {prefix}") - - except ijson.JSONError as e: - # Handle JSON parsing errors - print_message(syslog.LOG_WARNING, f"Failed to parse JSON stream: {e}") - except UnicodeDecodeError as e: - # Handle UTF-8 decoding errors - print_message(syslog.LOG_WARNING, f"UTF-8 decoding error: {e}") - except Exception as e: - # Handle any other unexpected errors during parsing - print_message(syslog.LOG_WARNING, f"Error during JSON parsing: {e}") + # Match selected BGP routes with a non-offloaded status flag: + # B>q prefix -- queued (suppress-fib-pending) + # B>o prefix -- offload failed + # Format is always: proto_char + selected_char(>) + status_char + space + prefix + pat = re.compile(r'^B>([qo])\s+(\S+)\s') - # Wait for the process to terminate and get the return code + try: + with subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + bufsize=1, text=True) as proc: + for line in proc.stdout: + m = pat.match(line) + if not m: + continue + flag = m.group(1) + prefix = m.group(2) + if flag == 'q': + missing_routes.append(prefix) + if flag == 'o': + failing_routes.append(prefix) return_code = proc.wait() if return_code != 0: - print_message(syslog.LOG_WARNING, f"Subprocess exited with non-zero return code: {return_code}") - + print_message(syslog.LOG_WARNING, + f"vtysh exited with non-zero return code: {return_code}") except FileNotFoundError: print_message(syslog.LOG_ERR, f"Error: Command '{cmd[0]}' not found.") except Exception as e: - print_message(syslog.LOG_ERR, f"An error occurred: {e}") + print_message(syslog.LOG_ERR, f"fetch_routes error: {e}") return missing_routes, failing_routes @@ -733,20 +730,46 @@ def is_feature_bgp_enabled(namespace): def check_frr_pending_routes(namespace): """ - Check FRR routes for offload flag presence by executing "show ip route json" - Returns a list of routes that have no offload flag. + Check FRR routes for offload flag presence by streaming vtysh text output. + Returns lists of route dicts {'prefix', 'protocol'} that are persistently + non-offloaded across all retry iterations (intersection logic). + + Intersection approach: only routes that appear as stuck in *every* poll + are returned for mitigation. A route that clears between iterations is + converging normally and should not be mitigated. + + Example with FRR_CHECK_RETRIES=3: + iter 0: {A, B, C, D} + iter 1: {A, B, C} (D converged) + iter 2: {A, B} (C converged) + result: {A, B} <- only truly stuck routes """ + acc_miss = [] + acc_fail = [] - missed_rt = [] - failed_rt = [] - retries = FRR_CHECK_RETRIES - for i in range(retries): - missed_rt, failed_rt = get_frr_routes_parallel(namespace) + for i in range(FRR_CHECK_RETRIES): + curr_miss, curr_fail = get_frr_routes_parallel(namespace) - if not missed_rt and not failed_rt: + if i == 0: + # Seed: first iteration defines the candidate set + acc_miss = curr_miss + acc_fail = curr_fail + else: + # Intersect: keep only prefixes still stuck in this iteration + curr_miss_set = set(curr_miss) + curr_fail_set = set(curr_fail) + acc_miss = [e for e in acc_miss if e in curr_miss_set] + acc_fail = [e for e in acc_fail if e in curr_fail_set] + + if not curr_miss and not curr_fail: + # Nothing queued right now; intersection already empty -- stop early break - time.sleep(FRR_WAIT_TIME) + if i < FRR_CHECK_RETRIES - 1: + time.sleep(FRR_WAIT_TIME) + + missed_rt = acc_miss + failed_rt = acc_fail print_message(syslog.LOG_DEBUG, f"FRR missed routes: {missed_rt}") print_message(syslog.LOG_DEBUG, f"FRR failed routes: {failed_rt}") return missed_rt, failed_rt @@ -766,11 +789,11 @@ def mitigate_installed_not_offloaded_frr_routes(namespace, missed_frr_rt, rt_app """ db = swsscommon.DBConnector('APPL_STATE_DB', REDIS_TIMEOUT_MSECS, True, namespace) response_producer = swsscommon.NotificationProducer(db, f'{APPL_DB_NAME}_{swsscommon.APP_ROUTE_TABLE_NAME}_RESPONSE_CHANNEL') - for entry in [entry for entry in missed_frr_rt if entry['prefix'] in rt_appl]: - fvs = swsscommon.FieldValuePairs([('err_str', 'SWSS_RC_SUCCESS'), ('protocol', entry['protocol'])]) - response_producer.send('SWSS_RC_SUCCESS', entry['prefix'], fvs) + for prefix in [prefix for prefix in missed_frr_rt if prefix in rt_appl]: + fvs = swsscommon.FieldValuePairs([('err_str', 'SWSS_RC_SUCCESS'), ('protocol', 'bgp')]) + response_producer.send('SWSS_RC_SUCCESS', prefix, fvs) - print_message(syslog.LOG_ERR, f'Mitigated route {entry["prefix"]}', write_to_stdout=False) + print_message(syslog.LOG_ERR, f'Mitigated route {prefix}', write_to_stdout=False) def get_soc_ips(config_db): @@ -888,6 +911,8 @@ def check_routes_for_namespace(namespace): rt_frr_miss = [] rt_frr_failed = [] + rt_frr_miss, rt_frr_failed = check_frr_pending_routes(namespace) + selector, subs, rt_asic = get_asicdb_routes(namespace) rt_appl = get_appdb_routes(namespace) @@ -945,8 +970,6 @@ def check_routes_for_namespace(namespace): if rt_asic_miss: results["Unaccounted_ROUTE_ENTRY_TABLE_entries"] = rt_asic_miss - rt_frr_miss, rt_frr_failed = check_frr_pending_routes(namespace) - if rt_frr_miss: results["missed_FRR_routes"] = rt_frr_miss diff --git a/tests/fetch_routes_chunk_test.py b/tests/fetch_routes_chunk_test.py deleted file mode 100755 index 24d3d63a658..00000000000 --- a/tests/fetch_routes_chunk_test.py +++ /dev/null @@ -1,872 +0,0 @@ -#!/usr/bin/env python3 -""" -Unit tests for the chunk-based reading logic in fetch_routes() function. -Tests various scenarios including: -- Complete JSON in single chunk -- JSON split across multiple chunks -- Incomplete JSON objects at chunk boundaries -- UTF-8 sequences split across chunks -- Empty responses -- Malformed JSON -""" - -import json -from unittest.mock import Mock, patch -from io import BytesIO -import sys - -sys.path.append("scripts") -import route_check # noqa: E402 - - -class ChunkedBytesIO: - def __init__(self, chunks): - self.chunks = chunks - self.index = 0 - - def read(self, size): - if not size: - return b'' - - if self.index >= len(self.chunks): - return b'' - chunk = self.chunks[self.index] - self.index += 1 - return chunk - - -class TestFetchRoutes: - """Test suite for chunk-based reading in fetch_routes()""" - - def setup_method(self): - """Setup for each test method""" - route_check.UNIT_TESTING = 1 - route_check.FRR_WAIT_TIME = 0 - - def create_mock_process(self, data_bytes): - """ - Create a mock subprocess.Popen object with stdout that returns data - in chunks. - - Args: - data_bytes: bytes to return from stdout - - Returns: - Mock Popen object - """ - mock_proc = Mock() - mock_proc.stdout = BytesIO(data_bytes) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - return mock_proc - - def test_complete_json_single_chunk(self): - """Test parsing when complete JSON fits in a single chunk""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_json_split_across_chunks(self): - """Test parsing when JSON is split across multiple chunks""" - json_data = { - "10.0.0.0/8": [{ - "prefix": "10.0.0.0/8", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "172.16.0.0/12": [{ - "prefix": "172.16.0.0/12", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_str = json.dumps(json_data) - - # Split the JSON string in the middle - split_point = len(json_str) // 2 - chunk1 = json_str[:split_point].encode('utf-8') - chunk2 = json_str[split_point:].encode('utf-8') - - # Create a custom BytesIO that returns data in specific chunk sizes - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"10.0.0.0/8", "172.16.0.0/12"} - - def test_routes_with_offloaded_flag(self): - """Test that routes with offloaded=True are not included in missing - routes""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": True # This should NOT be in missing routes - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False # This SHOULD be in missing routes - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.2.0/24"], []) - - def test_filter_connected_kernel_static_protocols(self): - """Test that connected, kernel and static protocols are filtered out""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "connected", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "kernel", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.3.0/24": [{ - "prefix": "192.168.3.0/24", - "protocol": "static", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.4.0/24": [{ - "prefix": "192.168.4.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Only BGP route should be in the result - assert result == (["192.168.4.0/24"], []) - - def test_filter_non_default_vrf(self): - """Test that routes in non-default VRF are filtered out""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "Vrf_RED", - "selected": True, - "offloaded": False - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.3.0/24": [{ - "prefix": "192.168.3.0/24", - "protocol": "bgp", - "vrfName": "mgmt", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Only default VRF route should be in the result - assert result == (["192.168.2.0/24"], []) - - def test_filter_not_selected_routes(self): - """Test that routes not selected as best are filtered out""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": False, - "offloaded": False - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Only selected route should be in the result - assert result == (["192.168.2.0/24"], []) - - def test_empty_json_response(self): - """Test handling of empty JSON response""" - json_data = {} - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == ([], []) - - def test_utf8_split_across_chunks(self): - """Test handling of UTF-8 multibyte characters split across chunks""" - # Create JSON with UTF-8 characters - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "Test route with Γ©mojis πŸš€" - }] - } - json_str = json.dumps(json_data) - json_bytes = json_str.encode('utf-8') - - # Find a multibyte character and split there - # The emoji πŸš€ is 4 bytes in UTF-8 - emoji_pos = json_bytes.find('πŸš€'.encode('utf-8')) - if emoji_pos > 0: - # Split in the middle of the emoji - split_point = emoji_pos + 2 - chunk1 = json_bytes[:split_point] - chunk2 = json_bytes[split_point:] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_very_small_chunks(self): - """Test parsing with very small chunk sizes (byte-by-byte)""" - json_data = { - "10.0.0.0/8": [{ - "prefix": "10.0.0.0/8", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - # Create chunks of 10 bytes each - chunks = [json_bytes[i:i+10] for i in range(0, len(json_bytes), 10)] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO(chunks) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["10.0.0.0/8"], []) - - def test_large_json_with_many_routes(self): - """Test parsing large JSON with many route entries""" - json_data = {} - expected_missing = [] - - # Create 100 routes - for i in range(100): - prefix = f"10.{i}.0.0/16" - json_data[prefix] = [{ - "prefix": prefix, - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - expected_missing.append(prefix) - - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == set(expected_missing) - assert len(result[0]) == 100 - - def test_ipv6_command(self): - """Test that IPv6 command is properly constructed""" - json_data = { - "2001:db8::/32": [{ - "prefix": "2001:db8::/32", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc) \ - as mock_popen: - result = route_check.fetch_routes(ipv6=True) - - # Verify the correct command was called - mock_popen.assert_called_once() - call_args = mock_popen.call_args[0][0] - assert call_args == ["sudo", "vtysh", "-c", "show ipv6 route json"] - - assert result == (["2001:db8::/32"], []) - - def test_subprocess_non_zero_exit_code(self): - """Test handling of subprocess with non-zero exit code""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = Mock() - mock_proc.stdout = BytesIO(json_bytes) - mock_proc.wait = Mock(return_value=1) # Non-zero exit code - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Should still return the parsed routes - assert result == (["192.168.1.0/24"], []) - - def test_file_not_found_error(self): - """Test handling of FileNotFoundError when vtysh is not found""" - with patch('route_check.subprocess.Popen', - side_effect=FileNotFoundError("vtysh not found")): - result = route_check.fetch_routes() - - # Should return empty list on error - assert result == ([], []) - - def test_multiple_route_entries_per_prefix(self): - """Test handling of multiple route entries for the same prefix""" - json_data = { - "192.168.1.0/24": [ - { - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": True - }, - { - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": False, - "offloaded": False - } - ] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # First entry is offloaded, second is not selected, so no missing - # routes - assert result == ([], []) - - def test_json_with_nested_objects(self): - """Test parsing JSON with deeply nested objects""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "nexthops": [ - { - "ip": "10.0.0.1", - "interfaceName": "Ethernet0", - "active": True - } - ] - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_variable_chunk_boundary(self): - """Test when chunk boundary falls at variious locations""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_str = json.dumps(json_data) - json_bytes = json_str.encode('utf-8') - - json_len = len(json_str) - for chunk_boundary in range(1, json_len): - # Find the position of the first closing brace - chunk1 = json_bytes[:chunk_boundary+1] - chunk2 = json_bytes[chunk_boundary+1:] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_chunk_boundary_in_string_value(self): - """Test when chunk boundary falls in the middle of a string value""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "This is a very long description that will be split across chunks" - }] - } - json_str = json.dumps(json_data) - json_bytes = json_str.encode('utf-8') - - # Find the description and split in the middle of it - desc_start = json_str.find("This is a very long") - split_point = desc_start + 10 - chunk1 = json_bytes[:split_point] - chunk2 = json_bytes[split_point:] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_escaped_quotes_in_json(self): - """Test handling of escaped quotes in JSON strings""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "Route with \"escaped\" quotes" - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_whitespace_handling(self): - """Test handling of JSON with various whitespace""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - # Add extra whitespace - json_str = json.dumps(json_data, indent=4) - json_str = json_str.replace(',', ' \n,') - json_bytes = json_str.encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert result == (["192.168.1.0/24"], []) - - def test_mixed_ipv4_and_ipv6_routes(self): - """Test parsing JSON with both IPv4 and IPv6 routes""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "2001:db8::/32": [{ - "prefix": "2001:db8::/32", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", "2001:db8::/32"} - - def test_malformed_json_in_buffer(self): - """Test handling of malformed JSON that cannot be parsed""" - malformed_json = b'{"192.168.1.0/24": [{"prefix": "192.168.1.0/24", "protocol": "bgp"' - - mock_proc = self.create_mock_process(malformed_json) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Should return empty list for malformed JSON - assert result == ([], []) - - def test_incremental_parsing_multiple_prefixes(self): - """Test that multiple prefix entries are parsed one at a time""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.3.0/24": [{ - "prefix": "192.168.3.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", - "192.168.2.0/24", - "192.168.3.0/24"} - - def test_incremental_parsing_prefix_split_across_chunks(self): - """Test that a prefix entry split across chunks is parsed correctly""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "This is a long description to make the entry larger" - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "Another long description for the second prefix" - }] - } - json_str = json.dumps(json_data) - json_bytes = json_str.encode('utf-8') - - # Split in the middle of the first prefix entry - # Find the position after the first prefix key - first_prefix_end = json_str.find(']', json_str.find('"192.168.1.0/24"')) - split_point = first_prefix_end - 20 # Split before the end of first entry - - chunk1 = json_bytes[:split_point] - chunk2 = json_bytes[split_point:] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", "192.168.2.0/24"} - - def test_incremental_parsing_across_multiple_chunks(self): - # Create a large JSON with many prefixes - json_data = {} - expected_missing = [] - - for i in range(50): - prefix = f"10.{i}.0.0/16" - json_data[prefix] = [{ - "prefix": prefix, - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": f"Route {i} with some description text to make it larger" - }] - expected_missing.append(prefix) - - json_bytes = json.dumps(json_data).encode('utf-8') - - # Split into multiple small chunks to test incremental parsing - chunk_size = 500 - chunks = [json_bytes[i:i+chunk_size] for i in range(0, len(json_bytes), chunk_size)] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO(chunks) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == set(expected_missing) - assert len(result[0]) == 50 - - def test_incremental_parsing_prefix_boundary_at_comma(self): - """Test parsing when chunk boundary falls at comma between prefix entries""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False - }] - } - json_str = json.dumps(json_data) - json_bytes = json_str.encode('utf-8') - - # Find the comma between the two entries - comma_pos = json_str.find('],') + 2 # Position after "]," - - chunk1 = json_bytes[:comma_pos] - chunk2 = json_bytes[comma_pos:] - - mock_proc = Mock() - mock_proc.stdout = ChunkedBytesIO([chunk1, chunk2]) - mock_proc.wait = Mock(return_value=0) - mock_proc.__enter__ = Mock(return_value=mock_proc) - mock_proc.__exit__ = Mock(return_value=False) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", "192.168.2.0/24"} - - def test_incremental_parsing_mixed_offloaded_states(self): - """Test incremental parsing with mixed offloaded states across multiple prefixes""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": True # This one is offloaded - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False # This one is missing - }], - "192.168.3.0/24": [{ - "prefix": "192.168.3.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": True # This one is offloaded - }], - "192.168.4.0/24": [{ - "prefix": "192.168.4.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False # This one is missing - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - # Only the non-offloaded routes should be in the result - assert set(result[0]) == {"192.168.2.0/24", "192.168.4.0/24"} - - def test_incremental_parsing_with_whitespace_variations(self): - """Test incremental parsing with various whitespace formatting""" - # Create JSON with extra whitespace and newlines - json_str = """{ - "192.168.1.0/24": [ - { - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": true, - "offloaded": false - } - ], - "192.168.2.0/24": [ - { - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": true, - "offloaded": false - } - ] - }""" - json_bytes = json_str.encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", "192.168.2.0/24"} - - def test_incremental_parsing_prefix_with_special_characters(self): - """Test incremental parsing of prefixes with special characters in descriptions""" - json_data = { - "192.168.1.0/24": [{ - "prefix": "192.168.1.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "Route with special chars: {}, [], \", \\" - }], - "192.168.2.0/24": [{ - "prefix": "192.168.2.0/24", - "protocol": "bgp", - "vrfName": "default", - "selected": True, - "offloaded": False, - "description": "Another route with: commas, colons: and braces {}" - }] - } - json_bytes = json.dumps(json_data).encode('utf-8') - - mock_proc = self.create_mock_process(json_bytes) - - with patch('route_check.subprocess.Popen', return_value=mock_proc): - result = route_check.fetch_routes() - - assert set(result[0]) == {"192.168.1.0/24", "192.168.2.0/24"} diff --git a/tests/fetch_routes_test.py b/tests/fetch_routes_test.py new file mode 100644 index 00000000000..3b34a839d5b --- /dev/null +++ b/tests/fetch_routes_test.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python3 +""" +Tests for the text-streaming fetch_routes() and the intersection-based retry +logic in check_frr_pending_routes(). + +fetch_routes() runs: + vtysh -c "show ip route" (or "show ipv6 route") +and scans each output line with: + r'^B>([qo])\\s+(\\S+)\\s' + 'q' β†’ suppress-fib-pending (queued) β†’ missing_routes + 'o' β†’ offload failed β†’ failing_routes + anything else is ignored. + +check_frr_pending_routes() accumulates an intersection across +FRR_CHECK_RETRIES polls so only persistently stuck routes are mitigated. +""" + +import sys +from unittest.mock import Mock, patch + +sys.path.append("scripts") +import route_check # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_proc(lines, exit_code=0): + """Return a mock Popen whose stdout yields *lines* as an iterator.""" + mock_proc = Mock() + mock_proc.stdout = iter(lines) + mock_proc.wait = Mock(return_value=exit_code) + mock_proc.__enter__ = Mock(return_value=mock_proc) + mock_proc.__exit__ = Mock(return_value=False) + return mock_proc + + +def _bgp_line(flag, prefix): + """Build a realistic vtysh BGP route line with the given flag char.""" + return f"B>{flag} {prefix} [200/0] via 192.0.2.1, Ethernet0, weight 1, 00:01:00\n" + + +# --------------------------------------------------------------------------- +# TestFetchRoutes β€” text-based vtysh output parsing +# --------------------------------------------------------------------------- + +class TestFetchRoutes: + + def setup_method(self): + route_check.UNIT_TESTING = 1 + route_check.FRR_WAIT_TIME = 0 + + # --- basic flag detection --- + + def test_healthy_route_ignored(self): + """B>* (active/installed) must not appear in either return list.""" + lines = [_bgp_line("*", "10.0.0.0/24")] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + result = route_check.fetch_routes() + assert result == ([], []) + + def test_queued_route_in_missing(self): + """B>q (suppress-fib-pending / queued) β†’ prefix lands in missing_routes.""" + lines = [_bgp_line("q", "10.0.0.0/24")] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + result = route_check.fetch_routes() + assert result == (["10.0.0.0/24"], []) + + def test_offload_failed_route_in_failing(self): + """B>o (offload failed) β†’ prefix lands in failing_routes.""" + lines = [_bgp_line("o", "10.0.0.0/24")] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + result = route_check.fetch_routes() + assert result == ([], ["10.0.0.0/24"]) + + def test_non_bgp_routes_ignored(self): + """Non-BGP protocol lines (OSPF, static, connected) are all ignored.""" + lines = [ + "O>* 10.0.0.1/32 [110/20] via 192.0.2.1, Ethernet0\n", # OSPF + "S>* 10.0.0.2/32 [1/0] directly connected, Ethernet0\n", # static + "C>* 192.168.0.0/24 is directly connected, Ethernet0\n", # connected + ] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + result = route_check.fetch_routes() + assert result == ([], []) + + def test_non_selected_bgp_ignored(self): + """BGP line without '>' (not selected as best route) must not match.""" + lines = ["B q 10.0.0.0/24 [200/0] via 192.0.2.1, Ethernet0\n"] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + result = route_check.fetch_routes() + assert result == ([], []) + + # --- multi-route output --- + + def test_mixed_flags_multiple_routes(self): + """Mix of *, q, and o lines β†’ correct split between the two output lists.""" + lines = [ + _bgp_line("*", "10.0.0.0/24"), # healthy β†’ skip + _bgp_line("q", "10.1.0.0/24"), # queued β†’ missing + _bgp_line("o", "10.2.0.0/24"), # failed β†’ failing + _bgp_line("q", "10.3.0.0/24"), # queued β†’ missing + ] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + missing, failing = route_check.fetch_routes() + assert set(missing) == {"10.1.0.0/24", "10.3.0.0/24"} + assert failing == ["10.2.0.0/24"] + + def test_many_queued_routes(self): + """100 B>q lines β†’ 100 entries in missing_routes, failing_routes empty.""" + lines = [_bgp_line("q", f"10.{i}.0.0/16") for i in range(100)] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + missing, failing = route_check.fetch_routes() + assert set(missing) == {f"10.{i}.0.0/16" for i in range(100)} + assert len(missing) == 100 + assert failing == [] + + def test_empty_output(self): + """Empty vtysh output (healthy device) β†’ ([], []).""" + with patch("route_check.subprocess.Popen", return_value=_make_proc([])): + result = route_check.fetch_routes() + assert result == ([], []) + + def test_header_and_blank_lines_ignored(self): + """vtysh legend, VRF header, and blank lines don't match the regex.""" + lines = [ + "Codes: K - kernel route, C - connected, S - static, R - RIP,\n", + " O - OSPF, I - IS-IS, B - BGP, E - EIGRP, N - NHRP,\n", + "\n", + "VRF default:\n", + _bgp_line("q", "10.0.0.0/24"), + ] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + missing, failing = route_check.fetch_routes() + assert missing == ["10.0.0.0/24"] + assert failing == [] + + # --- command construction --- + + def test_ipv4_command(self): + """Default call constructs 'show ip route' (no 'json' suffix).""" + with patch("route_check.subprocess.Popen", + return_value=_make_proc([])) as mock_popen: + route_check.fetch_routes(ipv6=False) + cmd = mock_popen.call_args[0][0] + assert cmd == ["sudo", "vtysh", "-c", "show ip route"] + + def test_ipv6_command(self): + """ipv6=True constructs 'show ipv6 route' (no 'json' suffix).""" + with patch("route_check.subprocess.Popen", + return_value=_make_proc([])) as mock_popen: + route_check.fetch_routes(ipv6=True) + cmd = mock_popen.call_args[0][0] + assert cmd == ["sudo", "vtysh", "-c", "show ipv6 route"] + + def test_ipv6_route_parsing(self): + """B>q line with an IPv6 prefix is extracted correctly.""" + lines = ["B>q 2001:db8::/32 [200/0] via fe80::1, Ethernet0\n"] + with patch("route_check.subprocess.Popen", return_value=_make_proc(lines)): + missing, failing = route_check.fetch_routes(ipv6=True) + assert missing == ["2001:db8::/32"] + assert failing == [] + + # --- error handling --- + + def test_nonzero_exit_code_still_returns_routes(self): + """Non-zero vtysh exit is logged; routes collected so far are still returned.""" + lines = [_bgp_line("q", "10.0.0.0/24")] + with patch("route_check.subprocess.Popen", + return_value=_make_proc(lines, exit_code=1)): + result = route_check.fetch_routes() + assert result == (["10.0.0.0/24"], []) + + def test_file_not_found_returns_empty(self): + """FileNotFoundError (vtysh absent) β†’ ([], []) with no exception raised.""" + with patch("route_check.subprocess.Popen", + side_effect=FileNotFoundError("vtysh not found")): + result = route_check.fetch_routes() + assert result == ([], []) + + +# --------------------------------------------------------------------------- +# TestCheckFrrPendingRoutes β€” intersection-based retry logic +# --------------------------------------------------------------------------- + +class TestCheckFrrPendingRoutes: + """ + check_frr_pending_routes() calls get_frr_routes_parallel() up to + FRR_CHECK_RETRIES times and intersects results across iterations so that + only routes stuck in *every* poll are forwarded for mitigation. + """ + + def setup_method(self): + route_check.UNIT_TESTING = 1 + route_check.FRR_WAIT_TIME = 0 + + def test_clears_on_first_poll_no_mitigation(self): + """First poll returns empty β†’ no mitigation; loop exits after one call.""" + with patch.object(route_check, "get_frr_routes_parallel", + return_value=([], [])) as mock_poll: + missed, failed = route_check.check_frr_pending_routes(None) + assert missed == [] + assert failed == [] + assert mock_poll.call_count == 1 + + def test_all_stuck_every_poll_all_mitigated(self): + """Same routes returned in every iteration β†’ all survive the intersection.""" + always_stuck = (["10.0.0.0/24", "10.1.0.0/24"], []) + with patch.object(route_check, "get_frr_routes_parallel", + return_value=always_stuck): + missed, failed = route_check.check_frr_pending_routes(None) + assert set(missed) == {"10.0.0.0/24", "10.1.0.0/24"} + assert failed == [] + + def test_converging_route_not_mitigated(self): + """A route that disappears between iterations is removed from the intersection.""" + # iter 0: A + B stuck; iter 1: only A; iter 2: only A + side_effects = [ + (["10.0.0.0/24", "10.1.0.0/24"], []), + (["10.0.0.0/24"], []), + (["10.0.0.0/24"], []), + ] + with patch.object(route_check, "get_frr_routes_parallel", + side_effect=side_effects): + missed, failed = route_check.check_frr_pending_routes(None) + assert missed == ["10.0.0.0/24"] + assert failed == [] + + def test_all_converge_mid_retry(self): + """All routes clear before retries are exhausted β†’ empty result, early exit.""" + side_effects = [ + (["10.0.0.0/24"], []), # iter 0: stuck + ([], []), # iter 1: clear β†’ break + ] + with patch.object(route_check, "get_frr_routes_parallel", + side_effect=side_effects) as mock_poll: + missed, failed = route_check.check_frr_pending_routes(None) + assert missed == [] + assert failed == [] + assert mock_poll.call_count == 2 + + def test_failed_routes_intersection(self): + """Intersection logic is applied to failing_routes symmetrically.""" + # iter 0: two failing; iter 1: one cleared; iter 2: one still failing + side_effects = [ + ([], ["10.0.0.0/24", "10.1.0.0/24"]), + ([], ["10.0.0.0/24"]), + ([], ["10.0.0.0/24"]), + ] + with patch.object(route_check, "get_frr_routes_parallel", + side_effect=side_effects): + missed, failed = route_check.check_frr_pending_routes(None) + assert missed == [] + assert failed == ["10.0.0.0/24"] diff --git a/tests/route_check_test.py b/tests/route_check_test.py index e4a1336803e..f59326f1b3d 100644 --- a/tests/route_check_test.py +++ b/tests/route_check_test.py @@ -342,7 +342,7 @@ def test_logging(self): def test_mitigate_routes(self, mock_dbs): namespace = DEFAULTNS - missed_frr_rt = [{'prefix': '192.168.0.1', 'protocol': 'bgp'}] + missed_frr_rt = ['192.168.0.1'] rt_appl = ['192.168.0.1'] init_db_conns([namespace]) with patch('sys.stdout', new_callable=StringIO) as mock_stdout: