Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/core/src/bootstrap/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class Certificates(EnumBackport):
KEK = "KEK"
DB = "DB"

# Confidential VM detection shim script name
DETECT_CVM_SHIM_FILE_NAME = "DetectConfidentialVmShim.sh"

# File to save default settings for auto OS updates
IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH = "ImageDefaultPatchConfiguration.bak"

Expand Down
92 changes: 92 additions & 0 deletions src/core/src/bootstrap/DetectConfidentialVmShim.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/usr/bin/env bash
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -euo pipefail

HOSTNAME=$(hostname)

ROOT_SRC=$(findmnt -n -o SOURCE /)
ROOT_DEV=$(readlink -f "$ROOT_SRC" || echo "$ROOT_SRC")

FDE="false"
DETAILS=""

check_device() {
local dev="$1"

if blkid "$dev" 2>/dev/null | grep -qi 'crypto_LUKS'; then
FDE="true"
DETAILS="LUKS:$dev"
return
fi

local type
type=$(lsblk -dn -o TYPE "$dev" 2>/dev/null || true)

if [[ "$type" == "crypt" ]]; then
FDE="true"
DETAILS="CRYPT:$dev"
return
fi
}

walk_parents() {
local dev="$1"

while [[ -n "$dev" ]]; do
check_device "$dev"

if [[ "$FDE" == "true" ]]; then
return
fi

local parent
parent=$(lsblk -ndo PKNAME "$dev" 2>/dev/null | head -1 || true)

if [[ -z "$parent" ]]; then
break
fi

dev="/dev/$parent"
done
}

walk_parents "$ROOT_DEV"

if [[ "$FDE" != "true" ]]; then
while read -r name type; do
if [[ "$type" == "crypt" ]]; then
mapper="/dev/mapper/$name"

if mount | grep -q "^$mapper on / "; then
FDE="true"
DETAILS="DMCRYPT_ROOT:$mapper"
break
fi
fi
done < <(dmsetup ls --target crypt 2>/dev/null || true)
fi

