diff --git a/avocado/utils/podman.py b/avocado/utils/podman.py index 4592741834..da3ab21772 100644 --- a/avocado/utils/podman.py +++ b/avocado/utils/podman.py @@ -26,6 +26,7 @@ import json import logging import os +import shlex import subprocess import time from asyncio import create_subprocess_exec @@ -36,6 +37,317 @@ LOG = logging.getLogger(__name__) +def setup_user_and_group(username, password, spyre_group, add_to_group=True, log=None): + """ + Setup user and manage group membership for non-root container execution. + + :param username: Username to setup (None or "root" for root user) + :param password: Password for user (not used for root) + :param spyre_group: Group name to add/remove user to/from (e.g., "spyre_group") + :param add_to_group: True to add user to group, False to remove (default: True) + :param log: Logger instance (optional) + :return: None + """ + if log is None: + log = LOG + + if username is None or username == "root": + if spyre_group: + if add_to_group: + log.info("Add root user to %s group", spyre_group) + subprocess.run(["usermod", "-aG", spyre_group, "root"], + check=False) + else: + log.info("Remove root user from %s group", spyre_group) + subprocess.run(["gpasswd", "-d", "root", spyre_group], + check=False) + else: + # Check if user exists + user_check = subprocess.run( + ["id", "-u", username], + capture_output=True, check=False + ) + + if user_check.returncode != 0: + log.info("Create user: %s", username) + subprocess.run(["useradd", "-m", username], check=False) + # Use input parameter to pass password securely to chpasswd + subprocess.run(["chpasswd"], + input=f"{username}:{password}\n".encode(), + check=False) + + if spyre_group: + if add_to_group: + log.info("Add %s to %s group", username, spyre_group) + subprocess.run(["usermod", "-aG", spyre_group, username], + check=False) + else: + log.info("Remove %s from %s group", username, spyre_group) + subprocess.run(["gpasswd", "-d", username, spyre_group], + check=False) + +def get_container_port(container_id, port=8000, user=None, log=None): + """ + Get the actual host port mapped to a container port. + + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :return: Host port number or None + """ + if log is None: + log = LOG + + try: + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.error("Failed to get user ID for %s", user) + return None + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman port command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman port {container_id} {port}"], + capture_output=True, + text=True, + check=False + ) + else: + result = subprocess.run( + ["podman", "port", container_id, str(port)], + capture_output=True, + text=True, + check=False + ) + + if result.returncode == 0: + port_output = result.stdout.strip() + log.info("Port mapping output: %s", port_output) + + if port_output and ":" in port_output: + host_port = port_output.strip().split(":")[-1] + log.info( + "Container port %d is mapped to host port: %s", port, host_port) + return int(host_port) + else: + log.warning( + "Could not parse port from output: %s", port_output) + return None + else: + log.error("Failed to get container port: %s", result.stderr) + return None + except Exception as ex: + log.error("Failed to get container port: %s", ex) + return None + + +def save_container_logs(container_id, log_dir, test_name="test", user=None, log=None): + """ + Save complete container logs to a file. + + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :return: Path to saved log file or None + """ + if log is None: + log = LOG + + try: + # Create logs directory + logs_dir = os.path.join(log_dir, "container_logs") + os.makedirs(logs_dir, exist_ok=True) + + # Generate log filename + timestamp = time.strftime("%Y%m%d_%H%M%S") + log_filename = f"{test_name}_{container_id[:12]}_{timestamp}.log" + log_filepath = os.path.join(logs_dir, log_filename) + + log.info("Saving complete container logs to: %s", log_filepath) + + # Get complete logs based on user context + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.error("Failed to get user ID for %s", user) + return None + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman logs command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman logs {container_id}"], + capture_output=True, + text=True, + check=False, + timeout=60 + ) + else: + result = subprocess.run( + ["podman", "logs", container_id], + capture_output=True, + text=True, + check=False, + timeout=60 + ) + + if result.returncode == 0: + log_content = result.stdout + else: + log_content = f"Error retrieving logs:\n{result.stderr}\n\nPartial stdout:\n{result.stdout}" + + # Save logs to file + with open(log_filepath, 'w', encoding='utf-8') as f: + f.write(f"Container ID: {container_id}\n") + f.write(f"Test Name: {test_name}\n") + f.write(f"User: {user if user else 'root'}\n") + f.write(f"Timestamp: {timestamp}\n") + f.write("=" * 80 + "\n\n") + f.write(log_content) + + log.info("Container logs saved successfully (%d bytes)", len(log_content)) + return log_filepath + + except subprocess.TimeoutExpired: + log.warning("Timeout while retrieving container logs") + return None + except Exception as ex: + log.warning("Failed to save container logs: %s", ex) + +def wait_for_vllm_startup( + container_id, + success_pattern="Application startup complete.", + failure_pattern=None, + additional_failure_checks=None, + timeout=300, + check_interval=20, + user=None, + log=None, + show_live_logs=True, + live_log_lines=20 +): + """ + Wait for container to start by checking logs for a success pattern. + + This is a generic function that can be used for any container type. + The success and failure patterns can be customized based on the application. + + :param container_id: Container ID to monitor + :param success_pattern: String pattern to look for in logs indicating successful startup + :param failure_pattern: Optional string pattern indicating startup failure (e.g., "BACKTRACE") + :param additional_failure_checks: Optional list of tuples [(pattern, case_sensitive), ...] + for additional failure detection. Example: + [("VFIO", False), ("fail", False)] checks for "VFIO" and "fail" + :param timeout: Maximum time to wait in seconds (default: 300) + :param check_interval: Time between log checks in seconds (default: 10) + :param user: Username if container was created by specific user + :param log: Logger instance (optional) + :param show_live_logs: If True, display recent log lines during each check (default: True) + :param live_log_lines: Number of recent log lines to display (default: 10) + :return: True if startup successful, False otherwise + """ + if log is None: + log = LOG + + elapsed = 0 + last_log_position = 0 + + while elapsed < timeout: + try: + # Get container logs directly using capture_output + if user and user != "root": + # Get user ID first + id_result = subprocess.run( + ["id", "-u", user], + capture_output=True, + text=True, + check=False + ) + if id_result.returncode != 0: + log.warning("Failed to get user ID for %s", user) + time.sleep(check_interval) + elapsed += check_interval + continue + user_id = id_result.stdout.strip() + xdg_runtime_dir = f"/run/user/{user_id}" + # Run podman logs command as user with proper environment + result = subprocess.run( + ["su", "-", user, "-c", f"XDG_RUNTIME_DIR={xdg_runtime_dir} podman logs {container_id}"], + capture_output=True, + text=True, + check=False, + timeout=30 + ) + else: + # For root user, run directly + result = subprocess.run( + ["podman", "logs", container_id], + capture_output=True, + text=True, + check=False, + timeout=30 + ) + # Combine stdout and stderr + log_content = result.stdout + result.stderr + if show_live_logs and log_content: + log_lines = log_content.split('\n') + current_log_length = len(log_lines) + if current_log_length > last_log_position: + new_lines = log_lines[last_log_position:] + display_lines = new_lines[-live_log_lines:] if len(new_lines) > live_log_lines else new_lines + if display_lines: + log.info("=== Recent Container Logs ===") + for line in display_lines: + if line.strip(): # Only show non-empty lines + log.info(" %s", line) + log.info("=== End Logs ===") + last_log_position = current_log_length + if failure_pattern and failure_pattern in log_content: + log.error("%s detected in container logs - startup failed", failure_pattern) + log.error("Container logs:\n%s", log_content) + return False + if additional_failure_checks: + for pattern, case_sensitive in additional_failure_checks: + check_content = log_content if case_sensitive else log_content.lower() + check_pattern = pattern if case_sensitive else pattern.lower() + if check_pattern in check_content: + log.error("Failure pattern '%s' detected in logs", pattern) + log.error("Container logs:\n%s", log_content) + return False + if success_pattern in log_content: + log.info("✓ Container started successfully: %s", container_id) + return True + log.info("Waiting for container startup... (%d/%d seconds)", elapsed, timeout) + time.sleep(check_interval) + elapsed += check_interval + + except subprocess.TimeoutExpired: + log.warning("Timeout getting container logs") + time.sleep(check_interval) + elapsed += check_interval + except Exception as ex: + log.warning("Failed to get container logs: %s", ex) + time.sleep(check_interval) + elapsed += check_interval + + log.error("Timeout waiting for container startup") + return False + def install_huggingface_cli(): """ Install Hugging Face CLI if not already installed. @@ -43,44 +355,77 @@ def install_huggingface_cli(): :return: True if installed successfully, False otherwise """ try: - result = subprocess.run( - ["huggingface-cli", "--version"], - capture_output=True, - text=True, - check=False, - ) - if result.returncode == 0: - LOG.info("Hugging Face CLI already installed: %s", result.stdout.strip()) - return True + # Check if huggingface-cli is already available + hf_cli_path = which("hf") + if hf_cli_path: + LOG.info("Hugging Face CLI already installed at: %s", hf_cli_path) + try: + result = subprocess.run( + [hf_cli_path, "--version"], + capture_output=True, + text=True, + check=False, + timeout=10 + ) + if result.returncode == 0: + LOG.info("Hugging Face CLI version: %s", + result.stdout.strip()) + return True + except Exception as e: + LOG.warning("Could not verify HF CLI version: %s", e) + return True # CLI exists, assume it works + + LOG.info("Hugging Face CLI not found, installing...") + + # Try to install using pip + pip_path = which("pip") or which("pip3") + if not pip_path: + LOG.error("pip not found, cannot install Hugging Face CLI") + return False - LOG.info("Installing Hugging Face CLI...") + LOG.info("Installing huggingface_hub[cli] using: %s", pip_path) result = subprocess.run( - ["pip", "install", "-U", "huggingface_hub[cli]"], + [pip_path, "install", "-U", "huggingface_hub[cli]"], capture_output=True, text=True, check=False, + timeout=300 ) if result.returncode == 0: LOG.info("Hugging Face CLI installed successfully") - verify_result = subprocess.run( - ["huggingface-cli", "--version"], - capture_output=True, - text=True, - check=False, - ) - if verify_result.returncode == 0: - LOG.info("Verified installation: %s", verify_result.stdout.strip()) + LOG.debug("Installation output: %s", result.stdout) + + # Verify installation + hf_cli_path = which("hf") + if hf_cli_path: + LOG.info("Verified: huggingface-cli found at %s", hf_cli_path) return True + else: + LOG.warning("huggingface-cli installed but not found in PATH") + # Try common installation paths + common_paths = [ + os.path.expanduser("~/.local/bin/hf"), + "/usr/local/bin/hf", + "/usr/bin/hf" + ] + for path in common_paths: + if os.path.exists(path): + LOG.info("Found huggingface-cli at: %s", path) + return True + return False + else: + LOG.error("Failed to install Hugging Face CLI") + LOG.error("Error output: %s", result.stderr) + return False - LOG.error("Failed to install Hugging Face CLI") + except subprocess.TimeoutExpired: + LOG.error("Timeout while installing Hugging Face CLI") return False - except Exception as ex: LOG.error("Error installing Hugging Face CLI: %s", ex) return False - def download_model_from_hf(hf_model_id, local_dir, model_name): """ Download a model from Hugging Face Hub. @@ -108,7 +453,7 @@ def download_model_from_hf(hf_model_id, local_dir, model_name): LOG.info(" Destination: %s", model_path) result = subprocess.run( - ["huggingface-cli", "download", "--local-dir", model_path, hf_model_id], + ["hf", "download", "--local-dir", model_path, hf_model_id], capture_output=True, text=True, timeout=3600, @@ -138,6 +483,90 @@ def download_model_from_hf(hf_model_id, local_dir, model_name): LOG.error("Error downloading model: %s", ex) return False +def validate_model_with_sha(model_path): + """ + Validate model files by checking SHA256 checksums if available. + + :param model_path: Path to the model directory + :return: Tuple of (is_valid, validation_messages) + """ + import hashlib + import glob + validation_messages = [] + is_valid = True + required_files = [ + "config.json", + "tokenizer.json", + "tokenizer_config.json" + ] + for req_file in required_files: + file_path = os.path.join(model_path, req_file) + if not os.path.exists(file_path): + validation_messages.append(f"MISSING: {req_file}") + is_valid = False + elif os.path.getsize(file_path) == 0: + validation_messages.append(f"EMPTY: {req_file}") + is_valid = False + else: + validation_messages.append( + f"OK: {req_file} ({os.path.getsize(file_path)} bytes)") + weight_patterns = ["*.safetensors", "*.bin", "pytorch_model*.bin"] + weight_files = [] + for pattern in weight_patterns: + weight_files.extend(glob.glob(os.path.join(model_path, pattern))) + if not weight_files: + validation_messages.append("MISSING: No model weight files found") + is_valid = False + else: + validation_messages.append(f"Found {len(weight_files)} weight file(s)") + for weight_file in weight_files: + file_size = os.path.getsize(weight_file) + if file_size == 0: + validation_messages.append( + f"EMPTY: {os.path.basename(weight_file)}") + is_valid = False + else: + validation_messages.append( + f"OK: {os.path.basename(weight_file)} ({file_size} bytes)") + sha_file = os.path.join(model_path, "SHA256SUMS") + + if os.path.exists(sha_file): + validation_messages.append( + f"Found checksum file: {os.path.basename(sha_file)}") + try: + with open(sha_file, 'r') as f: + sha_content = f.read() + for weight_file in weight_files: + filename = os.path.basename(weight_file) + if filename in sha_content: + validation_messages.append( + f"Validating SHA256 for: {filename}") + sha256_hash = hashlib.sha256() + try: + with open(weight_file, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + sha256_hash.update(byte_block) + actual_sha = sha256_hash.hexdigest() + if actual_sha[:16] in sha_content: + validation_messages.append( + f"SHA256 VALID: {filename}") + else: + validation_messages.append( + f"SHA256 MISMATCH: {filename}") + validation_messages.append( + f" Calculated: {actual_sha[:16]}...") + is_valid = False + except Exception as sha_ex: + validation_messages.append( + f"SHA256 calculation failed for {filename}: {sha_ex}") + except Exception as ex: + validation_messages.append(f"Failed to read checksum file: {ex}") + else: + validation_messages.append( + "No checksum file found - skipping SHA validation") + + return is_valid, validation_messages + class PodmanException(Exception): pass @@ -167,17 +596,24 @@ def __init__(self, podman_bin=None): class Podman(_Podman): - def execute(self, *args): + def execute(self, *args, user=None): """Execute a command and return the returncode, stdout and stderr. :param args: Variable length argument list to be used as argument during execution. + :param user: Optional user to run command as (uses 'su - user -c') :rtype: tuple with returncode, stdout and stderr. """ try: LOG.debug("Executing %s", args) - cmd = [self.podman_bin, *args] + if user: + import shlex + podman_cmd = [self.podman_bin] + list(args) + podman_cmd_str = " ".join(shlex.quote(str(arg)) for arg in podman_cmd) + cmd = ["su", "-", user, "-c", podman_cmd_str] + else: + cmd = [self.podman_bin, *args] with subprocess.Popen( cmd, stdin=subprocess.DEVNULL, @@ -189,7 +625,7 @@ def execute(self, *args): LOG.debug("Stdout: %s", stdout.decode("utf-8", "replace")) LOG.debug("Stderr: %s", stderr.decode("utf-8", "replace")) if proc.returncode: - command_args = " ".join(args) + command_args = " ".join(str(a) for a in args) msg = f'Failure from command "{self.podman_bin} {command_args}": returned code "{proc.returncode}" stderr: "{stderr}"' LOG.error(msg) raise PodmanException(msg) @@ -200,7 +636,7 @@ def execute(self, *args): LOG.error("%s: %s", msg, str(ex)) raise PodmanException(msg) from ex - def copy_to_container(self, container_id, src, dst): + def copy_to_container(self, container_id, src, dst, user=None): """Copy artifacts from src to container:dst. This method allows copying the contents of src to the dst. Files will @@ -210,10 +646,11 @@ def copy_to_container(self, container_id, src, dst): :param str container_id: string with the container identification. :param str src: what file or directory you are trying to copy. :param str dst: the destination inside the container. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("cp", src, f"{container_id}:{dst}") + return self.execute("cp", src, f"{container_id}:{dst}", user=user) except PodmanException as ex: error = f"Failed copying data to container {container_id}" LOG.error(error) @@ -237,58 +674,64 @@ def get_python_version(self, image): return int(output[0]), int(output[1]), output[2] return None - def get_container_info(self, container_id): + def get_container_info(self, container_id, user=None): """Return all information about specific container. :param container_id: identifier of container :type container_id: str + :param user: Optional user to run command as + :type user: str :rtype: dict """ try: _, stdout, _ = self.execute( - "ps", "--all", "--format=json", "--filter", f"id={container_id}" + "ps", "--all", "--format=json", "--filter", f"id={container_id}", + user=user ) except PodmanException as ex: raise PodmanException( f"Failed getting information about container: {container_id}." ) from ex containers = json.loads(stdout.decode()) - for container in containers: - if container.get("Id") == container_id: - return container + # Return first container if found (filter by id should return only one) + if containers and len(containers) > 0: + return containers[0] return {} - def start(self, container_id): + def start(self, container_id, user=None): """Starts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to start. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("start", container_id) + return self.execute("start", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to start the container.") from ex - def stop(self, container_id): + def stop(self, container_id, user=None): """Stops a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to stop. + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("stop", "-t=0", container_id) + return self.execute("stop", "-t=0", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to stop the container.") from ex - def restart(self, container_id, timeout=10): + def restart(self, container_id, timeout=10, user=None): """Restarts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to restart. :param int timeout: Timeout in seconds to wait for container to stop before killing it (default: 10). + :param str user: Optional user to run command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return self.execute("restart", f"-t={timeout}", container_id) + return self.execute("restart", f"-t={timeout}", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to restart the container.") from ex @@ -301,16 +744,18 @@ def exec_command( env=None, interactive=False, tty=False, + podman_user=None, ): """Execute a command in a running container. :param str container_id: Container identification string. :param str or list command: Command to execute. Can be a string or list of arguments. - :param str user: Optional user to run the command as (e.g., "root", "1000", "1000:1000"). + :param str user: Optional user to run the command as inside container (e.g., "root", "1000", "1000:1000"). :param str workdir: Optional working directory for the command. :param dict env: Optional dictionary of environment variables to set. :param bool interactive: Keep STDIN open even if not attached (default: False). :param bool tty: Allocate a pseudo-TTY (default: False). + :param str podman_user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: @@ -337,7 +782,7 @@ def exec_command( else: raise PodmanException("Command must be a string or list") - return self.execute(*cmd_args) + return self.execute(*cmd_args, user=podman_user) except PodmanException as ex: raise PodmanException( f"Failed to execute command in container {container_id}." @@ -350,6 +795,7 @@ def collect_container_aiu_metrics( dtcompiler_export_dir=None, stats_command="aiu-smi --csv", timeout=None, + user=None, ): """ Start collecting AIU metrics from a container in the background using nohup. @@ -364,6 +810,7 @@ def collect_container_aiu_metrics( (default: "aiu-smi --csv"). :param int timeout: Optional timeout in seconds. If specified, stats collection will automatically stop after this duration. If None, collection runs indefinitely. + :param str user: Optional system user to run podman command as. :return: Process object for the background process. :rtype: subprocess.Popen """ @@ -379,19 +826,18 @@ def collect_container_aiu_metrics( podman_cmd.extend(["bash", "--login", "-c", stats_command]) + # Build the command string using shlex.join to properly quote arguments + podman_cmd_str = shlex.join(podman_cmd) if timeout: - full_command = f"nohup bash -c 'timeout {timeout} {' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection with %d second timeout: %s", - timeout, - full_command, - ) + base_command = f"timeout {timeout} {podman_cmd_str} | tee {output_file}" else: - full_command = f"nohup bash -c '{' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection (no timeout): %s", - full_command, - ) + base_command = f"{podman_cmd_str} | tee {output_file}" + # Wrap with user if specified + if user: + full_command = f"nohup su - {user} -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + else: + full_command = f"nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + LOG.info("Starting background stats collection: %s", full_command) process = subprocess.Popen( full_command, @@ -410,7 +856,7 @@ def collect_container_aiu_metrics( raise PodmanException(error_msg) from ex def collect_container_stats( - self, container_id, output_dir, interval=1, duration=60 + self, container_id, output_dir, interval=1, duration=60, user=None ): """ Collect Podman CPU and memory stats and write as JSON. @@ -424,12 +870,13 @@ def collect_container_stats( Container-specific subdirectories will be created. :param int interval: Interval in seconds between stat collections (default: 1). :param int duration: Total duration in seconds to collect stats (default: 60). + :param str user: Optional system user to run podman command as. :return: Path to the generated JSON stats file (or list of paths if container_id="all"). :rtype: str or list """ try: if container_id.lower() == "all": - _, stdout, _ = self.execute("ps", "--format", "{{.ID}}") + _, stdout, _ = self.execute("ps", "--format", "{{.ID}}", user=user) container_ids = stdout.decode().strip().split("\n") container_ids = [cid.strip() for cid in container_ids if cid.strip()] @@ -442,14 +889,14 @@ def collect_container_stats( json_files = [] for cid in container_ids: json_file = self._collect_single_container_stats( - cid, output_dir, interval, duration + cid, output_dir, interval, duration, user ) json_files.append(json_file) return json_files else: return self._collect_single_container_stats( - container_id, output_dir, interval, duration + container_id, output_dir, interval, duration, user ) except Exception as ex: @@ -458,7 +905,7 @@ def collect_container_stats( raise PodmanException(error_msg) from ex def _collect_single_container_stats( - self, container_id, base_output_dir, interval, duration + self, container_id, base_output_dir, interval, duration, user=None ): """ Internal method to collect stats for a single container. @@ -467,6 +914,7 @@ def _collect_single_container_stats( :param str base_output_dir: Base directory for output. :param int interval: Interval between collections. :param int duration: Total duration. + :param str user: Optional system user to run podman command as. :return: Path to JSON stats file. :rtype: str """ @@ -488,7 +936,7 @@ def _collect_single_container_stats( while time.time() < end_time: try: _, stdout, stderr = self.execute( - "stats", "--no-stream", "--format", "json", container_id + "stats", "--no-stream", "--format", "json", container_id, user=user ) if stderr: @@ -630,208 +1078,54 @@ def send_vllm_inference_request( LOG.error("%s: %s", error_msg, ex) raise PodmanException(error_msg) from ex - def run_vllm_container( - self, - image, - aiu_ids, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb=None, - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=None, - max_num_batched_tokens=None, - additional_vllm_args=None, - container_name=None, - user=None, - dtlog_level=None, - dtcompiler_keep_export=None, - vllm_spyre_require_precompiled_decoders=None, - enable_flex_timing=None, - flex_print_end_to_end_breakdown=None, - flex_skip_timestamp_calibration=None, - flex_scheduler_print_raw_timestamps=None, - flex_global_profile_prefix=None, - ): - """Run a VLLM container with AIU support. - - :param str image: Container image to run. - :param str aiu_ids: Space-separated AIU PCIe IDs. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G" - adjust based on model size). - :param str shm_size: Shared memory size (default: "2G" - increase for larger batches). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0" - unlimited). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping in format - ``[host_ip]:[host_port]:container_port`` (default: - ``"127.0.0.1::8000"`` - localhost with random host port to - container port 8000). Examples: - ``"127.0.0.1:8000:8000"`` (fixed host port 8000), - ``"0.0.0.0::8000"`` (all interfaces, random host port), - ``"::8000"`` (all interfaces, random host port). - :param str vllm_spyre_use_cb: Optional VLLM Spyre use CB flag (e.g., "1"). If None, not set. - :param int vllm_dt_chunk_len: Optional DT chunk length. If None, not set. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. If None, not set. - :param bool enable_prefix_caching: Optional enable VLLM prefix caching. If None, not set. - :param int max_num_batched_tokens: Optional maximum number of batched tokens. If None, not set. - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name: Optional container name. - :param str user: Optional user to run container as (e.g., "1000:1000"). - :param str dtlog_level: Optional DT log level (e.g., "warning"). If None, not set. - :param str dtcompiler_keep_export: Optional DT compiler keep export flag (e.g., "true"). If None, not set. - :param str vllm_spyre_require_precompiled_decoders: Optional require precompiled decoders flag (e.g., "0"). If None, not set. - :param str enable_flex_timing: Optional enable flex timing flag (e.g., "0"). If None, not set. - :param str flex_print_end_to_end_breakdown: Optional flex print end-to-end breakdown flag (e.g., "0"). If None, not set. - :param str flex_skip_timestamp_calibration: Optional flex skip timestamp calibration flag (e.g., "0"). If None, not set. - :param str flex_scheduler_print_raw_timestamps: Optional flex scheduler print raw timestamps flag (e.g., "0"). If None, not set. - :param str flex_global_profile_prefix: Optional flex global profile prefix (e.g., "flex-logs"). If None, not set. - :rtype: tuple with returncode, stdout (container ID), stderr. + def run(self, podman_options=None, user=None): """ - cmd_args = [ - "run", - "-d", - "-it", - f"--device={device}", - "-v", - f"{host_models_dir}:/models", - "-e", - f"AIU_PCIE_IDS={aiu_ids}", - f"--privileged={privileged}", - "--pids-limit", - pids_limit, - f"--userns={userns}", - f"--group-add={group_add}", - "--memory", - memory, - "--shm-size", - shm_size, - "-p", - port_mapping, - ] - - if vllm_spyre_use_cb is not None: - cmd_args.extend(["-e", f"VLLM_SPYRE_USE_CB={vllm_spyre_use_cb}"]) - if vllm_dt_chunk_len is not None: - cmd_args.extend(["-e", f"VLLM_DT_CHUNK_LEN={vllm_dt_chunk_len}"]) - if vllm_spyre_use_chunked_prefill is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_USE_CHUNKED_PREFILL={vllm_spyre_use_chunked_prefill}", - ] - ) - - if dtlog_level is not None: - cmd_args.extend(["-e", f"DTLOG_LEVEL={dtlog_level}"]) - if dtcompiler_keep_export is not None: - cmd_args.extend(["-e", f"DTCOMPILER_KEEP_EXPORT={dtcompiler_keep_export}"]) - if vllm_spyre_require_precompiled_decoders is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_REQUIRE_PRECOMPILED_DECODERS={vllm_spyre_require_precompiled_decoders}", - ] - ) - if enable_flex_timing is not None: - cmd_args.extend(["-e", f"ENABLE_FLEX_TIMING={enable_flex_timing}"]) - if flex_print_end_to_end_breakdown is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_PRINT_END_TO_END_BREAKDOWN={flex_print_end_to_end_breakdown}", - ] - ) - if flex_skip_timestamp_calibration is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_SKIP_TIMESTAMP_CALIBRATION={flex_skip_timestamp_calibration}", - ] - ) - if flex_scheduler_print_raw_timestamps is not None: - cmd_args.extend( - [ - "-e", - f"FLEX_SCHEDULER_PRINT_RAW_TIMESTAMPS={flex_scheduler_print_raw_timestamps}", - ] - ) - if flex_global_profile_prefix is not None: - cmd_args.extend( - ["-e", f"FLEX_GLOBAL_PROFILE_PREFIX={flex_global_profile_prefix}"] - ) - - if user is not None: - cmd_args.extend(["--user", user]) - - if container_name: - cmd_args.extend(["--name", container_name]) - - cmd_args.extend( - [ - image, - "--model", - vllm_model_path, - "-tp", - str(aiu_world_size), - "--max-model-len", - str(max_model_len), - "--max-num-seqs", - str(max_batch_size), - ] - ) - - if enable_prefix_caching is not None and enable_prefix_caching: - cmd_args.append("--enable-prefix-caching") + Run a container with command-line arguments as a list. + + This is a simplified generic container runner that accepts a list + of command-line arguments to pass to 'podman run'. + + :param list podman_options: List of command-line arguments for 'podman run'. + Example: ["-d", "--name", "test", "-e", "VAR=value", + "--memory", "100G", "image:tag", "--model", "/path"] + :param str user: Optional user to run podman command as (uses 'su - user -c') + :return: tuple (returncode, stdout, stderr) + """ + if not podman_options: + raise PodmanException("podman_options is required") - if max_num_batched_tokens is not None: - cmd_args.extend(["--max-num-batched-tokens", str(max_num_batched_tokens)]) + if not isinstance(podman_options, list): + raise PodmanException("podman_options must be a list of strings") - if additional_vllm_args: - cmd_args.extend(additional_vllm_args) + cmd_args = ["run"] + podman_options try: - return self.execute(*cmd_args) + return self.execute(*cmd_args, user=user) except PodmanException as ex: - raise PodmanException("Failed to run VLLM container.") from ex + raise PodmanException("Failed to run container.") from ex - def list_containers(self, all_containers=True): + def list_containers(self, all_containers=True, user=None): """List containers. :param bool all_containers: If True, list all containers including stopped ones. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON list), stderr. """ try: args = ["ps", "--format=json"] if all_containers: args.append("--all") - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException("Failed to list containers.") from ex - def logs(self, container_id, follow=False, tail=None): + def logs(self, container_id, follow=False, tail=None, user=None): """Get container logs. :param str container_id: Container identification string. :param bool follow: If True, follow log output. :param int tail: Number of lines to show from the end of the logs. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -841,30 +1135,32 @@ def logs(self, container_id, follow=False, tail=None): if tail is not None: args.extend(["--tail", str(tail)]) args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get logs for container {container_id}." ) from ex - def inspect(self, container_id): + def inspect(self, container_id, user=None): """Inspect a container and return detailed information. :param str container_id: Container identification string. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON), stderr. """ try: - return self.execute("inspect", container_id) + return self.execute("inspect", container_id, user=user) except PodmanException as ex: raise PodmanException( f"Failed to inspect container {container_id}." ) from ex - def remove(self, container_id, force=False): + def remove(self, container_id, force=False, user=None): """Remove a container. :param str container_id: Container identification string. :param bool force: If True, force removal of running container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -872,7 +1168,7 @@ def remove(self, container_id, force=False): if force: args.append("--force") args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to remove container {container_id}.") from ex @@ -884,6 +1180,7 @@ def login( api_key=None, api_key_username="iamapikey", password_stdin=False, + user=None, ): """Login to a container registry. @@ -894,6 +1191,7 @@ def login( :param str api_key_username: Username to use with API key authentication (default: "iamapikey" for IBM Cloud). Other registries may use different conventions (e.g., "oauth2accesstoken" for GCR). :param bool password_stdin: If True, read password from stdin. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -916,26 +1214,28 @@ def login( ) args.append(registry) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to login to registry {registry}.") from ex - def pull(self, image): + def pull(self, image, user=None): """Pull an image from a registry. :param str image: Image name to pull. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: - return self.execute("pull", image) + return self.execute("pull", image, user=user) except PodmanException as ex: raise PodmanException(f"Failed to pull image {image}.") from ex - def stats(self, container_id, no_stream=True): + def stats(self, container_id, no_stream=True, user=None): """Get container resource usage statistics. :param str container_id: Container identification string. :param bool no_stream: If True, output stats once and exit. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -943,30 +1243,69 @@ def stats(self, container_id, no_stream=True): if no_stream: args.append("--no-stream") args.append(container_id) - return self.execute(*args) + return self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get stats for container {container_id}." ) from ex + def get_container_port(self, container_id, port=8000, user=None): + """ + Get the actual host port mapped to a container port. + + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :return: Host port number or None + """ + return get_container_port(container_id, port, user, LOG) + + def save_container_logs(self, container_id, log_dir, test_name="test", user=None): + """ + Save complete container logs to a file. + + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :return: Path to saved log file or None + """ + return save_container_logs(container_id, log_dir, test_name, user, LOG) + + class AsyncPodman(_Podman): - async def execute(self, *args): + + async def execute(self, *args, user=None): """Execute a command and return the returncode, stdout and stderr. :param args: Variable length argument list to be used as argument during execution. + :param user: Optional system user to run podman command as (uses 'su - user -c'). :rtype: tuple with returncode, stdout and stderr. """ try: LOG.debug("Executing %s", args) - proc = await create_subprocess_exec( - self.podman_bin, - *args, - stdout=asyncio_subprocess.PIPE, - stderr=asyncio_subprocess.PIPE, - ) + if user and user != "root": + import shlex + # Build the full podman command + podman_cmd = [self.podman_bin] + list(args) + podman_cmd_str = " ".join(shlex.quote(str(arg)) for arg in podman_cmd) + su_cmd = ["su", "-", user, "-c", podman_cmd_str] + LOG.debug("Executing as user %s: %s", user, su_cmd) + proc = await create_subprocess_exec( + *su_cmd, + stdout=asyncio_subprocess.PIPE, + stderr=asyncio_subprocess.PIPE, + ) + else: + proc = await create_subprocess_exec( + self.podman_bin, + *args, + stdout=asyncio_subprocess.PIPE, + stderr=asyncio_subprocess.PIPE, + ) stdout, stderr = await proc.communicate() LOG.debug("Return code: %s", proc.returncode) LOG.debug("Stdout: %s", stdout.decode("utf-8", "replace")) @@ -984,7 +1323,33 @@ async def execute(self, *args): return proc.returncode, stdout, stderr - async def copy_to_container(self, container_id, src, dst): + async def run(self, podman_options=None, user=None): + """ + Run a container with command-line arguments as a list (async version). + + This is a simplified async container runner that accepts a list + of command-line arguments to pass to 'podman run'. + + :param list podman_options: List of command-line arguments for 'podman run'. + Example: ["-d", "--name", "test", "-e", "VAR=value", + "--memory", "100G", "image:tag", "--model", "/path"] + :param str user: Optional user to run podman command as (uses 'su - user -c') + :return: tuple (returncode, stdout, stderr) + """ + if not podman_options: + raise PodmanException("podman_options is required") + + if not isinstance(podman_options, list): + raise PodmanException("podman_options must be a list of strings") + + cmd_args = ["run"] + podman_options + + try: + return await self.execute(*cmd_args, user=user) + except PodmanException as ex: + raise PodmanException("Failed to run container.") from ex + + async def copy_to_container(self, container_id, src, dst, user=None): """Copy artifacts from src to container:dst. This method allows copying the contents of src to the dst. Files will @@ -994,10 +1359,11 @@ async def copy_to_container(self, container_id, src, dst): :param str container_id: string with the container identification. :param str src: what file or directory you are trying to copy. :param str dst: the destination inside the container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("cp", src, f"{container_id}:{dst}") + return await self.execute("cp", src, f"{container_id}:{dst}", user=user) except PodmanException as ex: error = f"Failed copying data to container {container_id}" LOG.error(error) @@ -1020,58 +1386,62 @@ async def get_python_version(self, image): output = stdout.decode().strip().split() return int(output[0]), int(output[1]), output[2] - async def get_container_info(self, container_id): + async def get_container_info(self, container_id, user=None): """Return all information about specific container. :param container_id: identifier of container :type container_id: str + :param str user: Optional system user to run podman command as. :rtype: dict """ try: _, stdout, _ = await self.execute( - "ps", "--all", "--format=json", "--filter", f"id={container_id}" + "ps", "--all", "--format=json", "--filter", f"id={container_id}", user=user ) except PodmanException as ex: raise PodmanException( f"Failed getting information about container:" f" {container_id}." ) from ex containers = json.loads(stdout.decode()) - for container in containers: - if container["Id"] == container_id: - return container + # Return first container if found (filter by id should return only one) + if containers and len(containers) > 0: + return containers[0] return {} - async def start(self, container_id): + async def start(self, container_id, user=None): """Starts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to start. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("start", container_id) + return await self.execute("start", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to start the container.") from ex - async def stop(self, container_id): + async def stop(self, container_id, user=None): """Stops a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to stop. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("stop", "-t=0", container_id) + return await self.execute("stop", "-t=0", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to stop the container.") from ex - async def restart(self, container_id, timeout=10): + async def restart(self, container_id, timeout=10, user=None): """Restarts a container and return the returncode, stdout and stderr. :param str container_id: Container identification string to restart. :param int timeout: Timeout in seconds to wait for container to stop before killing it (default: 10). + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: - return await self.execute("restart", f"-t={timeout}", container_id) + return await self.execute("restart", f"-t={timeout}", container_id, user=user) except PodmanException as ex: raise PodmanException("Failed to restart the container.") from ex @@ -1084,16 +1454,18 @@ async def exec_command( env=None, interactive=False, tty=False, + podman_user=None, ): """Execute a command in a running container. :param str container_id: Container identification string. :param str or list command: Command to execute. Can be a string or list of arguments. - :param str user: Optional user to run the command as (e.g., "root", "1000", "1000:1000"). + :param str user: Optional user to run the command as inside container (e.g., "root", "1000", "1000:1000"). :param str workdir: Optional working directory for the command. :param dict env: Optional dictionary of environment variables to set. :param bool interactive: Keep STDIN open even if not attached (default: False). :param bool tty: Allocate a pseudo-TTY (default: False). + :param str podman_user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout and stderr. """ try: @@ -1124,7 +1496,7 @@ async def exec_command( else: raise PodmanException("Command must be a string or list") - return await self.execute(*cmd_args) + return await self.execute(*cmd_args, user=podman_user) except PodmanException as ex: raise PodmanException( f"Failed to execute command in container {container_id}." @@ -1137,6 +1509,7 @@ async def collect_container_aiu_metrics( dtcompiler_export_dir=None, stats_command="aiu-smi --csv", timeout=None, + user=None, ): """ Start collecting AIU metrics from a container in the background using nohup. @@ -1151,6 +1524,7 @@ async def collect_container_aiu_metrics( (default: "aiu-smi --csv"). :param int timeout: Optional timeout in seconds. If specified, stats collection will automatically stop after this duration. If None, collection runs indefinitely. + :param str user: Optional system user to run podman command as. :return: Process object for the background process. :rtype: subprocess.Popen """ @@ -1170,23 +1544,21 @@ async def collect_container_aiu_metrics( # Add the bash command to execute podman_cmd.extend(["bash", "--login", "-c", stats_command]) + # Build the command string using shlex.join to properly quote arguments + podman_cmd_str = shlex.join(podman_cmd) # Build the full command with optional timeout if timeout: # Use timeout command to automatically stop after specified duration - full_command = f"nohup bash -c 'timeout {timeout} {' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection with %d second timeout: %s", - timeout, - full_command, - ) + base_command = f"timeout {timeout} {podman_cmd_str} | tee {output_file}" else: # Run indefinitely without timeout - full_command = f"nohup bash -c '{' '.join(podman_cmd)} | tee {output_file}' > /dev/null 2>&1 &" - LOG.info( - "Starting background stats collection (no timeout): %s", - full_command, - ) - + base_command = f"{podman_cmd_str} | tee {output_file}" + # Wrap with su if user is specified + if user and user != "root": + full_command = f"su - {user} -c {shlex.quote(f'nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &')}" + else: + full_command = f"nohup bash -c {shlex.quote(base_command)} > /dev/null 2>&1 &" + LOG.info("Starting background stats collection: %s", full_command) # Execute the command in the background process = subprocess.Popen( full_command, @@ -1450,148 +1822,29 @@ async def send_vllm_inference_request( LOG.error("%s: %s", error_msg, ex) raise PodmanException(error_msg) from ex - async def run_vllm_container( - self, - image, - aiu_ids, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb="1", - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=True, - additional_vllm_args=None, - container_name=None, - ): - """Run a VLLM container with AIU support. - - :param str image: Container image to run. - :param str aiu_ids: Space-separated AIU PCIe IDs. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G" - adjust based on model size). - :param str shm_size: Shared memory size (default: "2G" - increase for larger batches). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0" - unlimited). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping in format - ``[host_ip]:[host_port]:container_port`` (default: - ``"127.0.0.1::8000"`` - localhost with random host port to - container port 8000). Examples: - ``"127.0.0.1:8000:8000"`` (fixed host port 8000), - ``"0.0.0.0::8000"`` (all interfaces, random host port), - ``"::8000"`` (all interfaces, random host port). - :param str vllm_spyre_use_cb: Use CB flag (default: "1"). - :param int vllm_dt_chunk_len: Optional DT chunk length. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. - :param bool enable_prefix_caching: Enable VLLM prefix caching (default: True). - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name: Optional container name. - :rtype: tuple with returncode, stdout (container ID), stderr. - """ - cmd_args = [ - "run", - "-d", - "-it", - f"--device={device}", - "-v", - f"{host_models_dir}:/models", - "-e", - f"AIU_PCIE_IDS={aiu_ids}", - "-e", - f"VLLM_SPYRE_USE_CB={vllm_spyre_use_cb}", - f"--privileged={privileged}", - "--pids-limit", - pids_limit, - f"--userns={userns}", - f"--group-add={group_add}", - "--memory", - memory, - "--shm-size", - shm_size, - "-p", - port_mapping, - ] - - if vllm_dt_chunk_len is not None: - cmd_args.extend(["-e", f"VLLM_DT_CHUNK_LEN={vllm_dt_chunk_len}"]) - if vllm_spyre_use_chunked_prefill is not None: - cmd_args.extend( - [ - "-e", - f"VLLM_SPYRE_USE_CHUNKED_PREFILL={vllm_spyre_use_chunked_prefill}", - ] - ) - - if container_name: - cmd_args.extend(["--name", container_name]) - - cmd_args.extend( - [ - image, - "--model", - vllm_model_path, - "-tp", - str(aiu_world_size), - "--max-model-len", - str(max_model_len), - "--max-num-seqs", - str(max_batch_size), - ] - ) - - if enable_prefix_caching: - cmd_args.append("--enable-prefix-caching") - - if additional_vllm_args: - if isinstance(additional_vllm_args, list): - cmd_args.extend(additional_vllm_args) - else: - raise PodmanException( - f"additional_vllm_args must be a list, got {type(additional_vllm_args).__name__}" - ) - - try: - return await self.execute(*cmd_args) - except PodmanException as ex: - raise PodmanException("Failed to run VLLM container.") from ex - async def list_containers(self, all_containers=True): + async def list_containers(self, all_containers=True, user=None): """List containers. :param bool all_containers: If True, list all containers including stopped ones. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON list), stderr. """ try: args = ["ps", "--format=json"] if all_containers: args.append("--all") - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException("Failed to list containers.") from ex - async def logs(self, container_id, follow=False, tail=None): + async def logs(self, container_id, follow=False, tail=None, user=None): """Get container logs. :param str container_id: Container identification string. :param bool follow: If True, follow log output. :param int tail: Number of lines to show from the end of the logs. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1601,30 +1854,32 @@ async def logs(self, container_id, follow=False, tail=None): if tail is not None: args.extend(["--tail", str(tail)]) args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get logs for container {container_id}." ) from ex - async def inspect(self, container_id): + async def inspect(self, container_id, user=None): """Inspect a container and return detailed information. :param str container_id: Container identification string. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout (JSON), stderr. """ try: - return await self.execute("inspect", container_id) + return await self.execute("inspect", container_id, user=user) except PodmanException as ex: raise PodmanException( f"Failed to inspect container {container_id}." ) from ex - async def remove(self, container_id, force=False): + async def remove(self, container_id, force=False, user=None): """Remove a container. :param str container_id: Container identification string. :param bool force: If True, force removal of running container. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1632,7 +1887,7 @@ async def remove(self, container_id, force=False): if force: args.append("--force") args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to remove container {container_id}.") from ex @@ -1644,6 +1899,7 @@ async def login( api_key=None, api_key_username="iamapikey", password_stdin=False, + user=None, ): """Login to a container registry. @@ -1654,6 +1910,7 @@ async def login( :param str api_key_username: Username to use with API key authentication (default: "iamapikey" for IBM Cloud). Other registries may use different conventions (e.g., "oauth2accesstoken" for GCR). :param bool password_stdin: If True, read password from stdin. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1676,26 +1933,28 @@ async def login( ) args.append(registry) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException(f"Failed to login to registry {registry}.") from ex - async def pull(self, image): + async def pull(self, image, user=None): """Pull an image from a registry. :param str image: Image name to pull. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: - return await self.execute("pull", image) + return await self.execute("pull", image, user=user) except PodmanException as ex: raise PodmanException(f"Failed to pull image {image}.") from ex - async def stats(self, container_id, no_stream=True): + async def stats(self, container_id, no_stream=True, user=None): """Get container resource usage statistics. :param str container_id: Container identification string. :param bool no_stream: If True, output stats once and exit. + :param str user: Optional system user to run podman command as. :rtype: tuple with returncode, stdout, stderr. """ try: @@ -1703,244 +1962,65 @@ async def stats(self, container_id, no_stream=True): if no_stream: args.append("--no-stream") args.append(container_id) - return await self.execute(*args) + return await self.execute(*args, user=user) except PodmanException as ex: raise PodmanException( f"Failed to get stats for container {container_id}." ) from ex - async def run_multiple_vllm_containers( - self, - num_containers, - image, - aiu_ids_list, - host_models_dir, - vllm_model_path, - aiu_world_size, - max_model_len, - max_batch_size, - memory="100G", - shm_size="2G", - device="/dev/vfio", - privileged="true", - pids_limit="0", - userns="keep-id", - group_add="keep-groups", - port_mapping="127.0.0.1::8000", - vllm_spyre_use_cb="1", - vllm_dt_chunk_len=None, - vllm_spyre_use_chunked_prefill=None, - enable_prefix_caching=True, - additional_vllm_args=None, - container_name_prefix="vllm-container", - ): - """Run multiple VLLM containers concurrently with different AIU IDs. - - :param int num_containers: Number of containers to create. - :param str image: Container image to run. - :param list aiu_ids_list: List of AIU PCIe IDs for each container. - Must provide exactly num_containers AIU ID sets. - :param str host_models_dir: Host directory containing models. - :param str vllm_model_path: Path to model inside container. - :param int aiu_world_size: AIU world size (tensor parallelism). - :param int max_model_len: Maximum model length. - :param int max_batch_size: Maximum batch size. - :param str memory: Memory limit (default: "100G"). - :param str shm_size: Shared memory size (default: "2G"). - :param str device: Device to mount (default: "/dev/vfio"). - :param str privileged: Run in privileged mode (default: "true"). - :param str pids_limit: PIDs limit (default: "0"). - :param str userns: User namespace mode (default: "keep-id"). - :param str group_add: Group add mode (default: "keep-groups"). - :param str port_mapping: Port mapping (default: "127.0.0.1::8000"). - :param str vllm_spyre_use_cb: Use CB flag (default: "1"). - :param int vllm_dt_chunk_len: Optional DT chunk length. - :param int vllm_spyre_use_chunked_prefill: Optional chunked prefill flag. - :param bool enable_prefix_caching: Enable VLLM prefix caching (default: True). - :param list additional_vllm_args: Additional VLLM command-line arguments as list of strings. - :param str container_name_prefix: Prefix for container names. - :rtype: list of tuples (container_name, returncode, stdout, stderr). + async def get_container_port(self, container_id, port=8000, user=None): """ - if not isinstance(aiu_ids_list, list): - raise PodmanException( - f"aiu_ids_list must be a list, got {type(aiu_ids_list).__name__}" - ) - - if len(aiu_ids_list) != num_containers: - raise PodmanException( - f"Number of AIU ID sets ({len(aiu_ids_list)}) must match " - f"number of containers ({num_containers}). " - f"Each container requires its own unique AIU IDs." - ) - - container_names = [] - tasks = [] - for i in range(num_containers): - container_name = f"{container_name_prefix}-{i}" - container_names.append(container_name) - task = self.run_vllm_container( - image=image, - aiu_ids=aiu_ids_list[i], - host_models_dir=host_models_dir, - vllm_model_path=vllm_model_path, - aiu_world_size=aiu_world_size, - max_model_len=max_model_len, - max_batch_size=max_batch_size, - memory=memory, - shm_size=shm_size, - device=device, - privileged=privileged, - pids_limit=pids_limit, - userns=userns, - group_add=group_add, - port_mapping=port_mapping, - vllm_spyre_use_cb=vllm_spyre_use_cb, - vllm_dt_chunk_len=vllm_dt_chunk_len, - vllm_spyre_use_chunked_prefill=vllm_spyre_use_chunked_prefill, - enable_prefix_caching=enable_prefix_caching, - additional_vllm_args=additional_vllm_args, - container_name=container_name, - ) - tasks.append(task) - - task_results = await asyncio.gather(*tasks, return_exceptions=True) - - results = [] - for container_name, result in zip(container_names, task_results): - if isinstance(result, Exception): - LOG.error( - "Failed to create container %s: %s", container_name, str(result) - ) - results.append((container_name, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - results.append((container_name, returncode, stdout, stderr)) - LOG.info("Container %s created successfully", container_name) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_name, - type(result), - ) - results.append((container_name, None, None, "Unexpected result type")) - - return results - - async def start_multiple_containers(self, container_ids): - """Start multiple containers concurrently. + Get the actual host port mapped to a container port. - :param list container_ids: List of container IDs to start. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + :param container_id: Container ID + :param port: Container port to check (default: 8000) + :param user: Username if container was created by specific user + :return: Host port number or None """ - tasks = [self.start(container_id) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error("Failed to start container %s: %s", container_id, str(result)) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s started successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + return get_container_port(container_id, port, user, LOG) - async def stop_multiple_containers(self, container_ids): - """Stop multiple containers concurrently. - - :param list container_ids: List of container IDs to stop. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + async def save_container_logs(self, container_id, log_dir, test_name="test", user=None): """ - tasks = [self.stop(container_id) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error("Failed to stop container %s: %s", container_id, str(result)) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s stopped successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + Save complete container logs to a file. - async def remove_multiple_containers(self, container_ids, force=False): - """Remove multiple containers concurrently. - - :param list container_ids: List of container IDs to remove. - :param bool force: If True, force removal of running containers. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + :param container_id: Container ID + :param log_dir: Directory to save logs + :param test_name: Test name for log file naming (default: "test") + :param user: Username if container was created by specific user + :return: Path to saved log file or None """ - tasks = [ - self.remove(container_id, force=force) for container_id in container_ids - ] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error( - "Failed to remove container %s: %s", container_id, str(result) - ) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - LOG.info("Container %s removed successfully", container_id) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + return save_container_logs(container_id, log_dir, test_name, user, LOG) - async def get_multiple_container_logs(self, container_ids, tail=None): - """Get logs from multiple containers concurrently. - :param list container_ids: List of container IDs. - :param int tail: Number of lines to show from the end of the logs. - :rtype: list of tuples (container_id, returncode, stdout, stderr). + async def wait_for_vllm_startup( + self, + container_id, + success_pattern="Application startup complete.", + failure_pattern=None, + additional_failure_checks=None, + timeout=300, + check_interval=10, + user=None, + show_live_logs=True, + live_log_lines=10 + ): """ - tasks = [self.logs(container_id, tail=tail) for container_id in container_ids] - results = await asyncio.gather(*tasks, return_exceptions=True) - - output = [] - for container_id, result in zip(container_ids, results): - if isinstance(result, Exception): - LOG.error( - "Failed to get logs for container %s: %s", container_id, str(result) - ) - output.append((container_id, None, None, str(result))) - elif isinstance(result, tuple) and len(result) == 3: - returncode, stdout, stderr = result - output.append((container_id, returncode, stdout, stderr)) - else: - LOG.error( - "Unexpected result type for container %s: %s", - container_id, - type(result), - ) - output.append((container_id, None, None, "Unexpected result type")) - - return output + Async method: Wait for container to start by checking logs for a success pattern. + + This is a generic async method that can be used for any container type. + The success and failure patterns can be customized based on the application. + + :param container_id: Container ID to monitor + :param success_pattern: String pattern to look for in logs indicating successful startup + :param failure_pattern: Optional string pattern indicating startup failure (e.g., "BACKTRACE") + :param additional_failure_checks: Optional list of tuples [(pattern, case_sensitive), ...] + for additional failure detection. Example: + [("VFIO", False), ("fail", False)] checks for "VFIO" and "fail" + :param timeout: Maximum time to wait in seconds (default: 300) + :param check_interval: Time between log checks in seconds (default: 10) + :param user: Username if container was created by specific user + :param show_live_logs: If True, display recent log lines during each check (default: True) + :param live_log_lines: Number of recent log lines to display (default: 10) + :return: True if startup successful, False otherwise + """ + return wait_for_vllm_startup(container_id, success_pattern, failure_pattern, additional_failure_checks, timeout, check_interval, user, show_live_logs, live_log_lines)