From 25e0af965b0552caaf4dc4928c67af69e4481106 Mon Sep 17 00:00:00 2001 From: Rajasi Rane Date: Mon, 22 Jun 2026 10:16:24 -0700 Subject: [PATCH 1/3] Feature: [Ubuntu, no-CVM] CVM machines to not get UEFI and kek certificate updates --- src/core/src/bootstrap/Constants.py | 1 + src/core/src/bootstrap/EnvLayer.py | 138 ++++++++++++++++++++++ src/core/src/core_logic/PatchInstaller.py | 12 ++ src/core/tests/Test_EnvLayer.py | 83 ++++++++++++- src/core/tests/Test_PatchInstaller.py | 10 ++ 5 files changed, 243 insertions(+), 1 deletion(-) diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index b4b7e663..2ad59801 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -59,6 +59,7 @@ class SystemPaths(EnumBackport): class AzGPSPaths(EnumBackport): EULA_SETTINGS = "/var/lib/azure/linuxpatchextension/patch.eula.settings" UEFI_SETTINGS = "/var/lib/azure/linuxpatchextension/patch.uefi.settings" + DETECT_CVM = "/var/lib/azure/linuxpatchextension/patch.detectcvm.sh" class EnvSettings(EnumBackport): LOG_FOLDER = "logFolder" diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index 4c01ea12..c29d8ce5 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -192,6 +192,144 @@ def get_env_var(self, var_name, raise_if_not_success=False): raise return None + def detect_confidential_vm(self): + # type: () -> tuple + """Returns whether the current VM is a Confidential VM and the detection details.""" + if self.platform.os_type() == 'Windows': + return False, str() + + is_confidential_vm, detection_details = self.detect_confidential_vm_by_imds() + if is_confidential_vm: + return True, detection_details + + is_confidential_vm, detection_details = self.detect_confidential_vm_by_fde() + if is_confidential_vm: + return True, detection_details + + return False, str() + + def detect_confidential_vm_by_fde(self): + # type: () -> tuple + """Runs the FDE-based CVM detection script and returns whether it detected a Confidential VM.""" + script_path = Constants.AzGPSPaths.DETECT_CVM + command_output = str() + detection_script = """#!/usr/bin/env bash +set -euo pipefail + +HOSTNAME=$(hostname) + +ROOT_SRC=$(findmnt -n -o SOURCE /) +ROOT_DEV=$(readlink -f "$ROOT_SRC" || echo "$ROOT_SRC") + +FDE="false" +DETAILS="" + +check_device() { + local dev="$1" + + if blkid "$dev" 2>/dev/null | grep -qi 'crypto_LUKS'; then + FDE="true" + DETAILS="LUKS:$dev" + return + fi + + local type + type=$(lsblk -dn -o TYPE "$dev" 2>/dev/null || true) + + if [[ "$type" == "crypt" ]]; then + FDE="true" + DETAILS="CRYPT:$dev" + return + fi +} + +walk_parents() { + local dev="$1" + + while [[ -n "$dev" ]]; do + check_device "$dev" + + if [[ "$FDE" == "true" ]]; then + return + fi + + local parent + parent=$(lsblk -ndo PKNAME "$dev" 2>/dev/null | head -1 || true) + + if [[ -z "$parent" ]]; then + break + fi + + dev="/dev/$parent" + done +} + +walk_parents "$ROOT_DEV" + +if [[ "$FDE" != "true" ]]; then + while read -r name type; do + if [[ "$type" == "crypt" ]]; then + mapper="/dev/mapper/$name" + + if mount | grep -q "^$mapper on / "; then + FDE="true" + DETAILS="DMCRYPT_ROOT:$mapper" + break + fi + fi + done < <(dmsetup ls --target crypt 2>/dev/null || true) +fi + +if [[ "$FDE" != "true" ]]; then + if systemctl list-units 2>/dev/null | grep -qi azure; then + if ls /var/lib/waagent/*Encryption* >/dev/null 2>&1; then + FDE="true" + DETAILS="AZURE_ADE_ARTIFACTS" + fi + fi +fi + +echo "$HOSTNAME,$ROOT_DEV,FDE=$FDE,$DETAILS" +""" + + try: + script_dir = os.path.dirname(script_path) + if script_dir and not os.path.isdir(script_dir): + try: + os.makedirs(script_dir) + except OSError: + if not os.path.isdir(script_dir): + raise + + self.file_system.write_with_retry(script_path, detection_script, 'w') + + code, out = self.run_command_output('bash "{0}"'.format(script_path), False, False) + command_output = str(out).strip() if out is not None else str() + return code == 0 and re.search(r'\bFDE\s*=\s*true\b', command_output, re.IGNORECASE) is not None, command_output + except Exception: + return False, command_output + finally: + if script_path is not None and os.path.exists(script_path): + try: + os.remove(script_path) + except Exception: + pass + + def detect_confidential_vm_by_imds(self): + # type: () -> tuple + """Queries Azure IMDS and returns whether the VM reports ConfidentialVM security type.""" + command = 'curl -s --connect-timeout 2 --max-time 2 -H Metadata:true --noproxy "*" "http://169.254.169.254/metadata/instance?api-version=2025-04-07"' + + try: + code, out = self.run_command_output(command, False, False) + command_output = str(out).strip() if out is not None else str() + if code == 0 and re.search(r'"securityType"\s*:\s*"ConfidentialVM"', command_output, re.IGNORECASE) is not None: + return True, 'IMDS:ConfidentialVM' + except Exception: + pass + + return False, str() + def run_command_output(self, cmd, no_output=False, chk_err=True): # type: (str, bool, bool) -> (int, any) """ Wrapper for subprocess.check_output. Execute 'cmd'. Returns return code and STDOUT, trapping expected exceptions. Reports exceptions to Error if chk_err parameter is True """ diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index aee414f4..4a31a428 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -808,6 +808,18 @@ def try_update_certificates_for_default_patching(self): self.composite_logger.log_debug("Not updating certificates since this is not a default patching operation.") return + try: + is_confidential_vm, detection_details = self.env_layer.detect_confidential_vm() + except Exception as e: + is_confidential_vm = False + detection_details = str() + self.composite_logger.log_warning("Unable to determine whether the VM is a Confidential VM before attempting the UEFI certificate update. Continuing with patch installation... [Error: {0}]".format(str(e))) + return + + if is_confidential_vm: + self.composite_logger.log("Skipping UEFI certificate update because this VM was detected as a Confidential VM. [Detection={0}]".format(detection_details)) + return + try: self.package_manager.update_certs() except Exception as e: diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index cecc0ec8..9dfa0bce 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -13,7 +13,6 @@ # limitations under the License. # # Requires Python 2.7+ -import io import platform import sys import unittest @@ -138,6 +137,88 @@ def test_is_distro_azure_linux_3(self): # restore original methods distro.os_release_attr = self.backup_envlayer_distro_os_release_attr + def test_detect_confidential_vm_by_fde(self): + backup_run_command_output = self.envlayer.run_command_output + + self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1') + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() + + self.assertTrue(is_confidential_vm) + self.assertIn('FDE=true', detection_details) + + self.envlayer.run_command_output = backup_run_command_output + + def test_detect_confidential_vm_by_imds(self): + backup_run_command_output = self.envlayer.run_command_output + + self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, '{"compute":{"securityProfile":{"securityType":"ConfidentialVM"}}}') + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() + + self.assertTrue(is_confidential_vm) + self.assertEqual('IMDS:ConfidentialVM', detection_details) + + self.envlayer.run_command_output = backup_run_command_output + + def test_detect_confidential_vm_checks_imds_before_fde(self): + backup_platform = self.envlayer.platform + backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde + backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds + + calls = [] + self.envlayer.platform = self.envlayer.Platform() + self.envlayer.platform.os_type = lambda: 'Linux' + + def detect_confidential_vm_by_fde(): + calls.append('fde') + return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def detect_confidential_vm_by_imds(): + calls.append('imds') + return True, 'IMDS:ConfidentialVM' + + self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde + self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds + + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() + + self.assertTrue(is_confidential_vm) + self.assertIn('IMDS:ConfidentialVM', detection_details) + self.assertEqual(['imds'], calls) + + self.envlayer.platform = backup_platform + self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde + self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + + def test_detect_confidential_vm_checks_fde_when_imds_not_detected(self): + backup_platform = self.envlayer.platform + backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde + backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds + + calls = [] + self.envlayer.platform = self.envlayer.Platform() + self.envlayer.platform.os_type = lambda: 'Linux' + + def detect_confidential_vm_by_fde(): + calls.append('fde') + return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def detect_confidential_vm_by_imds(): + calls.append('imds') + return False, str() + + self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde + self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds + + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() + + self.assertTrue(is_confidential_vm) + self.assertIn('FDE=true', detection_details) + self.assertEqual(['imds', 'fde'], calls) + + self.envlayer.platform = backup_platform + self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde + self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + def test_filesystem(self): # only validates if these invocable without exceptions backup_retry_count = Constants.MAX_FILE_OPERATION_RETRY_COUNT diff --git a/src/core/tests/Test_PatchInstaller.py b/src/core/tests/Test_PatchInstaller.py index 45900263..6fa8b7ad 100644 --- a/src/core/tests/Test_PatchInstaller.py +++ b/src/core/tests/Test_PatchInstaller.py @@ -816,6 +816,16 @@ def test_try_update_certs_swallows_exception_from_update_certs(self): runtime.patch_installer.package_manager.update_certs = backup_up_update_certs runtime.stop() + + def test_try_update_certificates_skips_confidential_vm(self): + runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01") + runtime.patch_installer.env_layer.detect_confidential_vm = lambda: (True, 'IMDS:ConfidentialVM') + + method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs') + runtime.patch_installer.start_installation(simulate=True) + + self.assertEqual(len(method_called), 0) + runtime.stop() # endregion From 627bb930f37232971d0189b1a0b97b66866a833f Mon Sep 17 00:00:00 2001 From: Rajasi Rane Date: Mon, 22 Jun 2026 13:41:11 -0700 Subject: [PATCH 2/3] Feature: [Ubuntu, no-CVM] Added additional UT coverage and addressed some comments --- src/core/src/bootstrap/EnvLayer.py | 18 +-- src/core/src/core_logic/PatchInstaller.py | 2 - src/core/tests/Test_EnvLayer.py | 153 +++++++++++++--------- src/core/tests/Test_PatchInstaller.py | 22 +++- 4 files changed, 122 insertions(+), 73 deletions(-) diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index c29d8ce5..4d23fbd0 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -297,9 +297,8 @@ def detect_confidential_vm_by_fde(self): if script_dir and not os.path.isdir(script_dir): try: os.makedirs(script_dir) - except OSError: - if not os.path.isdir(script_dir): - raise + except Exception: + raise self.file_system.write_with_retry(script_path, detection_script, 'w') @@ -307,7 +306,7 @@ def detect_confidential_vm_by_fde(self): command_output = str(out).strip() if out is not None else str() return code == 0 and re.search(r'\bFDE\s*=\s*true\b', command_output, re.IGNORECASE) is not None, command_output except Exception: - return False, command_output + raise Exception("FDE_DETECTION_ERROR:{0}; OUTPUT:{1}".format(str(e), command_output)) finally: if script_path is not None and os.path.exists(script_path): try: @@ -320,13 +319,10 @@ def detect_confidential_vm_by_imds(self): """Queries Azure IMDS and returns whether the VM reports ConfidentialVM security type.""" command = 'curl -s --connect-timeout 2 --max-time 2 -H Metadata:true --noproxy "*" "http://169.254.169.254/metadata/instance?api-version=2025-04-07"' - try: - code, out = self.run_command_output(command, False, False) - command_output = str(out).strip() if out is not None else str() - if code == 0 and re.search(r'"securityType"\s*:\s*"ConfidentialVM"', command_output, re.IGNORECASE) is not None: - return True, 'IMDS:ConfidentialVM' - except Exception: - pass + code, out = self.run_command_output(command, False, False) + command_output = str(out).strip() if out is not None else str() + if code == 0 and re.search(r'"securityType"\s*:\s*"ConfidentialVM"', command_output, re.IGNORECASE) is not None: + return True, 'IMDS:ConfidentialVM' return False, str() diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index 4a31a428..81bb0976 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -811,8 +811,6 @@ def try_update_certificates_for_default_patching(self): try: is_confidential_vm, detection_details = self.env_layer.detect_confidential_vm() except Exception as e: - is_confidential_vm = False - detection_details = str() self.composite_logger.log_warning("Unable to determine whether the VM is a Confidential VM before attempting the UEFI certificate update. Continuing with patch installation... [Error: {0}]".format(str(e))) return diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index 9dfa0bce..ea0f3d73 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -13,6 +13,7 @@ # limitations under the License. # # Requires Python 2.7+ +import os import platform import sys import unittest @@ -84,6 +85,42 @@ def mock_linux_distribution_to_return_rhel_10(self): def mock_distro_os_release_attr_return_rhel_10(self, attribute): return '10.0' + + def mock_run_command_output_fde_true(self, cmd, no_output=False, chk_err=False): + return 0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def mock_run_command_output_fde_false(self, cmd, no_output=False, chk_err=False): + return 0, 'test-vm,/dev/sda1,FDE=false,LUKS:/dev/sda1' + + def mock_run_command_output_imds_true(self, cmd, no_output=False, chk_err=False): + return 0, '"securityProfile": { "encryptionAtHost": "false", "secureBootEnabled": "false", "securityType": "ConfidentialVM", "virtualTpmEnabled": "false"}' + + def mock_run_command_output_imds_false(self, cmd, no_output=False, chk_err=False): + return 0, '{"compute":{"securityProfile":{"securityType":""}}}' + + def mock_run_command_raises_exception(self, cmd, no_output=False, chk_err=False): + raise Exception('Test Exception') + + def mock_detect_confidential_vm_by_fde_returns_true(self): + return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def mock_detect_confidential_vm_by_fde_returns_false(self): + return False, str() + + def mock_detect_confidential_vm_by_imds_returns_true(self): + return True, 'IMDS:ConfidentialVM' + + def mock_detect_confidential_vm_by_imds_returns_false(self): + return False, str() + + def mock_os_remove_raises_exeception(self, path): + raise Exception('Test Exception') + + def mock_os_makedirs_raises_exeception(self, path): + raise Exception('Test Exception') + + def mock_os_path_isdir_returns_false(self, path): + return False # endregion def test_get_package_manager(self): @@ -138,84 +175,82 @@ def test_is_distro_azure_linux_3(self): distro.os_release_attr = self.backup_envlayer_distro_os_release_attr def test_detect_confidential_vm_by_fde(self): + backup_detect_cvm_bash_file_path = Constants.AzGPSPaths.DETECT_CVM backup_run_command_output = self.envlayer.run_command_output + backup_os_remove = os.remove + backup_os_path_isdir = os.path.isdir + backup_os_makedirs = os.makedirs - self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1') - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() + test_input_output_table = [ + [self.mock_run_command_output_fde_true, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], + [self.mock_run_command_output_fde_false, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, False, str()], + [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], + [self.mock_run_command_output_fde_true, backup_os_remove, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], + [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], + ] - self.assertTrue(is_confidential_vm) - self.assertIn('FDE=true', detection_details) + Constants.AzGPSPaths.DETECT_CVM = os.path.join(os.getcwd(), 'patch.detectcvm.sh') + for row in test_input_output_table: + self.envlayer.run_command_output = row[0] + os.remove = row[1] + os.path.isdir = row[2] + os.makedirs = row[3] + expected_raises_exception = row[4] + expected_is_confidential_vm = row[5] + expected_detection_details = row[6] + + if expected_raises_exception: + self.assertRaises(Exception, self.envlayer.detect_confidential_vm_by_fde) + else: + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() + self.assertEqual(is_confidential_vm, expected_is_confidential_vm) + self.assertIn(expected_detection_details, detection_details) self.envlayer.run_command_output = backup_run_command_output + os.remove = backup_os_remove + os.path.isdir = backup_os_path_isdir + os.makedirs = backup_os_makedirs + Constants.AzGPSPaths.DETECT_CVM = backup_detect_cvm_bash_file_path def test_detect_confidential_vm_by_imds(self): backup_run_command_output = self.envlayer.run_command_output - self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, '{"compute":{"securityProfile":{"securityType":"ConfidentialVM"}}}') - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() + test_input_output_table = [ + [self.mock_run_command_output_imds_true, True, 'IMDS:ConfidentialVM'], + [self.mock_run_command_output_imds_false, False, str()], + ] - self.assertTrue(is_confidential_vm) - self.assertEqual('IMDS:ConfidentialVM', detection_details) + for row in test_input_output_table: + self.envlayer.run_command_output = row[0] + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() + self.assertEqual(is_confidential_vm, row[1]) + self.assertIn(row[2], detection_details) self.envlayer.run_command_output = backup_run_command_output - def test_detect_confidential_vm_checks_imds_before_fde(self): - backup_platform = self.envlayer.platform - backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde - backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds - - calls = [] - self.envlayer.platform = self.envlayer.Platform() - self.envlayer.platform.os_type = lambda: 'Linux' - - def detect_confidential_vm_by_fde(): - calls.append('fde') - return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' - - def detect_confidential_vm_by_imds(): - calls.append('imds') - return True, 'IMDS:ConfidentialVM' - - self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde - self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds - - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() - - self.assertTrue(is_confidential_vm) - self.assertIn('IMDS:ConfidentialVM', detection_details) - self.assertEqual(['imds'], calls) - - self.envlayer.platform = backup_platform - self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde - self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + def test_detect_confidential_vm(self): + self.backup_platform_system = platform.system - def test_detect_confidential_vm_checks_fde_when_imds_not_detected(self): - backup_platform = self.envlayer.platform backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds - calls = [] - self.envlayer.platform = self.envlayer.Platform() - self.envlayer.platform.os_type = lambda: 'Linux' - - def detect_confidential_vm_by_fde(): - calls.append('fde') - return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' - - def detect_confidential_vm_by_imds(): - calls.append('imds') - return False, str() - - self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde - self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds - - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() + test_input_output_table = [ + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_true, True, 'IMDS:ConfidentialVM'], + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_false, True, 'FDE=true'], + ["Windows", self.mock_run_command_output_fde_true, self.mock_run_command_output_imds_true, False, str()], + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_false, self.mock_detect_confidential_vm_by_imds_returns_false, False, str()], + ] - self.assertTrue(is_confidential_vm) - self.assertIn('FDE=true', detection_details) - self.assertEqual(['imds', 'fde'], calls) + for row in test_input_output_table: + platform.system = self.mock_platform_system if row[0] == 'Linux' else self.mock_platform_system_windows + self.envlayer.detect_confidential_vm_by_fde = row[1] + self.envlayer.detect_confidential_vm_by_imds = row[2] + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() + self.assertEqual(is_confidential_vm, row[3]) + self.assertIn(row[4], detection_details) - self.envlayer.platform = backup_platform + # restore original methods + platform.system = self.backup_platform_system self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds diff --git a/src/core/tests/Test_PatchInstaller.py b/src/core/tests/Test_PatchInstaller.py index 6fa8b7ad..affbe97d 100644 --- a/src/core/tests/Test_PatchInstaller.py +++ b/src/core/tests/Test_PatchInstaller.py @@ -35,6 +35,12 @@ def tearDown(self): # region Mocks def mock_update_certs_raise_exception(self): raise Exception("Simulated cert update failure") + + def mock_detect_confidential_vm_raises_exception(self): + raise Exception("Simulated VM detection failure") + + def mock_detect_confidential_vm_by_imds_returns_true(self): + return True, 'IMDS:ConfidentialVM' # endregion # region Utility functions (update cert tests) @@ -819,12 +825,26 @@ def test_try_update_certs_swallows_exception_from_update_certs(self): def test_try_update_certificates_skips_confidential_vm(self): runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01") - runtime.patch_installer.env_layer.detect_confidential_vm = lambda: (True, 'IMDS:ConfidentialVM') + backup_detect_confidential_vm_by_imds = runtime.env_layer.detect_confidential_vm_by_imds + runtime.env_layer.detect_confidential_vm_by_imds = self.mock_detect_confidential_vm_by_imds_returns_true method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs') runtime.patch_installer.start_installation(simulate=True) + self.assertEqual(len(method_called), 0) + + runtime.env_layer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + runtime.stop() + def test_try_update_certificates_skips_when_detect_confidential_vm_raises_exception(self): + runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01") + backup_detect_confidential_vm = runtime.env_layer.detect_confidential_vm + + runtime.env_layer.detect_confidential_vm = self.mock_detect_confidential_vm_raises_exception + method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs') + runtime.patch_installer.start_installation(simulate=True) self.assertEqual(len(method_called), 0) + + runtime.env_layer.detect_confidential_vm = backup_detect_confidential_vm runtime.stop() # endregion From ef8d5543dc260f48b500efbdd73305aa08c87596 Mon Sep 17 00:00:00 2001 From: Rajasi Rane Date: Wed, 24 Jun 2026 06:36:03 -0700 Subject: [PATCH 3/3] Feature: [Ubuntu, no-CVM] Addressing PR feedback #1 --- src/core/src/bootstrap/Constants.py | 4 +- .../src/bootstrap/DetectConfidentialVmShim.sh | 92 ++++++++++++++ src/core/src/bootstrap/EnvLayer.py | 117 +++--------------- .../AptitudePackageManager.py | 10 +- src/core/tests/Test_EnvLayer.py | 45 +++---- src/tools/Package-Core.py | 9 ++ 6 files changed, 145 insertions(+), 132 deletions(-) create mode 100644 src/core/src/bootstrap/DetectConfidentialVmShim.sh diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 2ad59801..d014419d 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -59,7 +59,6 @@ class SystemPaths(EnumBackport): class AzGPSPaths(EnumBackport): EULA_SETTINGS = "/var/lib/azure/linuxpatchextension/patch.eula.settings" UEFI_SETTINGS = "/var/lib/azure/linuxpatchextension/patch.uefi.settings" - DETECT_CVM = "/var/lib/azure/linuxpatchextension/patch.detectcvm.sh" class EnvSettings(EnumBackport): LOG_FOLDER = "logFolder" @@ -102,6 +101,9 @@ class Certificates(EnumBackport): KEK = "KEK" DB = "DB" + # Confidential VM detection shim script name + DETECT_CVM_SHIM_FILE_NAME = "DetectConfidentialVmShim.sh" + # File to save default settings for auto OS updates IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH = "ImageDefaultPatchConfiguration.bak" diff --git a/src/core/src/bootstrap/DetectConfidentialVmShim.sh b/src/core/src/bootstrap/DetectConfidentialVmShim.sh new file mode 100644 index 00000000..fa8cd2ea --- /dev/null +++ b/src/core/src/bootstrap/DetectConfidentialVmShim.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# Copyright 2020 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +HOSTNAME=$(hostname) + +ROOT_SRC=$(findmnt -n -o SOURCE /) +ROOT_DEV=$(readlink -f "$ROOT_SRC" || echo "$ROOT_SRC") + +FDE="false" +DETAILS="" + +check_device() { + local dev="$1" + + if blkid "$dev" 2>/dev/null | grep -qi 'crypto_LUKS'; then + FDE="true" + DETAILS="LUKS:$dev" + return + fi + + local type + type=$(lsblk -dn -o TYPE "$dev" 2>/dev/null || true) + + if [[ "$type" == "crypt" ]]; then + FDE="true" + DETAILS="CRYPT:$dev" + return + fi +} + +walk_parents() { + local dev="$1" + + while [[ -n "$dev" ]]; do + check_device "$dev" + + if [[ "$FDE" == "true" ]]; then + return + fi + + local parent + parent=$(lsblk -ndo PKNAME "$dev" 2>/dev/null | head -1 || true) + + if [[ -z "$parent" ]]; then + break + fi + + dev="/dev/$parent" + done +} + +walk_parents "$ROOT_DEV" + +if [[ "$FDE" != "true" ]]; then + while read -r name type; do + if [[ "$type" == "crypt" ]]; then + mapper="/dev/mapper/$name" + + if mount | grep -q "^$mapper on / "; then + FDE="true" + DETAILS="DMCRYPT_ROOT:$mapper" + break + fi + fi + done < <(dmsetup ls --target crypt 2>/dev/null || true) +fi + +if [[ "$FDE" != "true" ]]; then + if systemctl list-units 2>/dev/null | grep -qi azure; then + if ls /var/lib/waagent/*Encryption* >/dev/null 2>&1; then + FDE="true" + DETAILS="AZURE_ADE_ARTIFACTS" + fi + fi +fi + +echo "$HOSTNAME,$ROOT_DEV,FDE=$FDE,$DETAILS" + diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index 4d23fbd0..6b1d690d 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -200,119 +200,42 @@ def detect_confidential_vm(self): is_confidential_vm, detection_details = self.detect_confidential_vm_by_imds() if is_confidential_vm: - return True, detection_details + return is_confidential_vm, detection_details is_confidential_vm, detection_details = self.detect_confidential_vm_by_fde() if is_confidential_vm: - return True, detection_details + return is_confidential_vm, detection_details return False, str() def detect_confidential_vm_by_fde(self): # type: () -> tuple """Runs the FDE-based CVM detection script and returns whether it detected a Confidential VM.""" - script_path = Constants.AzGPSPaths.DETECT_CVM command_output = str() - detection_script = """#!/usr/bin/env bash -set -euo pipefail - -HOSTNAME=$(hostname) - -ROOT_SRC=$(findmnt -n -o SOURCE /) -ROOT_DEV=$(readlink -f "$ROOT_SRC" || echo "$ROOT_SRC") - -FDE="false" -DETAILS="" - -check_device() { - local dev="$1" - - if blkid "$dev" 2>/dev/null | grep -qi 'crypto_LUKS'; then - FDE="true" - DETAILS="LUKS:$dev" - return - fi - - local type - type=$(lsblk -dn -o TYPE "$dev" 2>/dev/null || true) - - if [[ "$type" == "crypt" ]]; then - FDE="true" - DETAILS="CRYPT:$dev" - return - fi -} - -walk_parents() { - local dev="$1" - - while [[ -n "$dev" ]]; do - check_device "$dev" - - if [[ "$FDE" == "true" ]]; then - return - fi - - local parent - parent=$(lsblk -ndo PKNAME "$dev" 2>/dev/null | head -1 || true) - - if [[ -z "$parent" ]]; then - break - fi - - dev="/dev/$parent" - done -} - -walk_parents "$ROOT_DEV" - -if [[ "$FDE" != "true" ]]; then - while read -r name type; do - if [[ "$type" == "crypt" ]]; then - mapper="/dev/mapper/$name" - - if mount | grep -q "^$mapper on / "; then - FDE="true" - DETAILS="DMCRYPT_ROOT:$mapper" - break - fi - fi - done < <(dmsetup ls --target crypt 2>/dev/null || true) -fi - -if [[ "$FDE" != "true" ]]; then - if systemctl list-units 2>/dev/null | grep -qi azure; then - if ls /var/lib/waagent/*Encryption* >/dev/null 2>&1; then - FDE="true" - DETAILS="AZURE_ADE_ARTIFACTS" - fi - fi -fi - -echo "$HOSTNAME,$ROOT_DEV,FDE=$FDE,$DETAILS" -""" try: - script_dir = os.path.dirname(script_path) - if script_dir and not os.path.isdir(script_dir): - try: - os.makedirs(script_dir) - except Exception: - raise - - self.file_system.write_with_retry(script_path, detection_script, 'w') + detection_shim_path = self.__get_fde_detection_shim_path() + detection_script = self.file_system.read_with_retry(detection_shim_path) + if detection_script is None or str(detection_script).strip() == str(): + raise Exception("FDE_DETECTION_SHIM_EMPTY:{0}".format(str(detection_shim_path))) - code, out = self.run_command_output('bash "{0}"'.format(script_path), False, False) + code, out = self.run_command_output('bash "{0}"'.format(detection_shim_path), False, False) command_output = str(out).strip() if out is not None else str() return code == 0 and re.search(r'\bFDE\s*=\s*true\b', command_output, re.IGNORECASE) is not None, command_output - except Exception: + except Exception as e: raise Exception("FDE_DETECTION_ERROR:{0}; OUTPUT:{1}".format(str(e), command_output)) - finally: - if script_path is not None and os.path.exists(script_path): - try: - os.remove(script_path) - except Exception: - pass + + @staticmethod + def __get_fde_detection_shim_path(): + # type: () -> str + """Resolves the packaged FDE detection shim path.""" + current_dir = os.path.dirname(os.path.realpath(__file__)) + shim_path = os.path.join(current_dir, Constants.DETECT_CVM_SHIM_FILE_NAME) + + if os.path.isfile(shim_path): + return shim_path + + raise Exception("FDE_DETECTION_SHIM_NOT_FOUND:{0}".format(str(shim_path))) def detect_confidential_vm_by_imds(self): # type: () -> tuple diff --git a/src/core/src/package_managers/AptitudePackageManager.py b/src/core/src/package_managers/AptitudePackageManager.py index 9439d496..860ef9a7 100644 --- a/src/core/src/package_managers/AptitudePackageManager.py +++ b/src/core/src/package_managers/AptitudePackageManager.py @@ -975,12 +975,12 @@ def try_update_certs(self): success = False try: - self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdate", raise_on_error=True) + self.__run_cert_apt_command(self.apt_update_cmd, step_name="AptUpdate", raise_on_error=True) self.__ensure_fwupd_installation() # shell fwupd commands to update certificates - self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True) - self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True) + self.__run_cert_shell_command(self.fwupd_refresh_cmd, step_name="FwupdRefresh", raise_on_error=True) + self.__run_cert_shell_command(self.fwupd_update_cmd, step_name="FwupdUpdate", raise_on_error=True) """ NOTE: They tooling used to update here is fwupd (firmware update manager). In this method of updating certs, the exact version of current certs is never pinned or set/referred while installing. fwupd fetches and installs latest available certs. This is beneficial because our code doesn't become dated in the future @@ -1015,11 +1015,11 @@ def __ensure_fwupd_installation(self): if installed_version != str(): self.composite_logger.log("[APM][Certs] Existing fwupd version is below minimum. Reinstalling latest. [InstalledVersion={0}][MinimumVersion={1}]" .format(installed_version, self.min_fwupd_version)) - self.__run_cert_apt_command(self.remove_fwupd_cmd, "RemoveOldFwupd", raise_on_error=True) + self.__run_cert_apt_command(self.remove_fwupd_cmd, step_name="RemoveOldFwupd", raise_on_error=True) else: self.composite_logger.log_debug("[APM][Certs] fwupd is not installed. Installing latest version.") - self.__run_cert_apt_command(self.install_fwupd_cmd, "InstallFwupd", raise_on_error=True) + self.__run_cert_apt_command(self.install_fwupd_cmd, step_name="InstallFwupd", raise_on_error=True) # Validate that the installed fwupd meets the minimum requirement. installed_version = self.__get_installed_fwupd_version() diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index ea0f3d73..c9041752 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -98,9 +98,6 @@ def mock_run_command_output_imds_true(self, cmd, no_output=False, chk_err=False) def mock_run_command_output_imds_false(self, cmd, no_output=False, chk_err=False): return 0, '{"compute":{"securityProfile":{"securityType":""}}}' - def mock_run_command_raises_exception(self, cmd, no_output=False, chk_err=False): - raise Exception('Test Exception') - def mock_detect_confidential_vm_by_fde_returns_true(self): return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' @@ -113,13 +110,10 @@ def mock_detect_confidential_vm_by_imds_returns_true(self): def mock_detect_confidential_vm_by_imds_returns_false(self): return False, str() - def mock_os_remove_raises_exeception(self, path): - raise Exception('Test Exception') - - def mock_os_makedirs_raises_exeception(self, path): - raise Exception('Test Exception') + def mock_file_system_read_with_retry_returns_empty(self, file_path_or_handle, raise_if_not_found=True): + return str() - def mock_os_path_isdir_returns_false(self, path): + def mock_os_path_isfile_returns_false(self, path): return False # endregion @@ -175,29 +169,24 @@ def test_is_distro_azure_linux_3(self): distro.os_release_attr = self.backup_envlayer_distro_os_release_attr def test_detect_confidential_vm_by_fde(self): - backup_detect_cvm_bash_file_path = Constants.AzGPSPaths.DETECT_CVM backup_run_command_output = self.envlayer.run_command_output - backup_os_remove = os.remove - backup_os_path_isdir = os.path.isdir - backup_os_makedirs = os.makedirs + backup_read_with_retry = self.envlayer.file_system.read_with_retry + backup_os_path_isfile = os.path.isfile test_input_output_table = [ - [self.mock_run_command_output_fde_true, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], - [self.mock_run_command_output_fde_false, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, False, str()], - [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], - [self.mock_run_command_output_fde_true, backup_os_remove, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], - [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], + [self.mock_run_command_output_fde_true, backup_os_path_isfile, backup_read_with_retry, False, True, 'FDE=true'], + [self.mock_run_command_output_fde_false, backup_os_path_isfile, backup_read_with_retry, False, False, str()], + [self.mock_run_command_output_fde_true, backup_os_path_isfile, self.mock_file_system_read_with_retry_returns_empty, True, False, str()], + [self.mock_run_command_output_fde_true, self.mock_os_path_isfile_returns_false, backup_read_with_retry, True, False, str()], ] - Constants.AzGPSPaths.DETECT_CVM = os.path.join(os.getcwd(), 'patch.detectcvm.sh') for row in test_input_output_table: self.envlayer.run_command_output = row[0] - os.remove = row[1] - os.path.isdir = row[2] - os.makedirs = row[3] - expected_raises_exception = row[4] - expected_is_confidential_vm = row[5] - expected_detection_details = row[6] + os.path.isfile = row[1] + self.envlayer.file_system.read_with_retry = row[2] + expected_raises_exception = row[3] + expected_is_confidential_vm = row[4] + expected_detection_details = row[5] if expected_raises_exception: self.assertRaises(Exception, self.envlayer.detect_confidential_vm_by_fde) @@ -207,10 +196,8 @@ def test_detect_confidential_vm_by_fde(self): self.assertIn(expected_detection_details, detection_details) self.envlayer.run_command_output = backup_run_command_output - os.remove = backup_os_remove - os.path.isdir = backup_os_path_isdir - os.makedirs = backup_os_makedirs - Constants.AzGPSPaths.DETECT_CVM = backup_detect_cvm_bash_file_path + self.envlayer.file_system.read_with_retry = backup_read_with_retry + os.path.isfile = backup_os_path_isfile def test_detect_confidential_vm_by_imds(self): backup_run_command_output = self.envlayer.run_command_output diff --git a/src/tools/Package-Core.py b/src/tools/Package-Core.py index afbcc01f..1404e6d0 100644 --- a/src/tools/Package-Core.py +++ b/src/tools/Package-Core.py @@ -238,6 +238,15 @@ def main(argv): external_dependencies_source_code_path = os.path.join(source_code_path, 'external_dependencies') add_external_dependencies(external_dependencies_destination, external_dependencies_source_code_path) + # Copy core shim files + enforce UNIX style line endings + print('\n========== Copying core shim files + enforcing UNIX style line endings.\n') + core_shim_files = ['DetectConfidentialVmShim.sh'] + for core_shim_file in core_shim_files: + core_shim_src = os.path.join(working_directory, 'core', 'src', 'bootstrap', core_shim_file) + core_shim_destination = os.path.join(working_directory, 'out', core_shim_file) + shutil.copyfile(core_shim_src, core_shim_destination) + replace_text_in_file(core_shim_destination, '\r\n', '\n') + except Exception as error: print('Exception during packaging all python modules in core: ' + repr(error)) raise