if [[ "$FDE" != "true" ]]; then
if systemctl list-units 2>/dev/null | grep -qi azure; then
if ls /var/lib/waagent/*Encryption* >/dev/null 2>&1; then
FDE="true"
DETAILS="AZURE_ADE_ARTIFACTS"
fi
fi
fi

echo "$HOSTNAME,$ROOT_DEV,FDE=$FDE,$DETAILS"

57 changes: 57 additions & 0 deletions src/core/src/bootstrap/EnvLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,63 @@ def get_env_var(self, var_name, raise_if_not_success=False):
raise
return None

def detect_confidential_vm(self):
# type: () -> tuple
"""Returns whether the current VM is a Confidential VM and the detection details."""
if self.platform.os_type() == 'Windows':
return False, str()

is_confidential_vm, detection_details = self.detect_confidential_vm_by_imds()
if is_confidential_vm:
return is_confidential_vm, detection_details

is_confidential_vm, detection_details = self.detect_confidential_vm_by_fde()
if is_confidential_vm:
return is_confidential_vm, detection_details

return False, str()

def detect_confidential_vm_by_fde(self):
# type: () -> tuple
"""Runs the FDE-based CVM detection script and returns whether it detected a Confidential VM."""
command_output = str()

try:
detection_shim_path = self.__get_fde_detection_shim_path()
detection_script = self.file_system.read_with_retry(detection_shim_path)
if detection_script is None or str(detection_script).strip() == str():
raise Exception("FDE_DETECTION_SHIM_EMPTY:{0}".format(str(detection_shim_path)))

code, out = self.run_command_output('bash "{0}"'.format(detection_shim_path), False, False)
command_output = str(out).strip() if out is not None else str()
return code == 0 and re.search(r'\bFDE\s*=\s*true\b', command_output, re.IGNORECASE) is not None, command_output
except Exception as e:
raise Exception("FDE_DETECTION_ERROR:{0}; OUTPUT:{1}".format(str(e), command_output))

@staticmethod
def __get_fde_detection_shim_path():
# type: () -> str
"""Resolves the packaged FDE detection shim path."""
current_dir = os.path.dirname(os.path.realpath(__file__))
shim_path = os.path.join(current_dir, Constants.DETECT_CVM_SHIM_FILE_NAME)

if os.path.isfile(shim_path):
return shim_path

raise Exception("FDE_DETECTION_SHIM_NOT_FOUND:{0}".format(str(shim_path)))

def detect_confidential_vm_by_imds(self):
# type: () -> tuple
"""Queries Azure IMDS and returns whether the VM reports ConfidentialVM security type."""
command = 'curl -s --connect-timeout 2 --max-time 2 -H Metadata:true --noproxy "*" "http://169.254.169.254/metadata/instance?api-version=2025-04-07"'

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance this IP address changes and could be spoofed/taken by a malicious actor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have much insight on this one, simply used the command that was shared by partners to fetch from imds.

Here's Copilot's response:

✅ Short answer

No, the IP does not change
No, it cannot be externally spoofed or taken over by another tenant

🔎 Why this is safe

  1. Fixed, well-known platform endpoint

169.254.169.254 is a hard-coded Azure IMDS endpoint, not assigned from your VNet or subnet. [learn.microsoft.com]
It’s part of the link‑local range (169.254.0.0/16), reserved for host-local communication. [simonpainter.com]

➡️ This means:

It is not customer-controlled
It is not dynamically allocated
Azure guarantees its availability and consistency

  1. Not reachable from outside the VM

The endpoint is non-routable and only accessible from within the VM [learn.microsoft.com]
Traffic to IMDS never leaves the host [learn.microsoft.com]

➡️ So:

No external attacker can “impersonate” this endpoint over the network
It’s effectively a host-side service exposed into the VM

  1. Platform-injected routing

Azure automatically injects a route to 169.254.169.254 via the VM’s primary NIC (not via your VNet config) [simonpainter.com]

➡️ This prevents:

DNS spoofing
Route hijacking from within normal networking config

⚠️ What is a valid concern (and how to address it)
The only realistic risk is:

If an attacker already has root/admin access inside the VM, they could:

Intercept traffic locally
Modify iptables / routing
Run a fake service on that IP

✅ But:

At that point, the VM is already compromised
This is not specific to IMDS (applies to any local endpoint)

💡 Best-practice statement

The IMDS endpoint (169.254.169.254) is a well-known, platform-defined link-local address that is not part of the customer-controlled network and does not change across deployments. It is non-routable and only accessible from within the VM, with traffic handled entirely within the Azure host fabric.
This prevents external spoofing or takeover by other tenants. The only theoretical risk would require local root-level compromise of the VM, in which case any local endpoint (not just IMDS) could be intercepted.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Thanks!


code, out = self.run_command_output(command, False, False)
command_output = str(out).strip() if out is not None else str()
if code == 0 and re.search(r'"securityType"\s*:\s*"ConfidentialVM"', command_output, re.IGNORECASE) is not None:
return True, 'IMDS:ConfidentialVM'

return False, str()

def run_command_output(self, cmd, no_output=False, chk_err=True):
# type: (str, bool, bool) -> (int, any)
""" Wrapper for subprocess.check_output. Execute 'cmd'. Returns return code and STDOUT, trapping expected exceptions. Reports exceptions to Error if chk_err parameter is True """
Expand Down
10 changes: 10 additions & 0 deletions src/core/src/core_logic/PatchInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,16 @@ def try_update_certificates_for_default_patching(self):
self.composite_logger.log_debug("Not updating certificates since this is not a default patching operation.")
return

try:
is_confidential_vm, detection_details = self.env_layer.detect_confidential_vm()
except Exception as e:
self.composite_logger.log_warning("Unable to determine whether the VM is a Confidential VM before attempting the UEFI certificate update. Continuing with patch installation... [Error: {0}]".format(str(e)))
return

if is_confidential_vm:
self.composite_logger.log("Skipping UEFI certificate update because this VM was detected as a Confidential VM. [Detection={0}]".format(detection_details))
return

try:
self.package_manager.update_certs()
except Exception as e:
Expand Down
10 changes: 5 additions & 5 deletions src/core/src/package_managers/AptitudePackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,12 +975,12 @@ def try_update_certs(self):

success = False
try:
self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdate", raise_on_error=True)
self.__run_cert_apt_command(self.apt_update_cmd, step_name="AptUpdate", raise_on_error=True)
self.__ensure_fwupd_installation()

# shell fwupd commands to update certificates
self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True)
self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True)
self.__run_cert_shell_command(self.fwupd_refresh_cmd, step_name="FwupdRefresh", raise_on_error=True)
self.__run_cert_shell_command(self.fwupd_update_cmd, step_name="FwupdUpdate", raise_on_error=True)

""" NOTE: They tooling used to update here is fwupd (firmware update manager). In this method of updating certs, the exact version of current certs is never pinned
or set/referred while installing. fwupd fetches and installs latest available certs. This is beneficial because our code doesn't become dated in the future
Expand Down Expand Up @@ -1015,11 +1015,11 @@ def __ensure_fwupd_installation(self):
if installed_version != str():
self.composite_logger.log("[APM][Certs] Existing fwupd version is below minimum. Reinstalling latest. [InstalledVersion={0}][MinimumVersion={1}]"
.format(installed_version, self.min_fwupd_version))
self.__run_cert_apt_command(self.remove_fwupd_cmd, "RemoveOldFwupd", raise_on_error=True)
self.__run_cert_apt_command(self.remove_fwupd_cmd, step_name="RemoveOldFwupd", raise_on_error=True)
else:
self.composite_logger.log_debug("[APM][Certs] fwupd is not installed. Installing latest version.")

self.__run_cert_apt_command(self.install_fwupd_cmd, "InstallFwupd", raise_on_error=True)
self.__run_cert_apt_command(self.install_fwupd_cmd, step_name="InstallFwupd", raise_on_error=True)

# Validate that the installed fwupd meets the minimum requirement.
installed_version = self.__get_installed_fwupd_version()
Expand Down
105 changes: 104 additions & 1 deletion src/core/tests/Test_EnvLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
#
# Requires Python 2.7+
import io
import os
import platform
import sys
import unittest
Expand Down Expand Up @@ -85,6 +85,36 @@ def mock_linux_distribution_to_return_rhel_10(self):

def mock_distro_os_release_attr_return_rhel_10(self, attribute):
return '10.0'

def mock_run_command_output_fde_true(self, cmd, no_output=False, chk_err=False):
return 0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1'

def mock_run_command_output_fde_false(self, cmd, no_output=False, chk_err=False):
return 0, 'test-vm,/dev/sda1,FDE=false,LUKS:/dev/sda1'

def mock_run_command_output_imds_true(self, cmd, no_output=False, chk_err=False):
return 0, '"securityProfile": { "encryptionAtHost": "false", "secureBootEnabled": "false", "securityType": "ConfidentialVM", "virtualTpmEnabled": "false"}'

def mock_run_command_output_imds_false(self, cmd, no_output=False, chk_err=False):
return 0, '{"compute":{"securityProfile":{"securityType":""}}}'

def mock_detect_confidential_vm_by_fde_returns_true(self):
return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1'

def mock_detect_confidential_vm_by_fde_returns_false(self):
return False, str()

def mock_detect_confidential_vm_by_imds_returns_true(self):
return True, 'IMDS:ConfidentialVM'

def mock_detect_confidential_vm_by_imds_returns_false(self):
return False, str()

def mock_file_system_read_with_retry_returns_empty(self, file_path_or_handle, raise_if_not_found=True):
return str()

def mock_os_path_isfile_returns_false(self, path):
return False
# endregion

def test_get_package_manager(self):
Expand Down Expand Up @@ -138,6 +168,79 @@ def test_is_distro_azure_linux_3(self):
# restore original methods
distro.os_release_attr = self.backup_envlayer_distro_os_release_attr

def test_detect_confidential_vm_by_fde(self):
backup_run_command_output = self.envlayer.run_command_output
backup_read_with_retry = self.envlayer.file_system.read_with_retry
backup_os_path_isfile = os.path.isfile

test_input_output_table = [
[self.mock_run_command_output_fde_true, backup_os_path_isfile, backup_read_with_retry, False, True, 'FDE=true'],
[self.mock_run_command_output_fde_false, backup_os_path_isfile, backup_read_with_retry, False, False, str()],
[self.mock_run_command_output_fde_true, backup_os_path_isfile, self.mock_file_system_read_with_retry_returns_empty, True, False, str()],
[self.mock_run_command_output_fde_true, self.mock_os_path_isfile_returns_false, backup_read_with_retry, True, False, str()],
]

for row in test_input_output_table:
self.envlayer.run_command_output = row[0]
os.path.isfile = row[1]
self.envlayer.file_system.read_with_retry = row[2]
expected_raises_exception = row[3]
expected_is_confidential_vm = row[4]
expected_detection_details = row[5]

if expected_raises_exception:
self.assertRaises(Exception, self.envlayer.detect_confidential_vm_by_fde)
else:
is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde()
self.assertEqual(is_confidential_vm, expected_is_confidential_vm)
self.assertIn(expected_detection_details, detection_details)

self.envlayer.run_command_output = backup_run_command_output
self.envlayer.file_system.read_with_retry = backup_read_with_retry
os.path.isfile = backup_os_path_isfile

def test_detect_confidential_vm_by_imds(self):
backup_run_command_output = self.envlayer.run_command_output

test_input_output_table = [
[self.mock_run_command_output_imds_true, True, 'IMDS:ConfidentialVM'],
[self.mock_run_command_output_imds_false, False, str()],
]

for row in test_input_output_table:
self.envlayer.run_command_output = row[0]
is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds()
self.assertEqual(is_confidential_vm, row[1])
self.assertIn(row[2], detection_details)

self.envlayer.run_command_output = backup_run_command_output

def test_detect_confidential_vm(self):
self.backup_platform_system = platform.system

backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde
backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds

test_input_output_table = [
["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_true, True, 'IMDS:ConfidentialVM'],
["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_false, True, 'FDE=true'],
["Windows", self.mock_run_command_output_fde_true, self.mock_run_command_output_imds_true, False, str()],
["Linux", self.mock_detect_confidential_vm_by_fde_returns_false, self.mock_detect_confidential_vm_by_imds_returns_false, False, str()],
]

for row in test_input_output_table:
platform.system = self.mock_platform_system if row[0] == 'Linux' else self.mock_platform_system_windows
self.envlayer.detect_confidential_vm_by_fde = row[1]
self.envlayer.detect_confidential_vm_by_imds = row[2]
is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm()
self.assertEqual(is_confidential_vm, row[3])
self.assertIn(row[4], detection_details)

# restore original methods
platform.system = self.backup_platform_system
self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde
self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds

def test_filesystem(self):
# only validates if these invocable without exceptions
backup_retry_count = Constants.MAX_FILE_OPERATION_RETRY_COUNT
Expand Down
30 changes: 30 additions & 0 deletions src/core/tests/Test_PatchInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ def tearDown(self):
# region Mocks
def mock_update_certs_raise_exception(self):
raise Exception("Simulated cert update failure")

def mock_detect_confidential_vm_raises_exception(self):
raise Exception("Simulated VM detection failure")

def mock_detect_confidential_vm_by_imds_returns_true(self):
return True, 'IMDS:ConfidentialVM'
# endregion

# region Utility functions (update cert tests)
Expand Down Expand Up @@ -816,6 +822,30 @@ def test_try_update_certs_swallows_exception_from_update_certs(self):

runtime.patch_installer.package_manager.update_certs = backup_up_update_certs
runtime.stop()

def test_try_update_certificates_skips_confidential_vm(self):
runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01")
backup_detect_confidential_vm_by_imds = runtime.env_layer.detect_confidential_vm_by_imds

runtime.env_layer.detect_confidential_vm_by_imds = self.mock_detect_confidential_vm_by_imds_returns_true
method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs')
runtime.patch_installer.start_installation(simulate=True)
self.assertEqual(len(method_called), 0)

runtime.env_layer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds
runtime.stop()

def test_try_update_certificates_skips_when_detect_confidential_vm_raises_exception(self):
runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01")
backup_detect_confidential_vm = runtime.env_layer.detect_confidential_vm

runtime.env_layer.detect_confidential_vm = self.mock_detect_confidential_vm_raises_exception
method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs')
runtime.patch_installer.start_installation(simulate=True)
self.assertEqual(len(method_called), 0)

runtime.env_layer.detect_confidential_vm = backup_detect_confidential_vm
runtime.stop()
# endregion


Expand Down
9 changes: 9 additions & 0 deletions src/tools/Package-Core.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ def main(argv):
external_dependencies_source_code_path = os.path.join(source_code_path, 'external_dependencies')
add_external_dependencies(external_dependencies_destination, external_dependencies_source_code_path)

# Copy core shim files + enforce UNIX style line endings
print('\n========== Copying core shim files + enforcing UNIX style line endings.\n')
core_shim_files = ['DetectConfidentialVmShim.sh']
for core_shim_file in core_shim_files:
core_shim_src = os.path.join(working_directory, 'core', 'src', 'bootstrap', core_shim_file)
core_shim_destination = os.path.join(working_directory, 'out', core_shim_file)
shutil.copyfile(core_shim_src, core_shim_destination)
replace_text_in_file(core_shim_destination, '\r\n', '\n')

except Exception as error:
print('Exception during packaging all python modules in core: ' + repr(error))
raise
Expand Down
Loading