|
13 | 13 | # limitations under the License. |
14 | 14 | # |
15 | 15 | # Requires Python 2.7+ |
| 16 | +import os |
16 | 17 | import platform |
17 | 18 | import sys |
18 | 19 | import unittest |
@@ -84,6 +85,42 @@ def mock_linux_distribution_to_return_rhel_10(self): |
84 | 85 |
|
85 | 86 | def mock_distro_os_release_attr_return_rhel_10(self, attribute): |
86 | 87 | return '10.0' |
| 88 | + |
| 89 | + def mock_run_command_output_fde_true(self, cmd, no_output=False, chk_err=False): |
| 90 | + return 0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' |
| 91 | + |
| 92 | + def mock_run_command_output_fde_false(self, cmd, no_output=False, chk_err=False): |
| 93 | + return 0, 'test-vm,/dev/sda1,FDE=false,LUKS:/dev/sda1' |
| 94 | + |
| 95 | + def mock_run_command_output_imds_true(self, cmd, no_output=False, chk_err=False): |
| 96 | + return 0, '"securityProfile": { "encryptionAtHost": "false", "secureBootEnabled": "false", "securityType": "ConfidentialVM", "virtualTpmEnabled": "false"}' |
| 97 | + |
| 98 | + def mock_run_command_output_imds_false(self, cmd, no_output=False, chk_err=False): |
| 99 | + return 0, '{"compute":{"securityProfile":{"securityType":""}}}' |
| 100 | + |
| 101 | + def mock_run_command_raises_exception(self, cmd, no_output=False, chk_err=False): |
| 102 | + raise Exception('Test Exception') |
| 103 | + |
| 104 | + def mock_detect_confidential_vm_by_fde_returns_true(self): |
| 105 | + return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' |
| 106 | + |
| 107 | + def mock_detect_confidential_vm_by_fde_returns_false(self): |
| 108 | + return False, str() |
| 109 | + |
| 110 | + def mock_detect_confidential_vm_by_imds_returns_true(self): |
| 111 | + return True, 'IMDS:ConfidentialVM' |
| 112 | + |
| 113 | + def mock_detect_confidential_vm_by_imds_returns_false(self): |
| 114 | + return False, str() |
| 115 | + |
| 116 | + def mock_os_remove_raises_exeception(self, path): |
| 117 | + raise Exception('Test Exception') |
| 118 | + |
| 119 | + def mock_os_makedirs_raises_exeception(self, path): |
| 120 | + raise Exception('Test Exception') |
| 121 | + |
| 122 | + def mock_os_path_isdir_returns_false(self, path): |
| 123 | + return False |
87 | 124 | # endregion |
88 | 125 |
|
89 | 126 | def test_get_package_manager(self): |
@@ -138,84 +175,82 @@ def test_is_distro_azure_linux_3(self): |
138 | 175 | distro.os_release_attr = self.backup_envlayer_distro_os_release_attr |
139 | 176 |
|
140 | 177 | def test_detect_confidential_vm_by_fde(self): |
| 178 | + backup_detect_cvm_bash_file_path = Constants.AzGPSPaths.DETECT_CVM |
141 | 179 | backup_run_command_output = self.envlayer.run_command_output |
| 180 | + backup_os_remove = os.remove |
| 181 | + backup_os_path_isdir = os.path.isdir |
| 182 | + backup_os_makedirs = os.makedirs |
142 | 183 |
|
143 | | - self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1') |
144 | | - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() |
| 184 | + test_input_output_table = [ |
| 185 | + [self.mock_run_command_output_fde_true, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], |
| 186 | + [self.mock_run_command_output_fde_false, backup_os_remove, backup_os_path_isdir, backup_os_makedirs, False, False, str()], |
| 187 | + [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, backup_os_path_isdir, backup_os_makedirs, False, True, 'FDE=true'], |
| 188 | + [self.mock_run_command_output_fde_true, backup_os_remove, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], |
| 189 | + [self.mock_run_command_output_fde_true, self.mock_os_remove_raises_exeception, self.mock_os_path_isdir_returns_false, self.mock_os_makedirs_raises_exeception, True, False, str()], |
| 190 | + ] |
145 | 191 |
|
146 | | - self.assertTrue(is_confidential_vm) |
147 | | - self.assertIn('FDE=true', detection_details) |
| 192 | + Constants.AzGPSPaths.DETECT_CVM = os.path.join(os.getcwd(), 'patch.detectcvm.sh') |
| 193 | + for row in test_input_output_table: |
| 194 | + self.envlayer.run_command_output = row[0] |
| 195 | + os.remove = row[1] |
| 196 | + os.path.isdir = row[2] |
| 197 | + os.makedirs = row[3] |
| 198 | + expected_raises_exception = row[4] |
| 199 | + expected_is_confidential_vm = row[5] |
| 200 | + expected_detection_details = row[6] |
| 201 | + |
| 202 | + if expected_raises_exception: |
| 203 | + self.assertRaises(Exception, self.envlayer.detect_confidential_vm_by_fde) |
| 204 | + else: |
| 205 | + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() |
| 206 | + self.assertEqual(is_confidential_vm, expected_is_confidential_vm) |
| 207 | + self.assertIn(expected_detection_details, detection_details) |
148 | 208 |
|
149 | 209 | self.envlayer.run_command_output = backup_run_command_output |
| 210 | + os.remove = backup_os_remove |
| 211 | + os.path.isdir = backup_os_path_isdir |
| 212 | + os.makedirs = backup_os_makedirs |
| 213 | + Constants.AzGPSPaths.DETECT_CVM = backup_detect_cvm_bash_file_path |
150 | 214 |
|
151 | 215 | def test_detect_confidential_vm_by_imds(self): |
152 | 216 | backup_run_command_output = self.envlayer.run_command_output |
153 | 217 |
|
154 | | - self.envlayer.run_command_output = lambda cmd, no_output=False, chk_err=False: (0, '{"compute":{"securityProfile":{"securityType":"ConfidentialVM"}}}') |
155 | | - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() |
| 218 | + test_input_output_table = [ |
| 219 | + [self.mock_run_command_output_imds_true, True, 'IMDS:ConfidentialVM'], |
| 220 | + [self.mock_run_command_output_imds_false, False, str()], |
| 221 | + ] |
156 | 222 |
|
157 | | - self.assertTrue(is_confidential_vm) |
158 | | - self.assertEqual('IMDS:ConfidentialVM', detection_details) |
| 223 | + for row in test_input_output_table: |
| 224 | + self.envlayer.run_command_output = row[0] |
| 225 | + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() |
| 226 | + self.assertEqual(is_confidential_vm, row[1]) |
| 227 | + self.assertIn(row[2], detection_details) |
159 | 228 |
|
160 | 229 | self.envlayer.run_command_output = backup_run_command_output |
161 | 230 |
|
162 | | - def test_detect_confidential_vm_checks_imds_before_fde(self): |
163 | | - backup_platform = self.envlayer.platform |
164 | | - backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde |
165 | | - backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds |
166 | | - |
167 | | - calls = [] |
168 | | - self.envlayer.platform = self.envlayer.Platform() |
169 | | - self.envlayer.platform.os_type = lambda: 'Linux' |
170 | | - |
171 | | - def detect_confidential_vm_by_fde(): |
172 | | - calls.append('fde') |
173 | | - return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' |
174 | | - |
175 | | - def detect_confidential_vm_by_imds(): |
176 | | - calls.append('imds') |
177 | | - return True, 'IMDS:ConfidentialVM' |
178 | | - |
179 | | - self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde |
180 | | - self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds |
181 | | - |
182 | | - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() |
183 | | - |
184 | | - self.assertTrue(is_confidential_vm) |
185 | | - self.assertIn('IMDS:ConfidentialVM', detection_details) |
186 | | - self.assertEqual(['imds'], calls) |
187 | | - |
188 | | - self.envlayer.platform = backup_platform |
189 | | - self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde |
190 | | - self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds |
| 231 | + def test_detect_confidential_vm(self): |
| 232 | + self.backup_platform_system = platform.system |
191 | 233 |
|
192 | | - def test_detect_confidential_vm_checks_fde_when_imds_not_detected(self): |
193 | | - backup_platform = self.envlayer.platform |
194 | 234 | backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde |
195 | 235 | backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds |
196 | 236 |
|
197 | | - calls = [] |
198 | | - self.envlayer.platform = self.envlayer.Platform() |
199 | | - self.envlayer.platform.os_type = lambda: 'Linux' |
200 | | - |
201 | | - def detect_confidential_vm_by_fde(): |
202 | | - calls.append('fde') |
203 | | - return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' |
204 | | - |
205 | | - def detect_confidential_vm_by_imds(): |
206 | | - calls.append('imds') |
207 | | - return False, str() |
208 | | - |
209 | | - self.envlayer.detect_confidential_vm_by_fde = detect_confidential_vm_by_fde |
210 | | - self.envlayer.detect_confidential_vm_by_imds = detect_confidential_vm_by_imds |
211 | | - |
212 | | - is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() |
| 237 | + test_input_output_table = [ |
| 238 | + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_true, True, 'IMDS:ConfidentialVM'], |
| 239 | + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_false, True, 'FDE=true'], |
| 240 | + ["Windows", self.mock_run_command_output_fde_true, self.mock_run_command_output_imds_true, False, str()], |
| 241 | + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_false, self.mock_detect_confidential_vm_by_imds_returns_false, False, str()], |
| 242 | + ] |
213 | 243 |
|
214 | | - self.assertTrue(is_confidential_vm) |
215 | | - self.assertIn('FDE=true', detection_details) |
216 | | - self.assertEqual(['imds', 'fde'], calls) |
| 244 | + for row in test_input_output_table: |
| 245 | + platform.system = self.mock_platform_system if row[0] == 'Linux' else self.mock_platform_system_windows |
| 246 | + self.envlayer.detect_confidential_vm_by_fde = row[1] |
| 247 | + self.envlayer.detect_confidential_vm_by_imds = row[2] |
| 248 | + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() |
| 249 | + self.assertEqual(is_confidential_vm, row[3]) |
| 250 | + self.assertIn(row[4], detection_details) |
217 | 251 |
|
218 | | - self.envlayer.platform = backup_platform |
| 252 | + # restore original methods |
| 253 | + platform.system = self.backup_platform_system |
219 | 254 | self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde |
220 | 255 | self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds |
221 | 256 |
|
|
0 commit comments