diff --git a/mpflash/ask_input.py b/mpflash/ask_input.py index af2c6f4..9ceea7e 100644 --- a/mpflash/ask_input.py +++ b/mpflash/ask_input.py @@ -113,6 +113,9 @@ def _ask_select( def ask_missing_params( params: ParamType, + *, + ask_board: bool = True, + ask_for_port: bool = False, ) -> ParamType: """ Asks the user for parameters not supplied on the command line and returns updated params. @@ -139,9 +142,10 @@ def ask_missing_params( variant_unknown = isinstance(params, FlashParams) and params.variant == "?" needs_serial = not multi_select and (not params.serial or "?" in params.serial) needs_versions = params.versions == [] or "?" in params.versions - needs_board = not params.boards or "?" in params.boards or variant_unknown + needs_board = ask_board and (not params.boards or "?" in params.boards or variant_unknown) + needs_port = isinstance(params, FlashParams) and ask_for_port and (not params.ports or "?" in params.ports) - if not (needs_serial or needs_versions or needs_board): + if not (needs_serial or needs_versions or needs_board or needs_port): return params from rich.console import Console @@ -173,6 +177,13 @@ def ask_missing_params( else: answers["versions"] = params.versions # type: ignore + # Port only (direct firmware mode where board selection is intentionally skipped) + if needs_port and not needs_board: + port = ask_mp_port(action=action) + if not port: + return [] # type: ignore + answers["port"] = port + # Port, board(s), and variant if needs_board: port, boards, variant = ask_port_board_variant( @@ -507,6 +518,21 @@ def ask_serialport( return result if result else None +def ask_mp_port(*, action: str) -> Optional[str]: + """Ask for MicroPython port using tab-completion.""" + ports = known_ports() + if not ports: + log.warning("No known ports found in database.") + return None + port_meta = _port_meta() + result = _ask_with_completion( + f"Port to {action}?", + ports, + meta=port_meta, + ) + return result if result and result in ports else None + + # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- diff --git a/mpflash/cli_flash.py b/mpflash/cli_flash.py index c038b71..c4dc2b9 100644 --- a/mpflash/cli_flash.py +++ b/mpflash/cli_flash.py @@ -1,5 +1,9 @@ +from dataclasses import dataclass from pathlib import Path +from tempfile import NamedTemporaryFile from typing import List +from urllib.parse import unquote, urlparse +import re import rich_click as click from loguru import logger as log @@ -13,6 +17,19 @@ # CLI # ######################################################################################################### +DOWNLOAD_CHUNK_SIZE = 64 * 1024 +DOWNLOAD_TIMEOUT = (10, 120) + + +@dataclass +class DirectFirmwareInfo: + firmware_path: Path + source_hint: str + inferred_port: str + inferred_board: str + inferred_variant: str + cleanup_path: Path | None = None + @cli.command( "flash", @@ -115,12 +132,31 @@ help="""How to enter the (MicroPython) bootloader before flashing.""", ) @click.option( - "--force", + "--file", "-f", + "firmware_file", + type=click.Path(exists=True, dir_okay=False, path_type=Path), + default=None, + show_default=False, + help="Flash directly from a local firmware file.", + metavar="FIRMWARE_FILE", +) +@click.option( + "--url", + "-u", + "firmware_url", + default="", + show_default=False, + help="Flash directly from a firmware URL.", + metavar="URL", +) +@click.option( + "--force", + "-F", default=False, is_flag=True, show_default=True, - help="""Force download of firmware even if it already exists.""", + help="""Force download of firmware even if it already exists (no short flag; -f is used by --file).""", ) @click.option( "--flash_mode", @@ -169,7 +205,7 @@ def cli_flash_board(**kwargs) -> int: from mpflash.connected import connected_ports_boards_variants from mpflash.list import show_mcus from mpflash.flash import flash_tasks - from mpflash.flash.worklist import FlashTaskList, create_worklist + from mpflash.flash.worklist import FlashTask, FlashTaskList, create_worklist from mpflash.mpremoteboard import MPRemoteBoard def _create_worklist_or_fail(*create_args, **create_kwargs) -> FlashTaskList: @@ -183,6 +219,158 @@ def _create_worklist_or_fail(*create_args, **create_kwargs) -> FlashTaskList: "Try specifying both --board and --serial, or use '--serial *' to auto-detect ports." ) from None + def _split_board_variant(board_id: str, port: str = "") -> tuple[str, str]: + """Split board+variant identifiers where variant suffixes are expected.""" + if port != "esp32" or "-" not in board_id: + return board_id, "" + # ESP32 board IDs use board-variant naming in the upstream board database. + board, variant = board_id.split("-", 1) + return board, variant + + def _requires_board_prompt_for_direct_flash(inferred_port: str, inferred_board: str, inferred_variant: str) -> bool: + """Return True when direct firmware inference is insufficient for safe flashing.""" + if not inferred_board: + return True + if inferred_port == "esp32" and not inferred_variant: + return True + return False + + def _infer_port_board_variant(source_hint: str) -> tuple[str, str, str]: + """Infer (port, board, variant) from a firmware path or URL.""" + from mpflash.custom.naming import port_and_boardid_from_path + + parsed = urlparse(source_hint) + inspect_path = unquote(parsed.path) if parsed.scheme in {"http", "https"} else source_hint + normalized = inspect_path.replace("\\", "/") + + build_match = re.search(r"/ports/([^/]+)/build-([^/]+)/", normalized, re.IGNORECASE) + if build_match: + port = build_match.group(1).lower() + board, variant = _split_board_variant(build_match.group(2), port=port) + return port, board, variant + + detected_port, detected_board = port_and_boardid_from_path(Path(normalized)) + if not detected_port: + return "", "", "" + if detected_board: + board, variant = _split_board_variant(detected_board, port=detected_port) + else: + board, variant = "", "" + return detected_port, board, variant + + def _download_firmware_url(url: str) -> Path: + """Download a firmware URL to a temporary file and return its path. + + Temporary files are cleaned by the caller in a finally block after flashing. + """ + import requests + + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"}: + raise click.UsageError("Invalid --url. Only http:// or https:// URLs are supported.") + + suffix = Path(unquote(parsed.path)).suffix or ".bin" + temp_file_path: Path | None = None + try: + # delete=False so we can pass a stable path into flash task creation. + with NamedTemporaryFile(prefix="mpflash-fw-", suffix=suffix, delete=False) as temp_file: + temp_file_path = Path(temp_file.name) + response = requests.get(url, stream=True, timeout=DOWNLOAD_TIMEOUT) + response.raise_for_status() + for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE): + if chunk: + temp_file.write(chunk) + return temp_file_path + except requests.RequestException as exc: + if temp_file_path and temp_file_path.exists(): + temp_file_path.unlink(missing_ok=True) + raise click.UsageError(f"Failed to download firmware from URL: {exc}") from None + + def _prepare_direct_firmware(path: Path | None, url: str) -> DirectFirmwareInfo | None: + """Prepare direct firmware source and inferred board metadata.""" + if not path and not url: + return None + if path and url: + raise click.UsageError("Use either --file or --url, not both.") + if path: + local_path = path.expanduser().resolve() + if not local_path.exists() or not local_path.is_file(): + raise click.UsageError(f"Firmware file not found: {local_path}") + source_hint = local_path.as_posix() + inferred_port, inferred_board, inferred_variant = _infer_port_board_variant(source_hint) + return DirectFirmwareInfo( + firmware_path=local_path, + source_hint=source_hint, + inferred_port=inferred_port, + inferred_board=inferred_board, + inferred_variant=inferred_variant, + ) + downloaded = _download_firmware_url(url) + inferred_port, inferred_board, inferred_variant = _infer_port_board_variant(url) + return DirectFirmwareInfo( + firmware_path=downloaded, + source_hint=url, + inferred_port=inferred_port, + inferred_board=inferred_board, + inferred_variant=inferred_variant, + cleanup_path=downloaded, + ) + + def _attach_direct_firmware( + tasks: FlashTaskList, + firmware_path: Path, + source: str, + version: str, + *, + fallback_port: str = "", + fallback_board: str = "", + ) -> None: + """Attach a direct firmware file to all tasks.""" + from mpflash.db.models import Firmware + + for task in tasks: + if fallback_port and not task.board.port: + task.board.port = fallback_port + if fallback_board and not task.board.board_id: + task.board.board_id = fallback_board + + board_id = task.board.board_id or task.board.board or firmware_path.stem + task.firmware = Firmware( + board_id=board_id, + version=version, + port=task.board.port or fallback_port, + firmware_file=firmware_path.as_posix(), + source=source, + custom=False, + ) + + def _create_direct_firmware_tasks(params: FlashParams, detected_boards: List[MPRemoteBoard]) -> FlashTaskList: + """Create flash tasks for direct firmware without requiring board DB lookup.""" + comports = filtered_comports( + ignore=params.ignore, + include=params.serial, + bluetooth=params.bluetooth, + ) + if not comports: + serial_filter = ", ".join(params.serial) + raise click.UsageError( + f"No serial ports matched: {serial_filter}. Check the port name, or use '--serial *' to auto-detect." + ) + + by_serial = {board.serialport: board for board in detected_boards} + tasks: FlashTaskList = [] + for serial in comports: + board = by_serial.get(serial) or MPRemoteBoard(serial) + if params.ports and not board.port: + board.port = params.ports[0] + if params.boards and not board.board_id: + board.board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] + tasks.append(FlashTask(board=board, firmware=None)) + return tasks + + firmware_file = kwargs.pop("firmware_file", None) + firmware_url = kwargs.pop("firmware_url", "") + # version to versions, board to boards kwargs["versions"] = [kwargs.pop("version")] if kwargs["version"] is not None else [] if kwargs["board"] is None: @@ -199,138 +387,174 @@ def _create_worklist_or_fail(*create_args, **create_kwargs) -> FlashTaskList: params.ignore = list(params.ignore) params.volumes = list(params.volumes) params.bootloader = BootloaderMethod(params.bootloader) + direct_firmware = _prepare_direct_firmware(firmware_file, firmware_url) + try: + if direct_firmware: + params.custom = True + if not params.ports and direct_firmware.inferred_port: + params.ports = [direct_firmware.inferred_port] + if not params.boards: + if not _requires_board_prompt_for_direct_flash( + direct_firmware.inferred_port, + direct_firmware.inferred_board, + direct_firmware.inferred_variant, + ): + params.boards = [direct_firmware.inferred_board] + if direct_firmware.inferred_variant and not params.variant: + params.variant = direct_firmware.inferred_variant - # make it simple for the user to flash one board by asking for the serial port if not specified - if params.boards == ["?"] or params.serial == "?": # or params.variant == "?": - params.serial = ["?"] - # if params.variant == "?": - # params.boards = ["?"] # trigger full interactive board+variant flow - if params.boards == ["*"]: - # No bard specified - params.boards = ["?"] - - # Detect connected boards if not specified, - # and ask for input if boards cannot be detected - all_boards = [] - if not params.boards: - # nothing specified - detect connected boards - params.ports, params.boards, variants, all_boards = connected_ports_boards_variants( - include=params.ports, - ignore=params.ignore, - bluetooth=params.bluetooth, - ) - if variants and len(variants) >= 1: - params.variant = variants[0] - if params.boards == []: - # No MicroPython boards detected, but it could be unflashed or in bootloader mode - # Ask for serial port and board_id to flash + # make it simple for the user to flash one board by asking for the serial port if not specified + if params.boards == ["?"] or params.serial == "?": # or params.variant == "?": params.serial = ["?"] - params.boards = ["?"] - # assume manual mode if no board is detected - params.bootloader = BootloaderMethod("manual") - else: - mpboard_id.resolve_board_ids(params) - - # Ask for missing input if needed - params = ask_missing_params(params) - if not params: # Cancelled by user - return 2 - assert isinstance(params, FlashParams) - - if len(params.versions) > 1: - log.error(f"Only one version can be flashed at a time, not {params.versions}") - raise MPFlashError("Only one version can be flashed at a time") - - params.versions = [clean_version(v) for v in params.versions] - tasks = [] - - # Normalize volume paths: accept both Windows backslashes and POSIX forward slashes - if params.volumes: - params.volumes = [str(Path(v)) for v in params.volumes] - - if params.volumes: - # Explicit UF2 mount path(s) for boards already in bootloader mode. - if not params.boards: - raise click.UsageError("--volume requires --board so firmware can be selected") - board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] - tasks = _create_worklist_or_fail( - params.versions[0], - serial_ports=params.volumes, - board_id=board_id, - custom_firmware=params.custom, - port=params.ports[0] if params.ports else None, - ) - if any(task.board.port not in UF2_PORTS for task in tasks): - raise click.UsageError(f"--volume is only supported for UF2-capable ports ({', '.join(sorted(UF2_PORTS))})") - elif len(params.versions) == 1 and len(params.boards) == 1 and params.serial == ["*"]: - # One or more serial ports including the board / variant (auto-detect ports) - comports = filtered_comports( - ignore=params.ignore, - include=params.serial, - bluetooth=params.bluetooth, - ) - board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] - log.info(f"Flashing {board_id} {params.versions[0]} to {len(comports)} serial ports") - log.info(f"Target ports: {', '.join(comports)}") - tasks = _create_worklist_or_fail( - params.versions[0], - serial_ports=comports, - board_id=board_id, - custom_firmware=params.custom, - port=params.ports[0] if params.ports else None, - ) - elif params.serial == ["*"] and params.boards: - # Auto mode on detected boards with optional include/ignore filtering - if not all_boards: - log.trace("No boards detected yet, scanning for connected boards") - _, _, _, all_boards = connected_ports_boards_variants(include=params.ports, ignore=params.ignore) - if params.variant: - for b in all_boards: - b.variant = params.variant if (params.variant.lower() not in {"-", "none"}) else "" - tasks = _create_worklist_or_fail( - params.versions[0], - connected_comports=all_boards, - include_ports=params.serial, - ignore_ports=params.ignore, - ) - elif params.versions[0] and params.boards and params.serial: - # Manual specification of serial ports + board - comports = filtered_comports( - ignore=params.ignore, - include=params.serial, - bluetooth=params.bluetooth, - ) - if not comports: - serial_filter = ", ".join(params.serial) - raise click.UsageError(f"No serial ports matched: {serial_filter}. Check the port name, or use '--serial *' to auto-detect.") - board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] - tasks = _create_worklist_or_fail( - params.versions[0], - serial_ports=comports, - board_id=board_id, - port=params.ports[0] if params.ports else None, - ) - else: - # Single serial port auto-detection - connected_comports = [MPRemoteBoard(params.serial[0])] - tasks = _create_worklist_or_fail( - params.versions[0], - connected_comports=connected_comports, + # if params.variant == "?": + # params.boards = ["?"] # trigger full interactive board+variant flow + if params.boards == ["*"]: + # No board specified + params.boards = ["?"] + + # Detect connected boards if not specified, + # and ask for input if boards cannot be detected. + # In direct firmware mode, avoid inferring board/port from current firmware; + # ask for missing inputs explicitly instead. + all_boards = [] + if not params.boards and not direct_firmware: + # nothing specified - detect connected boards + params.ports, params.boards, variants, all_boards = connected_ports_boards_variants( + include=params.ports, + ignore=params.ignore, + bluetooth=params.bluetooth, + ) + if variants and len(variants) >= 1: + params.variant = variants[0] + if params.boards == []: + # No MicroPython boards detected, but it could be unflashed or in bootloader mode + # Ask for serial port to flash. For direct firmware, board_id is optional. + params.serial = ["?"] + if not direct_firmware: + params.boards = ["?"] + # assume manual mode if no board is detected + params.bootloader = BootloaderMethod("manual") + elif params.boards: + mpboard_id.resolve_board_ids(params) + + # Ask for missing input if needed + params = ask_missing_params( + params, + ask_board=not bool(direct_firmware), + ask_for_port=bool(direct_firmware and not params.ports), ) - if not params.custom: - jid.ensure_firmware_downloaded_tasks(tasks, version=params.versions[0], force=params.force) - if flashed := flash_tasks( - tasks, - params.erase, - params.bootloader, - flash_mode=params.flash_mode, - retry_on_error=params.retry_on_error, - retry_baud=params.retry_baud, - retry_flash_mode=params.retry_flash_mode, - ): - log.info(f"Flashed {len(flashed)} boards") - show_mcus(flashed, title="Updated boards after flashing") - return 0 - else: - log.error("No boards were flashed") - return 1 + if not params: # Cancelled by user + return 2 + assert isinstance(params, FlashParams) + + if len(params.versions) > 1: + log.error(f"Only one version can be flashed at a time, not {params.versions}") + raise MPFlashError("Only one version can be flashed at a time") + + params.versions = [clean_version(v) for v in params.versions] + tasks = [] + + # Normalize volume paths: accept both Windows backslashes and POSIX forward slashes + if params.volumes: + params.volumes = [str(Path(v)) for v in params.volumes] + + if direct_firmware and not params.volumes: + tasks = _create_direct_firmware_tasks(params, all_boards) + elif params.volumes: + # Explicit UF2 mount path(s) for boards already in bootloader mode. + if not params.boards: + raise click.UsageError("--volume requires --board so firmware can be selected") + board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] + tasks = _create_worklist_or_fail( + params.versions[0], + serial_ports=params.volumes, + board_id=board_id, + custom_firmware=params.custom, + port=params.ports[0] if params.ports else None, + ) + if any(task.board.port not in UF2_PORTS for task in tasks): + raise click.UsageError(f"--volume is only supported for UF2-capable ports ({', '.join(sorted(UF2_PORTS))})") + elif len(params.versions) == 1 and len(params.boards) == 1 and params.serial == ["*"]: + # One or more serial ports including the board / variant (auto-detect ports) + comports = filtered_comports( + ignore=params.ignore, + include=params.serial, + bluetooth=params.bluetooth, + ) + board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] + log.info(f"Flashing {board_id} {params.versions[0]} to {len(comports)} serial ports") + log.info(f"Target ports: {', '.join(comports)}") + tasks = _create_worklist_or_fail( + params.versions[0], + serial_ports=comports, + board_id=board_id, + custom_firmware=params.custom, + port=params.ports[0] if params.ports else None, + ) + elif params.serial == ["*"] and params.boards: + # Auto mode on detected boards with optional include/ignore filtering + if not all_boards: + log.trace("No boards detected yet, scanning for connected boards") + _, _, _, all_boards = connected_ports_boards_variants(include=params.ports, ignore=params.ignore) + if params.variant: + for b in all_boards: + b.variant = params.variant if (params.variant.lower() not in {"-", "none"}) else "" + tasks = _create_worklist_or_fail( + params.versions[0], + connected_comports=all_boards, + include_ports=params.serial, + ignore_ports=params.ignore, + ) + elif params.versions[0] and params.boards and params.serial: + # Manual specification of serial ports + board + comports = filtered_comports( + ignore=params.ignore, + include=params.serial, + bluetooth=params.bluetooth, + ) + if not comports: + serial_filter = ", ".join(params.serial) + raise click.UsageError(f"No serial ports matched: {serial_filter}. Check the port name, or use '--serial *' to auto-detect.") + board_id = f"{params.boards[0]}-{params.variant}" if params.variant else params.boards[0] + tasks = _create_worklist_or_fail( + params.versions[0], + serial_ports=comports, + board_id=board_id, + port=params.ports[0] if params.ports else None, + ) + else: + # Single serial port auto-detection + connected_comports = [MPRemoteBoard(params.serial[0])] + tasks = _create_worklist_or_fail( + params.versions[0], + connected_comports=connected_comports, + ) + if direct_firmware: + _attach_direct_firmware( + tasks, + direct_firmware.firmware_path, + direct_firmware.source_hint, + params.versions[0], + fallback_port=params.ports[0] if params.ports else "", + fallback_board=params.boards[0] if params.boards else "", + ) + elif not params.custom: + jid.ensure_firmware_downloaded_tasks(tasks, version=params.versions[0], force=params.force) + if flashed := flash_tasks( + tasks, + params.erase, + params.bootloader, + flash_mode=params.flash_mode, + retry_on_error=params.retry_on_error, + retry_baud=params.retry_baud, + retry_flash_mode=params.retry_flash_mode, + ): + log.info(f"Flashed {len(flashed)} boards") + show_mcus(flashed, title="Updated boards after flashing") + return 0 + else: + log.error("No boards were flashed") + return 1 + finally: + if direct_firmware and direct_firmware.cleanup_path and direct_firmware.cleanup_path.exists(): + direct_firmware.cleanup_path.unlink(missing_ok=True) diff --git a/mpflash/custom/naming.py b/mpflash/custom/naming.py index 528eaee..eed1faf 100644 --- a/mpflash/custom/naming.py +++ b/mpflash/custom/naming.py @@ -84,11 +84,27 @@ def port_and_boardid_from_path(firmware_path: Path) -> Tuple[Optional[str], Opti port = port_match.group(1) return port, None + # Pattern: detect known port name in directory structure + # (e.g., /downloads/rp2/BOARD_ID-v1.0.0.uf2 or /firmware/esp32/GENERIC.bin) + known_ports = list(PORT_FWTYPES.keys()) + list(SA_PORTS) + alt = "|".join(re.escape(p) for p in known_ports) + dir_match = re.search(rf"/({alt})(?:/|\\)([^/\\]*)?$", path_str, re.IGNORECASE) + if dir_match: + port = dir_match.group(1).lower() + # Try to extract board_id from the filename by removing version/variant suffixes + filename_stem = Path(path_str).stem + if filename_stem: + # Remove common version/build suffixes (e.g., -v1.28.0, -20250510, _v1_21_0) + # Keep the first part that looks like a board ID + board_id = re.sub(r"[-_]v?\d+.*$", "", filename_stem, flags=re.IGNORECASE) + if board_id: + return port, board_id + return port, None + # Fallback: try to detect a known port name embedded in the filename # (e.g. lvgl_micropy_ESP32_GENERIC-SPIRAM-16.bin -> port=esp32) filename = Path(path_str).name if filename: - known_ports = list(PORT_FWTYPES.keys()) + list(SA_PORTS) # Single regex with alternation to avoid recompiling per iteration. alt = "|".join(re.escape(p) for p in known_ports) m = re.search(rf"(?:^|[_\-])({alt})(?:[_\-]|\.|$)", filename, re.IGNORECASE) diff --git a/mpflash/flash/__init__.py b/mpflash/flash/__init__.py index 939a409..102f5f5 100644 --- a/mpflash/flash/__init__.py +++ b/mpflash/flash/__init__.py @@ -69,6 +69,11 @@ def flash_mcu( if not enter_bootloader(mcu, bootloader): raise MPFlashError(f"Failed to enter bootloader for {mcu.board} on {mcu.serialport}") updated = flash_uf2(mcu, fw_file=fw_file, erase=erase) + elif fw_file.suffix == ".uf2": + raise MPFlashError( + "Cannot determine a UF2-capable port for this device. " + "Use '--port ' (for example 'rp2', 'samd', or 'nrf') or specify '--board'." + ) elif mcu.port in ["stm32"]: if not enter_bootloader(mcu, bootloader): raise MPFlashError(f"Failed to enter bootloader for {mcu.board} on {mcu.serialport}") diff --git a/tests/cli/test_cli_flash.py b/tests/cli/test_cli_flash.py index 1b08cd0..e72723a 100644 --- a/tests/cli/test_cli_flash.py +++ b/tests/cli/test_cli_flash.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import List from unittest.mock import Mock @@ -261,6 +262,157 @@ def test_mpflash_flash_with_volume_rejects_non_uf2_ports(mocker: MockerFixture): assert result.exit_code == 2 + +def test_mpflash_flash_direct_file_infers_esp32_board_variant(mocker: MockerFixture, tmp_path): + """Flash directly from --file and infer esp32 board+variant from build path.""" + fw_path = tmp_path / "micropython" / "ports" / "esp32" / "build-ESP32_GENERIC-SPIRAM" / "micropython.bin" + fw_path.parent.mkdir(parents=True, exist_ok=True) + fw_path.write_bytes(b"firmware") + args = ["flash", "--version", "1.27.0", "--serial", "COM8", "--file", str(fw_path)] + + mocker.patch( + "mpflash.ask_input.ask_missing_params", + Mock(side_effect=fake_ask_missing_params), + ) + mocker.patch("mpflash.cli_flash.filtered_comports", return_value=["COM8"]) + board = MPRemoteBoard("COM8") + board.port = "esp32" + board.board_id = "ESP32_GENERIC-SPIRAM" + task = FlashTask(board=board, firmware=None) + m_create_worklist = mocker.patch("mpflash.flash.worklist.create_worklist", return_value=[task]) + m_flash_tasks = mocker.patch("mpflash.flash.flash_tasks", return_value=[board]) + m_ensure = mocker.patch("mpflash.download.jid.ensure_firmware_downloaded_tasks") + mocker.patch("mpflash.list.show_mcus") + + runner = CliRunner() + result = runner.invoke(cli_main.cli, args, standalone_mode=True) + + assert result.exit_code == 0 + assert m_create_worklist.call_args.kwargs["board_id"] == "ESP32_GENERIC-SPIRAM" + assert m_create_worklist.call_args.kwargs["port"] == "esp32" + flashed_task = m_flash_tasks.call_args.args[0][0] + assert flashed_task.firmware.firmware_file == fw_path.as_posix() + m_ensure.assert_not_called() + + +def test_mpflash_flash_direct_file_prompts_when_esp32_variant_unclear(mocker: MockerFixture, tmp_path): + """If esp32 variant cannot be inferred from file path, ask for board selection.""" + fw_path = tmp_path / "micropython" / "ports" / "esp32" / "micropython.bin" + fw_path.parent.mkdir(parents=True, exist_ok=True) + fw_path.write_bytes(b"firmware") + args = ["flash", "--version", "1.27.0", "--serial", "COM8", "--file", str(fw_path)] + + observed = {} + + def _fake_ask(params): + observed["boards_before_prompt"] = list(params.boards) + params.boards = ["ESP32_GENERIC"] + params.variant = "SPIRAM" + params.ports = ["esp32"] + return params + + mocker.patch( + "mpflash.ask_input.ask_missing_params", + Mock(side_effect=_fake_ask), + ) + mocker.patch("mpflash.cli_flash.filtered_comports", return_value=["COM8"]) + board = MPRemoteBoard("COM8") + board.port = "esp32" + board.board_id = "ESP32_GENERIC-SPIRAM" + task = FlashTask(board=board, firmware=None) + mocker.patch("mpflash.flash.worklist.create_worklist", return_value=[task]) + mocker.patch("mpflash.flash.flash_tasks", return_value=[board]) + mocker.patch("mpflash.download.jid.ensure_firmware_downloaded_tasks") + mocker.patch("mpflash.list.show_mcus") + + runner = CliRunner() + result = runner.invoke(cli_main.cli, args, standalone_mode=True) + + assert result.exit_code == 0 + assert observed["boards_before_prompt"] == ["?"] + + +def test_mpflash_flash_direct_url_downloads_firmware(mocker: MockerFixture): + """Download firmware via --url and flash directly without DB download step.""" + args = [ + "flash", + "--version", + "1.27.0", + "--board", + "ESP32_GENERIC", + "--serial", + "COM8", + "--url", + "https://example.com/micropython.bin", + ] + + class _Response: + def raise_for_status(self): + return None + + def iter_content(self, chunk_size=0): + yield b"1234" + + mocker.patch("mpflash.ask_input.ask_missing_params", Mock(side_effect=fake_ask_missing_params)) + mocker.patch("requests.get", return_value=_Response()) + mocker.patch("mpflash.cli_flash.filtered_comports", return_value=["COM8"]) + board = MPRemoteBoard("COM8") + board.port = "esp32" + board.board_id = "ESP32_GENERIC" + task = FlashTask(board=board, firmware=None) + mocker.patch("mpflash.flash.worklist.create_worklist", return_value=[task]) + m_flash_tasks = mocker.patch("mpflash.flash.flash_tasks", return_value=[board]) + m_ensure = mocker.patch("mpflash.download.jid.ensure_firmware_downloaded_tasks") + mocker.patch("mpflash.list.show_mcus") + + runner = CliRunner() + result = runner.invoke(cli_main.cli, args, standalone_mode=True) + + assert result.exit_code == 0 + flashed_firmware_path = Path(m_flash_tasks.call_args.args[0][0].firmware.firmware_file) + assert flashed_firmware_path.name.startswith("mpflash-fw-") + assert not flashed_firmware_path.exists() + m_ensure.assert_not_called() + + +def test_mpflash_flash_direct_url_cleans_temp_file_on_flash_error(mocker: MockerFixture): + """Cleanup temp URL firmware file even when flashing fails.""" + args = [ + "flash", + "--version", + "1.27.0", + "--board", + "ESP32_GENERIC", + "--serial", + "COM8", + "--url", + "https://example.com/micropython.bin", + ] + + class _Response: + def raise_for_status(self): + return None + + def iter_content(self, chunk_size=0): + yield b"1234" + + mocker.patch("mpflash.ask_input.ask_missing_params", Mock(side_effect=fake_ask_missing_params)) + mocker.patch("requests.get", return_value=_Response()) + mocker.patch("mpflash.cli_flash.filtered_comports", return_value=["COM8"]) + board = MPRemoteBoard("COM8") + board.port = "esp32" + board.board_id = "ESP32_GENERIC" + task = FlashTask(board=board, firmware=None) + mocker.patch("mpflash.flash.worklist.create_worklist", return_value=[task]) + m_flash_tasks = mocker.patch("mpflash.flash.flash_tasks", side_effect=RuntimeError("boom")) + + runner = CliRunner() + result = runner.invoke(cli_main.cli, args, standalone_mode=True) + + assert result.exit_code == 1 + flashed_firmware_path = Path(m_flash_tasks.call_args.args[0][0].firmware.firmware_file) + assert not flashed_firmware_path.exists() + @pytest.mark.skip("TODO: Test Broken") def test_flash_triggers_just_in_time_download(mocker: MockerFixture, session_fx): """