From 9ffc6c1a99f22d7911359d7ad5ea36ce669574c7 Mon Sep 17 00:00:00 2001 From: Abdul Haleem Date: Fri, 12 Jun 2026 14:36:15 +0530 Subject: [PATCH] podman.py: simplified version with abstract methods This is updated version of podman.py with much of complex parameter management moved out of the utililty, now container creation is simplified as most of the args are handled by user and utility has abstract dict of args:values Also removed redundant utils like multiple container creation this can be handled at test level by calling run in for loop also added support for creating and managing containers via root and non root users Signed-off-by: Abdul Haleem --- avocado/utils/podman.py | 1358 +++++++++++++++++++++------------------ 1 file changed, 719 insertions(+), 639 deletions(-) diff --git a/avocado/utils/podman.py b/avocado/utils/podman.py index 4592741834..da3ab21772 100644 --- a/avocado/utils/podman.py +++ b/avocado/utils/podman.py @@ -26,6 +26,7 @@ import json import logging import os +import shlex import subprocess import time from asyncio import create_subprocess_exec @@ -36,6 +37,317 @@ LOG = logging.getLogger(__name__) +def setup_user_and_group(username, password, spyre_group, add_to_group=True, log=None): + """ + Setup user and manage group membership for non-root container execution. + + :param username: Username to setup (None or "root" for root user) + :param password: Password for user (not used for root) + :param spyre_group: Group name to add/remove user to/from (e.g., "spyre_group") + :param add_to_group: True to add user to group, False to remove (default: True) + :param log: Logger instance (optional) + :return: None + """ + if log is None: + log = LOG + + if username is None or username == "root": + if spyre_group: + if add_to_group: + log.info("Add root user to %s group", spyre_group) + subprocess.run(["usermod", "-aG", spyre_group, "root"], + check=False) + else: + log.info("Remove root user from %s group", spyre_group) + subprocess.run(["gpasswd", "-d", "root", spyre_group], + check=False) + else: + # Check if user exists + user_check = subprocess.run( + ["id", "-u", username], + capture_output=True, check=False + ) + + if user_check.returncode != 0: + log.info("Create user: %s", username) + subprocess.run(["useradd", "-m", username], check=False) + # Use input parameter to pass password securely to chpasswd + subprocess.run(["chpasswd"], + input=f"{username}:{password}\n".encode(), + check=False) + + if spyre_group: + if add_to_group: + log.info("Add %s to %s group", username, spyre_group) + subprocess.run(["usermod", "-aG", spyre_group, username], + check=False) + else: + log.info("Remove %s from %s group", username, spyre_group) + subprocess.run(["gpasswd", "-d", username, spyre_group], + check=False) + +def get_container_port(container_id, port=8000, user=None, log=None): + """ + Get the actual host port mapped to a container port. + + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :return: Host port number or None + """ + if log is None: + log = LOG + + try: + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.error("Failed to get user ID for %s", user) + return None + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman port command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman port {container_id} {port}"], + capture_output=True, + text=True, + check=False + ) + else: + result = subprocess.run( + ["podman", "port", container_id, str(port)], + capture_output=True, + text=True, + check=False + ) + + if result.returncode == 0: + port_output = result.stdout.strip() + log.info("Port mapping output: %s", port_output) + + if port_output and ":" in port_output: + host_port = port_output.strip().split(":")[-1] + log.info( + "Container port %d is mapped to host port: %s", port, host_port) + return int(host_port) + else: + log.warning( + "Could not parse port from output: %s", port_output) + return None + else: + log.error("Failed to get container port: %s", result.stderr) + return None + except Exception as ex: + log.error("Failed to get container port: %s", ex) + return None + + +def save_container_logs(container_id, log_dir, test_name="test", user=None, log=None): + """ + Save complete container logs to a file. + + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :return: Path to saved log file or None + """ + if log is None: + log = LOG + + try: + # Create logs directory + logs_dir = os.path.join(log_dir, "container_logs") + os.makedirs(logs_dir, exist_ok=True) + + # Generate log filename + timestamp = time.strftime("%Y%m%d_%H%M%S") + log_filename = f"{test_name}_{container_id[:12]}_{timestamp}.log" + log_filepath = os.path.join(logs_dir, log_filename) + + log.info("Saving complete container logs to: %s", log_filepath) + + # Get complete logs based on user context + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.error("Failed to get user ID for %s", user) + return None + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman logs command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman logs {container_id}"], + capture_output=True, + text=True, + check=False, + timeout=60 + ) + else: + result = subprocess.run( + ["podman", "logs", container_id], + capture_output=True, + text=True, + check=False, + timeout=60 + ) + + if result.returncode == 0: + log_content = result.stdout + else: + log_content = f"Error retrieving logs:\n{result.stderr}\n\nPartial stdout:\n{result.stdout}" + + # Save logs to file + with open(log_filepath, 'w', encoding='utf-8') as f: + f.write(f"Container ID: {container_id}\n") + f.write(f"Test Name: {test_name}\n") + f.write(f"User: {user if user else 'root'}\n") + f.write(f"Timestamp: {timestamp}\n") + f.write("=" * 80 + "\n\n") + f.write(log_content) + + log.info("Container logs saved successfully (%d bytes)", len(log_content)) + return log_filepath + + except subprocess.TimeoutExpired: + log.warning("Timeout while retrieving container logs") + return None + except Exception as ex: + log.warning("Failed to save container logs: %s", ex) + +def wait_for_vllm_startup( + container_id, + success_pattern="Application startup complete.", + failure_pattern=None, + additional_failure_checks=None, + timeout=300, + check_interval=20, + user=None, + log=None, + show_live_logs=True, + live_log_lines=20 +): + """ + Wait for container to start by checking logs for a success pattern. + + This is a generic function that can be used for any container type. + The success and failure patterns can be customized based on the application. + + :param container_id: Container ID to monitor + :param success_pattern: String pattern to look for in logs indicating successful startup + :param failure_pattern: Optional string pattern indicating startup failure (e.g., "BACKTRACE") + :param additional_failure_checks: Optional list of tuples [(pattern, case_sensitive), ...] + for additional failure detection. Example: + [("VFIO", False), ("fail", False)] checks for "VFIO" and "fail" + :param timeout: Maximum time to wait in seconds (default: 300) + :param check_interval: Time between log checks in seconds (default: 10) + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :param show_live_logs: If True, display recent log lines during each check (default: True) + :param live_log_lines: Number of recent log lines to display (default: 10) + :return: True if startup successful, False otherwise + """ + if log is None: + log = LOG + + elapsed = 0 + last_log_position = 0 + + while elapsed < timeout: + try: + # Get container logs directly using capture_output + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.warning("Failed to get user ID for %s", user) + time.sleep(check_interval) + elapsed += check_interval + continue + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman logs command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman logs {container_id}"], + capture_output=True, + text=True, + check=False, + timeout=30 + ) + else: + # For root user, run directly + result = subprocess.run( + ["podman", "logs", container_id], + capture_output=True, + text=True, + check=False, + timeout=30 + ) + # Combine stdout and stderr + log_content = result.stdout + result.stderr + if show_live_logs and log_content: + log_lines = log_content.split('\n') + current_log_length = len(log_lines) + if current_log_length > last_log_position: + new_lines = log_lines[last_log_position:] + display_lines = new_lines[-live_log_lines:] if len(new_lines) > live_log_lines else new_lines + if display_lines: + log.info("=== Recent Container Logs ===") + for line in display_lines: + if line.strip(): # Only show non-empty lines + log.info(" %s", line) + log.info("=== End Logs ===") + last_log_position = current_log_length + if failure_pattern and failure_pattern in log_content: + log.error("%s detected in container logs - startup failed", failure_pattern) + log.error("Container logs:\n%s", log_content) + return False + if additional_failure_checks: + for pattern, case_sensitive in additional_failure_checks: + check_content = log_content if case_sensitive else log_content.lower() + check_pattern = pattern if case_sensitive else pattern.lower() + if check_pattern in check_content: + log.error("Failure pattern '%s' detected in logs", pattern) + log.error("Container logs:\n%s", log_content) + return False + if success_pattern in log_content: + log.info("✓ Container started successfully: %s", container_id) + return True + log.info("Waiting for container startup... (%d/%d seconds)", elapsed, timeout) + time.sleep(check_interval) + elapsed += check_interval + + except subprocess.TimeoutExpired: + log.warning("Timeout getting container logs") + time.sleep(check_interval) + elapsed += check_interval + except Exception as ex: + log.warning("Failed to get container logs: %s", ex) + time.sleep(check_interval) + elapsed += check_interval + + log.error("Timeout waiting for container startup") + return False + def install_huggingface_cli(): """ Install Hugging Face CLI if not already installed. @@ -43,44 +355,77 @@ def install_huggingface_cli(): :return: True if installed successfully, False otherwise """ try: - result = subprocess.run( - ["huggingface-cli", "--version"], - capture_output=True, - text=True, - check=False, - ) - if result.returncode == 0: - LOG.info("Hugging Face CLI already installed: %s", result.stdout.strip()) - return True + # Check if huggingface-cli is already available + hf_cli_path = which("hf") + if hf_cli_path: + LOG.info("Hugging Face CLI already installed at: %s", hf_cli_path) + try: + result = subprocess.run( + [hf_cli_path, "--version"], + capture_output=True, + text=True, + check=False, + timeout=10 + ) + if result.returncode == 0: + LOG.info("Hugging Face CLI version: %s", + result.stdout.strip()) + return True + except Exception as e: + LOG.warning("Could not verify HF CLI version: %s", e) + return True # CLI exists, assume it works + + LOG.info("Hugging Face CLI not found, installing...") + + # Try to install using pip + pip_path = which("pip") or which("pip3") + if not pip_path: + LOG.error("pip not found, cannot install Hugging Face CLI") + return False - LOG.info("Installing Hugging Face CLI...") + LOG.info("Installing huggingface_hub[cli] using: %s", pip_path) result = subprocess.run( - ["pip", "install", "-U", "huggingface_hub[cli]"], + [pip_path, "install", "-U", "huggingface_hub[cli]"], capture_output=True, text=True, check=False, + timeout=300 ) if result.returncode == 0: LOG.info("Hugging Face CLI installed successfully") - verify_result = subprocess.run( - ["huggingface-cli", "--version"], - capture_output=True, - text=True, - check=False, - ) - if verify_result.returncode == 0: - LOG.info("Verified installation: %s", verify_result.stdout.strip()) + LOG.debug("Installation output: %s", result.stdout) + + # Verify installation + hf_cli_path = which("hf") + if hf_cli_path: + LOG.info("Verified: huggingface-cli found at %s", hf_cli_path) return True + else: + LOG.warning("huggingface-cli installed but not found in PATH") + # Try common installation paths + common_paths = [ + os.path.expanduser("~/.local/bin/hf"), + "/usr/local/bin/hf", + "/usr/bin/hf" + ] + for path in common_paths: + if os.path.exists(path): + LOG.info("Found huggingface-cli at: %s", path) + return True + return False + else: + LOG.error("Failed to install Hugging Face CLI") + LOG.error("Error output: %s", result.stderr) + return False - LOG.error("Failed to install Hugging Face CLI") + except subprocess.TimeoutExpired: + LOG.error("Timeout while installing Hugging Face CLI") return False - except Exception as ex: LOG.error("Error installing Hugging Face CLI: %s", ex) return False - def download_model_from_hf(hf_model_id, local_dir, model_name): """ Download a model from Hugging Face Hub. @@ -108,7 +453,7 @@ def download_model_from_hf(hf_model_id, local_dir, model_name): LOG.info(" Destination: %s", model_path) result = subprocess.run( - ["huggingface-cli", "download", "--local-dir", model_path, hf_model_id], + ["hf", "download", "--local-dir", model_path, hf_model_id], capture_output=True, text=True, timeout=3600, @@ -138,6 +483,90 @@ def download_model_from_hf(hf_model_id, local_dir, model_name): LOG.error("Error downloading model: %s", ex) return False +def validate_model_with_sha(model_path): + """ + Validate model files by checking SHA256 checksums if available. + + :param model_path: Path to the model directory + :return: Tuple of (is_valid, validation_messages) + """ + import hashlib + import glob + validation_messages = [] + is_valid = True + required_files = [ + "config.json", + "tokenizer.json", + "tokenizer_config.json" + ] + for req_file in required_files: + file_path = os.path.join(model_path, req_file) + if not os.path.exists(file_path): + validation_messages.append(f"MISSING: {req_file}") + is_valid = False + elif os.path.getsize(file_path) == 0: + validation_messages.append(f"EMPTY: {req_file}") + is_valid = False + else: + validation_messages.append( + f"OK: {req_file} ({os.path.getsize(file_path)} bytes)") + weight_patterns = ["*.safetensors", "*.bin", "pytorch_model*.bin"] + weight_files = [] + for pattern in weight_patterns: + weight_files.extend(glob.glob(os.path.join(model_path, pattern))) + if not weight_files: + validation_messages.append("MISSING: No model weight files found") + is_valid = False + else: + validation_messages.append(f"Found {len(weight_files)} weight file(s)") + for weight_file in weight_files: + file_size = os.path.getsize(weight_file) + if file_size == 0: + validation_messages.append( + f"EMPTY: {os.path.basename(weight_file)}") + is_valid = False + else: + validation_messages.append( + f"OK: {os.path.basename(weight_file)} ({file_size} bytes)") + sha_file = os.path.join(model_path, "SHA256SUMS") + + if os.path.exists(sha_file): + validation_messages.append( + f"Found checksum file: {os.path.basename(sha_file)}") + try: + with open(sha_file, 'r') as f: + sha_content = f.read() + for weight_file in weight_files: + filename = os.path.basename(weight_file) + if filename in sha_content: + validation_messages.append( + f"Validating SHA256 for: {filename}") + sha256_hash = hashlib.sha256() + try: + with open(weight_file, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + sha256_hash.update(byte_block) + actual_sha = sha256_hash.hexdigest() + if actual_sha[:16] in sha_content: + validation_messages.append( + f"SHA256 VALID: {filename}") + else: + validation_messages.append( + f"SHA256 MISMATCH: {filename}") + validation_messages.append( + f" Calculated: {actual_sha[:16]}...") + is_valid = False + except Exception as sha_ex: + validation_messages.append( + f"SHA256 calculation failed for {filename}: {sha_ex}") + except Exception as ex: + validation_messages.append(f"Failed to read checksum file: {ex}") + else: + validation_messages.append( + "No checksum file found - skipping SHA validation") + + return is_valid, validation_messages + class PodmanException(Exception): pass @@ -167,17 +596,24 @@ def __init__(self, podman_bin=None): class Podman(_Podman): - def execute(self, *args): + def execute(self, *args, user=None): """Execute a command and return the returncode, stdout and stderr. :param args: Variable length argument list to be used as argument during execution. + :param user: Optional user to run command as (uses 'su - user -c') :rtype: tuple with returncode, stdout and stderr. """ try: LOG.debug("Executing %s", args) - cmd = [self.podman_bin, *args] + if user: + import shlex + podman_cmd = [self.podman_bin] + list(args) + podman_cmd_str = " ".join(shlex.quote(str(arg)) for arg in podman_cmd) + cmd = ["su", "-", user, "-c", podman_cmd_str] + else: + cmd = [self.podman_bin, *args] with subprocess.Popen( cmd, stdin=subprocess.DEVNULL, @@ -189,7 +625,7 @@ def execute(self, *args): LOG.debug("Stdout: %s", stdout.decode("utf-8", "replace")) LOG.debug("Stderr: %s", stderr.decode("utf-8", "replace")) if proc.returncode: - command_args = " ".join(args) + command_args = " ".join(str(a) for a in args) msg = f'Failure from command "{self.podman_bin} {command_args}": returned code "{proc.returncode}" stderr: "{stderr}"' LOG.error(msg) raise PodmanException(msg) @@ -200,7 +636,7 @@ def execute(self, *args): LOG.error("%s: %s", msg, str(ex)) raise PodmanException(msg) from ex - def copy_to_container(self, container_id, src, dst): + def copy_to_container(self, container_id, src, dst, user=None): """Copy artifacts from src to container:dst. This method allows copying the contents of src to the dst. Files will @@ -210,10 +646,11 @@ def copy_to_container(self, container_id, src, dst): :param str container_id: string with the container identification. :param str src: what file or directory you are trying to copy. :param str dst: the destination inside the container. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("cp", src, f"{container_id}:{dst}") + return self.execute("cp", src, f"{container_id}:{dst}", user=user) except PodmanException as ex: error = f"Failed copying data to container {container_id}" LOG.error(error) @@ -237,58 +674,64 @@ def get_python_version(self, image): return int(output[0]), int(output[1]), output[2] return None - def get_container_info(self, container_id): + def get_container_info(self, container_id, user=None): """Return all information about specific container. :param container_id: identifier of container :type container_id: str + :param user: Optional user to run command as + :type user: str :rtype: dict """ try: _, stdout, _ = self.execute( - "ps", "--all", "--format=json", "--filter", f"id={container_id}" + "ps", "--all", "--format=json", "--filter", f"id={container_id}", + user=user ) except PodmanException as ex: raise PodmanException( f"Failed getting information about container: {container_id}." ) from ex containers = json.loads(stdout.decode()) - for container in containers: - if container.get("Id") == container_id: - return container + # Return first container if found (filter by id should return only one) + if containers and len(containers) > 0: + return containers[0] return {} - def start(self, container_id): + def start(self, container_id, user=None): """Starts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to start. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("start", container_id) + return self.execute("start", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to start the container.") from ex - def stop(self, container_id): + def stop(self, container_id, user=None): """Stops a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to stop. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("stop", "-t=0", container_id) + return self.execute("stop", "-t=0", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to stop the container.") from ex - def restart(self, container_id, timeout=10): + def restart(self, container_id, timeout=10, user=None): """Restarts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to restart. :param int timeout: Timeout in seconds to wait for container to stop before killing it (default: 10). + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("restart", f"-t={timeout}", container_id) + return self.execute("restart", f"-t={timeout}", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to restart the container.") from ex @@ -301,16 +744,18 @@ def exec_command( env=None, interactive=False, tty=False, + podman_user=None, ): """Execute a command in a running container. :param str container_id: Container identification string. :param str or list command: Command to execute. Can be a string or list of arguments. - :param str user: Optional user to run the command as (e.g., "root", "1000", "1000:1000"). + :param str user: Optional user to run the command as inside container (e.g., "root", "1000", "1000:1000"). :param str workdir: Optional working directory for the command. :param dict env: Optional dictionary of environment variables to set. :param bool interactive: Keep STDIN open even if not attached (default: False). :param bool tty: Allocate a pseudo-TTY (default: False). + :param str podman_user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: @@ -337,7 +782,7 @@ def exec_command( else: raise PodmanException("Command must be a string or list") - return self.execute(*cmd_args) + return self.execute(*cmd_args, user=podman_user) except PodmanException as ex: raise PodmanException( f"Failed to execute command in container {container_id}." @@ -350,6 +795,7 @@ def collect_container_aiu_metrics( dtcompiler_export_dir=None, stats_command="aiu-smi --csv", timeout=None, + user=None, ): """ Start collecting AIU metrics from a container in the background using nohup. @@ -364,6 +810,7 @@ def collect_container_aiu_metrics( (default: "aiu-smi --csv"). :param int timeout: Optional timeout in seconds. If specified, stats collection will automatically stop after this duration. If None, collection runs indefinitely. + :param str user: Optional system user to run podman command as. :return: Process object for the background process. :rtype: subprocess.Popen """ @@ -379,19 +826,18 @@ def collect_container_aiu_metrics( podman_cmd.extend(["bash", "--login", "-c", stats_command]) + # Build the command string using shlex.join to properly quote arguments + podman_cmd_str = shlex.join(podman_cmd) if timeout: - full_command = f"nohup bash -c 'timeout {timeout} {' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection with %d second timeout: %s", - timeout, - full_command, - ) + base_command = f"timeout {timeout} {podman_cmd_str} | tee {output_file}" else: - full_command = f"nohup bash -c '{' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection (no timeout): %s", - full_command, - ) + base_command = f"{podman_cmd_str} | tee {output_file}" + # Wrap with user if specified + if user: + full_command = f"nohup su - {user} -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + else: + full_command = f"nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + LOG.info("Starting background stats collection: %s", full_command) process = subprocess.Popen( full_command, @@ -410,7 +856,7 @@ def collect_container_aiu_metrics( raise PodmanException(error_msg) from ex def collect_container_stats( - self, container_id, output_dir, interval=1, duration=60 + self, container_id, output_dir, interval=1, duration=60, user=None ): """ Collect Podman CPU and memory stats and write as JSON. @@ -424,12 +870,13 @@ def collect_container_stats( Container-specific subdirectories will be created. :param int interval: Interval in seconds between stat collections (default: 1). :param int duration: Total duration in seconds to collect stats (default: 60). + :param str user: Optional system user to run podman command as. :return: Path to the generated JSON stats file (or list of paths if container_id="all"). :rtype: str or list """ try: if container_id.lower() == "all": - _, stdout, _ = self.execute("ps", "--format", "{{.ID}}") + _, stdout, _ = self.execute("ps", "--format", "{{.ID}}", user=user) container_ids = stdout.decode().strip().split("\n") container_ids = [cid.strip() for cid in container_ids if cid.strip()] @@ -442,14 +889,14 @@ def collect_container_stats( json_files = [] for cid in container_ids: json_file = self._collect_single_container_stats( - cid, output_dir, interval, duration + cid, output_dir, interval, duration, user ) json_files.append(json_file) return json_files else: return self._collect_single_container_stats( - container_id, output_dir, interval, duration + container_id, output_dir, interval, duration, user ) except Exception as ex: @@ -458,7 +905,7 @@ def collect_container_stats( raise PodmanException(error_msg) from ex def _collect_single_container_stats( - self, container_id, base_output_dir, interval, duration + self, container_id, base_output_dir, interval, duration, user=None ): """ Internal method to collect stats for a single container. @@ -467,6 +914,7 @@ def _collect_single_container_stats( :param str base_output_dir: Base directory for output. :param int interval: Interval between collections. :param int duration: Total duration. + :param str user: Optional system user to run podman command as. :return: Path to JSON stats file. :rtype: str """ @@ -488,7 +936,7 @@ def _collect_single_container_stats( while time.time() < end_time: try: _, stdout, stderr = self.execute( - "stats", "--no-stream", "--format", "json", container_id + "stats", "--no-stream", "--format", "json", container_id, user=user ) if stderr: @@ -630,208 +1078,54 @@ def send_vllm_inference_request( LOG.error("%s: %s", error_msg, ex) raise PodmanException(error_msg) from ex - def run_vllm_container( - self, - image, - aiu_ids, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb=None, - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=None, - max_num_batched_tokens=None, - additional_vllm_args=None, - container_name=None, - user=None, - dtlog_level=None, - dtcompiler_keep_export=None, - vllm_spyre_require_precompiled_decoders=None, - enable_flex_timing=None, - flex_print_end_to_end_breakdown=None, - flex_skip_timestamp_calibration=None, - flex_scheduler_print_raw_timestamps=None, - flex_global_profile_prefix=None, - ): - """Run a VLLM container with AIU support. - - :param str image: Container image to run. - :param str aiu_ids: Space-separated AIU PCIe IDs. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G" - adjust based on model size). - :param str shm_size: Shared memory size (default: "2G" - increase for larger batches). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0" - unlimited). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping in format - ``[host_ip]:[host_port]:container_port`` (default: - ``"127.0.0.1::8000"`` - localhost with random host port to - container port 8000). Examples: - ``"127.0.0.1:8000:8000"`` (fixed host port 8000), - ``"0.0.0.0::8000"`` (all interfaces, random host port), - ``"::8000"`` (all interfaces, random host port). - :param str vllm_spyre_use_cb: Optional VLLM Spyre use CB flag (e.g., "1"). If None, not set. - :param int vllm_dt_chunk_len: Optional DT chunk length. If None, not set. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. If None, not set. - :param bool enable_prefix_caching: Optional enable VLLM prefix caching. If None, not set. - :param int max_num_batched_tokens: Optional maximum number of batched tokens. If None, not set. - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name: Optional container name. - :param str user: Optional user to run container as (e.g., "1000:1000"). - :param str dtlog_level: Optional DT log level (e.g., "warning"). If None, not set. - :param str dtcompiler_keep_export: Optional DT compiler keep export flag (e.g., "true"). If None, not set. - :param str vllm_spyre_require_precompiled_decoders: Optional require precompiled decoders flag (e.g., "0"). If None, not set. - :param str enable_flex_timing: Optional enable flex timing flag (e.g., "0"). If None, not set. - :param str flex_print_end_to_end_breakdown: Optional flex print end-to-end breakdown flag (e.g., "0"). If None, not set. - :param str flex_skip_timestamp_calibration: Optional flex skip timestamp calibration flag (e.g., "0"). If None, not set. - :param str flex_scheduler_print_raw_timestamps: Optional flex scheduler print raw timestamps flag (e.g., "0"). If None, not set. - :param str flex_global_profile_prefix: Optional flex global profile prefix (e.g., "flex-logs"). If None, not set. - :rtype: tuple with returncode, stdout (container ID), stderr. + def run(self, podman_options=None, user=None): """ - cmd_args = [ - "run", - "-d", - "-it", - f"--device={device}", - "-v", - f"{host_models_dir}:/models", - "-e", - f"AIU_PCIE_IDS={aiu_ids}", - f"--privileged={privileged}", - "--pids-limit", - pids_limit, - f"--userns={userns}", - f"--group-add={group_add}", - "--memory", - memory, - "--shm-size", - shm_size, - "-p", - port_mapping, - ] - - if vllm_spyre_use_cb is not None: - cmd_args.extend(["-e", f"VLLM_SPYRE_USE_CB={vllm_spyre_use_cb}"]) - if vllm_dt_chunk_len is not None: - cmd_args.extend(["-e", f"VLLM_DT_CHUNK_LEN={vllm_dt_chunk_len}"]) - if vllm_spyre_use_chunked_prefill is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_USE_CHUNKED_PREFILL={vllm_spyre_use_chunked_prefill}", - ] - ) - - if dtlog_level is not None: - cmd_args.extend(["-e", f"DTLOG_LEVEL={dtlog_level}"]) - if dtcompiler_keep_export is not None: - cmd_args.extend(["-e", f"DTCOMPILER_KEEP_EXPORT={dtcompiler_keep_export}"]) - if vllm_spyre_require_precompiled_decoders is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_REQUIRE_PRECOMPILED_DECODERS={vllm_spyre_require_precompiled_decoders}", - ] - ) - if enable_flex_timing is not None: - cmd_args.extend(["-e", f"ENABLE_FLEX_TIMING={enable_flex_timing}"]) - if flex_print_end_to_end_breakdown is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_PRINT_END_TO_END_BREAKDOWN={flex_print_end_to_end_breakdown}", - ] - ) - if flex_skip_timestamp_calibration is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_SKIP_TIMESTAMP_CALIBRATION={flex_skip_timestamp_calibration}", - ] - ) - if flex_scheduler_print_raw_timestamps is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_SCHEDULER_PRINT_RAW_TIMESTAMPS={flex_scheduler_print_raw_timestamps}", - ] - ) - if flex_global_profile_prefix is not None: - cmd_args.extend( - ["-e", f"FLEX_GLOBAL_PROFILE_PREFIX={flex_global_profile_prefix}"] - ) - - if user is not None: - cmd_args.extend(["--user", user]) - - if container_name: - cmd_args.extend(["--name", container_name]) - - cmd_args.extend( - [ - image, - "--model", - vllm_model_path, - "-tp", - str(aiu_world_size), - "--max-model-len", - str(max_model_len), - "--max-num-seqs", - str(max_batch_size), - ] - ) - - if enable_prefix_caching is not None and enable_prefix_caching: - cmd_args.append("--enable-prefix-caching") + Run a container with command-line arguments as a list. + + This is a simplified generic container runner that accepts a list + of command-line arguments to pass to 'podman run'. + + :param list podman_options: List of command-line arguments for 'podman run'. + Example: ["-d", "--name", "test", "-e", "VAR=value", + "--memory", "100G", "image:tag", "--model", "/path"] + :param str user: Optional user to run podman command as (uses 'su - user -c') + :return: tuple (returncode, stdout, stderr) + """ + if not podman_options: + raise PodmanException("podman_options is required") - if max_num_batched_tokens is not None: - cmd_args.extend(["--max-num-batched-tokens", str(max_num_batched_tokens)]) + if not isinstance(podman_options, list): + raise PodmanException("podman_options must be a list of strings") - if additional_vllm_args: - cmd_args.extend(additional_vllm_args) + cmd_args = ["run"] + podman_options try: - return self.execute(*cmd_args) + return self.execute(*cmd_args, user=user) except PodmanException as ex: - raise PodmanException("Failed to run VLLM container.") from ex + raise PodmanException("Failed to run container.") from ex - def list_containers(self, all_containers=True): + def list_containers(self, all_containers=True, user=None): """List containers. :param bool all_containers: If True, list all containers including stopped ones. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON list), stderr. """ try: args = ["ps", "--format=json"] if all_containers: args.append("--all") - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException("Failed to list containers.") from ex - def logs(self, container_id, follow=False, tail=None): + def logs(self, container_id, follow=False, tail=None, user=None): """Get container logs. :param str container_id: Container identification string. :param bool follow: If True, follow log output. :param int tail: Number of lines to show from the end of the logs. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -841,30 +1135,32 @@ def logs(self, container_id, follow=False, tail=None): if tail is not None: args.extend(["--tail", str(tail)]) args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get logs for container {container_id}." ) from ex - def inspect(self, container_id): + def inspect(self, container_id, user=None): """Inspect a container and return detailed information. :param str container_id: Container identification string. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON), stderr. """ try: - return self.execute("inspect", container_id) + return self.execute("inspect", container_id, user=user) except PodmanException as ex: raise PodmanException( f"Failed to inspect container {container_id}." ) from ex - def remove(self, container_id, force=False): + def remove(self, container_id, force=False, user=None): """Remove a container. :param str container_id: Container identification string. :param bool force: If True, force removal of running container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -872,7 +1168,7 @@ def remove(self, container_id, force=False): if force: args.append("--force") args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to remove container {container_id}.") from ex @@ -884,6 +1180,7 @@ def login( api_key=None, api_key_username="iamapikey", password_stdin=False, + user=None, ): """Login to a container registry. @@ -894,6 +1191,7 @@ def login( :param str api_key_username: Username to use with API key authentication (default: "iamapikey" for IBM Cloud). Other registries may use different conventions (e.g., "oauth2accesstoken" for GCR). :param bool password_stdin: If True, read password from stdin. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -916,26 +1214,28 @@ def login( ) args.append(registry) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to login to registry {registry}.") from ex - def pull(self, image): + def pull(self, image, user=None): """Pull an image from a registry. :param str image: Image name to pull. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: - return self.execute("pull", image) + return self.execute("pull", image, user=user) except PodmanException as ex: raise PodmanException(f"Failed to pull image {image}.") from ex - def stats(self, container_id, no_stream=True): + def stats(self, container_id, no_stream=True, user=None): """Get container resource usage statistics. :param str container_id: Container identification string. :param bool no_stream: If True, output stats once and exit. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -943,30 +1243,69 @@ def stats(self, container_id, no_stream=True): if no_stream: args.append("--no-stream") args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get stats for container {container_id}." ) from ex + def get_container_port(self, container_id, port=8000, user=None): + """ + Get the actual host port mapped to a container port. + + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :return: Host port number or None + """ + return get_container_port(container_id, port, user, LOG) + + def save_container_logs(self, container_id, log_dir, test_name="test", user=None): + """ + Save complete container logs to a file. + + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :return: Path to saved log file or None + """ + return save_container_logs(container_id, log_dir, test_name, user, LOG) + + class AsyncPodman(_Podman): - async def execute(self, *args): + + async def execute(self, *args, user=None): """Execute a command and return the returncode, stdout and stderr. :param args: Variable length argument list to be used as argument during execution. + :param user: Optional system user to run podman command as (uses 'su - user -c'). :rtype: tuple with returncode, stdout and stderr. """ try: LOG.debug("Executing %s", args) - proc = await create_subprocess_exec( - self.podman_bin, - *args, - stdout=asyncio_subprocess.PIPE, - stderr=asyncio_subprocess.PIPE, - ) + if user and user != "root": + import shlex + # Build the full podman command + podman_cmd = [self.podman_bin] + list(args) + podman_cmd_str = " ".join(shlex.quote(str(arg)) for arg in podman_cmd) + su_cmd = ["su", "-", user, "-c", podman_cmd_str] + LOG.debug("Executing as user %s: %s", user, su_cmd) + proc = await create_subprocess_exec( + *su_cmd, + stdout=asyncio_subprocess.PIPE, + stderr=asyncio_subprocess.PIPE, + ) + else: + proc = await create_subprocess_exec( + self.podman_bin, + *args, + stdout=asyncio_subprocess.PIPE, + stderr=asyncio_subprocess.PIPE, + ) stdout, stderr = await proc.communicate() LOG.debug("Return code: %s", proc.returncode) LOG.debug("Stdout: %s", stdout.decode("utf-8", "replace")) @@ -984,7 +1323,33 @@ async def execute(self, *args): return proc.returncode, stdout, stderr - async def copy_to_container(self, container_id, src, dst): + async def run(self, podman_options=None, user=None): + """ + Run a container with command-line arguments as a list (async version). + + This is a simplified async container runner that accepts a list + of command-line arguments to pass to 'podman run'. + + :param list podman_options: List of command-line arguments for 'podman run'. + Example: ["-d", "--name", "test", "-e", "VAR=value", + "--memory", "100G", "image:tag", "--model", "/path"] + :param str user: Optional user to run podman command as (uses 'su - user -c') + :return: tuple (returncode, stdout, stderr) + """ + if not podman_options: + raise PodmanException("podman_options is required") + + if not isinstance(podman_options, list): + raise PodmanException("podman_options must be a list of strings") + + cmd_args = ["run"] + podman_options + + try: + return await self.execute(*cmd_args, user=user) + except PodmanException as ex: + raise PodmanException("Failed to run container.") from ex + + async def copy_to_container(self, container_id, src, dst, user=None): """Copy artifacts from src to container:dst. This method allows copying the contents of src to the dst. Files will @@ -994,10 +1359,11 @@ async def copy_to_container(self, container_id, src, dst): :param str container_id: string with the container identification. :param str src: what file or directory you are trying to copy. :param str dst: the destination inside the container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("cp", src, f"{container_id}:{dst}") + return await self.execute("cp", src, f"{container_id}:{dst}", user=user) except PodmanException as ex: error = f"Failed copying data to container {container_id}" LOG.error(error) @@ -1020,58 +1386,62 @@ async def get_python_version(self, image): output = stdout.decode().strip().split() return int(output[0]), int(output[1]), output[2] - async def get_container_info(self, container_id): + async def get_container_info(self, container_id, user=None): """Return all information about specific container. :param container_id: identifier of container :type container_id: str + :param str user: Optional system user to run podman command as. :rtype: dict """ try: _, stdout, _ = await self.execute( - "ps", "--all", "--format=json", "--filter", f"id={container_id}" + "ps", "--all", "--format=json", "--filter", f"id={container_id}", user=user ) except PodmanException as ex: raise PodmanException( f"Failed getting information about container:" f" {container_id}." ) from ex containers = json.loads(stdout.decode()) - for container in containers: - if container["Id"] == container_id: - return container + # Return first container if found (filter by id should return only one) + if containers and len(containers) > 0: + return containers[0] return {} - async def start(self, container_id): + async def start(self, container_id, user=None): """Starts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to start. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("start", container_id) + return await self.execute("start", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to start the container.") from ex - async def stop(self, container_id): + async def stop(self, container_id, user=None): """Stops a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to stop. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("stop", "-t=0", container_id) + return await self.execute("stop", "-t=0", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to stop the container.") from ex - async def restart(self, container_id, timeout=10): + async def restart(self, container_id, timeout=10, user=None): """Restarts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to restart. :param int timeout: Timeout in seconds to wait for container to stop before killing it (default: 10). + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("restart", f"-t={timeout}", container_id) + return await self.execute("restart", f"-t={timeout}", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to restart the container.") from ex @@ -1084,16 +1454,18 @@ async def exec_command( env=None, interactive=False, tty=False, + podman_user=None, ): """Execute a command in a running container. :param str container_id: Container identification string. :param str or list command: Command to execute. Can be a string or list of arguments. - :param str user: Optional user to run the command as (e.g., "root", "1000", "1000:1000"). + :param str user: Optional user to run the command as inside container (e.g., "root", "1000", "1000:1000"). :param str workdir: Optional working directory for the command. :param dict env: Optional dictionary of environment variables to set. :param bool interactive: Keep STDIN open even if not attached (default: False). :param bool tty: Allocate a pseudo-TTY (default: False). + :param str podman_user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: @@ -1124,7 +1496,7 @@ async def exec_command( else: raise PodmanException("Command must be a string or list") - return await self.execute(*cmd_args) + return await self.execute(*cmd_args, user=podman_user) except PodmanException as ex: raise PodmanException( f"Failed to execute command in container {container_id}." @@ -1137,6 +1509,7 @@ async def collect_container_aiu_metrics( dtcompiler_export_dir=None, stats_command="aiu-smi --csv", timeout=None, + user=None, ): """ Start collecting AIU metrics from a container in the background using nohup. @@ -1151,6 +1524,7 @@ async def collect_container_aiu_metrics( (default: "aiu-smi --csv"). :param int timeout: Optional timeout in seconds. If specified, stats collection will automatically stop after this duration. If None, collection runs indefinitely. + :param str user: Optional system user to run podman command as. :return: Process object for the background process. :rtype: subprocess.Popen """ @@ -1170,23 +1544,21 @@ async def collect_container_aiu_metrics( # Add the bash command to execute podman_cmd.extend(["bash", "--login", "-c", stats_command]) + # Build the command string using shlex.join to properly quote arguments + podman_cmd_str = shlex.join(podman_cmd) # Build the full command with optional timeout if timeout: # Use timeout command to automatically stop after specified duration - full_command = f"nohup bash -c 'timeout {timeout} {' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection with %d second timeout: %s", - timeout, - full_command, - ) + base_command = f"timeout {timeout} {podman_cmd_str} | tee {output_file}" else: # Run indefinitely without timeout - full_command = f"nohup bash -c '{' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection (no timeout): %s", - full_command, - ) - + base_command = f"{podman_cmd_str} | tee {output_file}" + # Wrap with su if user is specified + if user and user != "root": + full_command = f"su - {user} -c {shlex.quote(f'nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &')}" + else: + full_command = f"nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + LOG.info("Starting background stats collection: %s", full_command) # Execute the command in the background process = subprocess.Popen( full_command, @@ -1450,148 +1822,29 @@ async def send_vllm_inference_request( LOG.error("%s: %s", error_msg, ex) raise PodmanException(error_msg) from ex - async def run_vllm_container( - self, - image, - aiu_ids, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb="1", - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=True, - additional_vllm_args=None, - container_name=None, - ): - """Run a VLLM container with AIU support. - - :param str image: Container image to run. - :param str aiu_ids: Space-separated AIU PCIe IDs. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G" - adjust based on model size). - :param str shm_size: Shared memory size (default: "2G" - increase for larger batches). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0" - unlimited). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping in format - ``[host_ip]:[host_port]:container_port`` (default: - ``"127.0.0.1::8000"`` - localhost with random host port to - container port 8000). Examples: - ``"127.0.0.1:8000:8000"`` (fixed host port 8000), - ``"0.0.0.0::8000"`` (all interfaces, random host port), - ``"::8000"`` (all interfaces, random host port). - :param str vllm_spyre_use_cb: Use CB flag (default: "1"). - :param int vllm_dt_chunk_len: Optional DT chunk length. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. - :param bool enable_prefix_caching: Enable VLLM prefix caching (default: True). - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name: Optional container name. - :rtype: tuple with returncode, stdout (container ID), stderr. - """ - cmd_args = [ - "run", - "-d", - "-it", - f"--device={device}", - "-v", - f"{host_models_dir}:/models", - "-e", - f"AIU_PCIE_IDS={aiu_ids}", - "-e", - f"VLLM_SPYRE_USE_CB={vllm_spyre_use_cb}", - f"--privileged={privileged}", - "--pids-limit", - pids_limit, - f"--userns={userns}", - f"--group-add={group_add}", - "--memory", - memory, - "--shm-size", - shm_size, - "-p", - port_mapping, - ] - - if vllm_dt_chunk_len is not None: - cmd_args.extend(["-e", f"VLLM_DT_CHUNK_LEN={vllm_dt_chunk_len}"]) - if vllm_spyre_use_chunked_prefill is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_USE_CHUNKED_PREFILL={vllm_spyre_use_chunked_prefill}", - ] - ) - - if container_name: - cmd_args.extend(["--name", container_name]) - - cmd_args.extend( - [ - image, - "--model", - vllm_model_path, - "-tp", - str(aiu_world_size), - "--max-model-len", - str(max_model_len), - "--max-num-seqs", - str(max_batch_size), - ] - ) - - if enable_prefix_caching: - cmd_args.append("--enable-prefix-caching") - - if additional_vllm_args: - if isinstance(additional_vllm_args, list): - cmd_args.extend(additional_vllm_args) - else: - raise PodmanException( - f"additional_vllm_args must be a list, got {type(additional_vllm_args).__name__}" - ) - - try: - return await self.execute(*cmd_args) - except PodmanException as ex: - raise PodmanException("Failed to run VLLM container.") from ex - async def list_containers(self, all_containers=True): + async def list_containers(self, all_containers=True, user=None): """List containers. :param bool all_containers: If True, list all containers including stopped ones. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON list), stderr. """ try: args = ["ps", "--format=json"] if all_containers: args.append("--all") - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException("Failed to list containers.") from ex - async def logs(self, container_id, follow=False, tail=None): + async def logs(self, container_id, follow=False, tail=None, user=None): """Get container logs. :param str container_id: Container identification string. :param bool follow: If True, follow log output. :param int tail: Number of lines to show from the end of the logs. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1601,30 +1854,32 @@ async def logs(self, container_id, follow=False, tail=None): if tail is not None: args.extend(["--tail", str(tail)]) args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get logs for container {container_id}." ) from ex - async def inspect(self, container_id): + async def inspect(self, container_id, user=None): """Inspect a container and return detailed information. :param str container_id: Container identification string. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON), stderr. """ try: - return await self.execute("inspect", container_id) + return await self.execute("inspect", container_id, user=user) except PodmanException as ex: raise PodmanException( f"Failed to inspect container {container_id}." ) from ex - async def remove(self, container_id, force=False): + async def remove(self, container_id, force=False, user=None): """Remove a container. :param str container_id: Container identification string. :param bool force: If True, force removal of running container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1632,7 +1887,7 @@ async def remove(self, container_id, force=False): if force: args.append("--force") args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to remove container {container_id}.") from ex @@ -1644,6 +1899,7 @@ async def login( api_key=None, api_key_username="iamapikey", password_stdin=False, + user=None, ): """Login to a container registry. @@ -1654,6 +1910,7 @@ async def login( :param str api_key_username: Username to use with API key authentication (default: "iamapikey" for IBM Cloud). Other registries may use different conventions (e.g., "oauth2accesstoken" for GCR). :param bool password_stdin: If True, read password from stdin. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1676,26 +1933,28 @@ async def login( ) args.append(registry) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to login to registry {registry}.") from ex - async def pull(self, image): + async def pull(self, image, user=None): """Pull an image from a registry. :param str image: Image name to pull. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: - return await self.execute("pull", image) + return await self.execute("pull", image, user=user) except PodmanException as ex: raise PodmanException(f"Failed to pull image {image}.") from ex - async def stats(self, container_id, no_stream=True): + async def stats(self, container_id, no_stream=True, user=None): """Get container resource usage statistics. :param str container_id: Container identification string. :param bool no_stream: If True, output stats once and exit. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1703,244 +1962,65 @@ async def stats(self, container_id, no_stream=True): if no_stream: args.append("--no-stream") args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get stats for container {container_id}." ) from ex - async def run_multiple_vllm_containers( - self, - num_containers, - image, - aiu_ids_list, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb="1", - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=True, - additional_vllm_args=None, - container_name_prefix="vllm-container", - ): - """Run multiple VLLM containers concurrently with different AIU IDs. - - :param int num_containers: Number of containers to create. - :param str image: Container image to run. - :param list aiu_ids_list: List of AIU PCIe IDs for each container. - Must provide exactly num_containers AIU ID sets. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G"). - :param str shm_size: Shared memory size (default: "2G"). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0"). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping (default: "127.0.0.1::8000"). - :param str vllm_spyre_use_cb: Use CB flag (default: "1"). - :param int vllm_dt_chunk_len: Optional DT chunk length. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. - :param bool enable_prefix_caching: Enable VLLM prefix caching (default: True). - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name_prefix: Prefix for container names. - :rtype: list of tuples (container_name, returncode, stdout, stderr). + async def get_container_port(self, container_id, port=8000, user=None): """ - if not isinstance(aiu_ids_list, list): - raise PodmanException( - f"aiu_ids_list must be a list, got {type(aiu_ids_list).__name__}" - ) - - if len(aiu_ids_list) != num_containers: - raise PodmanException( - f"Number of AIU ID sets ({len(aiu_ids_list)}) must match " - f"number of containers ({num_containers}). " - f"Each container requires its own unique AIU IDs." - ) - - container_names = [] - tasks = [] - for i in range(num_containers): - container_name = f"{container_name_prefix}-{i}" - container_names.append(container_name) - task = self.run_vllm_container( - image=image, - aiu_ids=aiu_ids_list[i], - host_models_dir=host_models_dir, - vllm_model_path=vllm_model_path, - aiu_world_size=aiu_world_size, - max_model_len=max_model_len, - max_batch_size=max_batch_size, - memory=memory, - shm_size=shm_size, - device=device, - privileged=privileged, - pids_limit=pids_limit, - userns=userns, - group_add=group_add, - port_mapping=port_mapping, - vllm_spyre_use_cb=vllm_spyre_use_cb, - vllm_dt_chunk_len=vllm_dt_chunk_len, - vllm_spyre_use_chunked_prefill=vllm_spyre_use_chunked_prefill, - enable_prefix_caching=enable_prefix_caching, - additional_vllm_args=additional_vllm_args, - container_name=container_name, - ) - tasks.append(task) - - task_results = await asyncio.gather(*tasks, return_exceptions=True) - - results = [] - for container_name, result in zip(container_names, task_results): - if isinstance(result, Exception): - LOG.error( - "Failed to create container %s: %s", container_name, str(result) - ) - results.append((container_name, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - results.append((container_name, returncode, stdout, stderr)) - LOG.info("Container %s created successfully", container_name) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_name, - type(result), - ) - results.append((container_name, None, None, "Unexpected result type")) - - return results - - async def start_multiple_containers(self, container_ids): - """Start multiple containers concurrently. + Get the actual host port mapped to a container port. - :param list container_ids: List of container IDs to start. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :return: Host port number or None """ - tasks = [self.start(container_id) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error("Failed to start container %s: %s", container_id, str(result)) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s started successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + return get_container_port(container_id, port, user, LOG) - async def stop_multiple_containers(self, container_ids): - """Stop multiple containers concurrently. - - :param list container_ids: List of container IDs to stop. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + async def save_container_logs(self, container_id, log_dir, test_name="test", user=None): """ - tasks = [self.stop(container_id) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error("Failed to stop container %s: %s", container_id, str(result)) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s stopped successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + Save complete container logs to a file. - async def remove_multiple_containers(self, container_ids, force=False): - """Remove multiple containers concurrently. - - :param list container_ids: List of container IDs to remove. - :param bool force: If True, force removal of running containers. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :return: Path to saved log file or None """ - tasks = [ - self.remove(container_id, force=force) for container_id in container_ids - ] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error( - "Failed to remove container %s: %s", container_id, str(result) - ) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s removed successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + return save_container_logs(container_id, log_dir, test_name, user, LOG) - async def get_multiple_container_logs(self, container_ids, tail=None): - """Get logs from multiple containers concurrently. - :param list container_ids: List of container IDs. - :param int tail: Number of lines to show from the end of the logs. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + async def wait_for_vllm_startup( + self, + container_id, + success_pattern="Application startup complete.", + failure_pattern=None, + additional_failure_checks=None, + timeout=300, + check_interval=10, + user=None, + show_live_logs=True, + live_log_lines=10 + ): """ - tasks = [self.logs(container_id, tail=tail) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error( - "Failed to get logs for container %s: %s", container_id, str(result) - ) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + Async method: Wait for container to start by checking logs for a success pattern. + + This is a generic async method that can be used for any container type. + The success and failure patterns can be customized based on the application. + + :param container_id: Container ID to monitor + :param success_pattern: String pattern to look for in logs indicating successful startup + :param failure_pattern: Optional string pattern indicating startup failure (e.g., "BACKTRACE") + :param additional_failure_checks: Optional list of tuples [(pattern, case_sensitive), ...] + for additional failure detection. Example: + [("VFIO", False), ("fail", False)] checks for "VFIO" and "fail" + :param timeout: Maximum time to wait in seconds (default: 300) + :param check_interval: Time between log checks in seconds (default: 10) + :param user: Username if container was created by specific user + :param show_live_logs: If True, display recent log lines during each check (default: True) + :param live_log_lines: Number of recent log lines to display (default: 10) + :return: True if startup successful, False otherwise + """ + return wait_for_vllm_startup(container_id, success_pattern, failure_pattern, additional_failure_checks, timeout, check_interval, user, show_live_logs, live_log_lines)