PR A2: Gemini Batch API for one-shot corpus run (split from #40) #51

@jakebromberg

Problem

PR A's first half ships context caching (#50). The second half — batch mode — was deferred because it requires a separate orchestration pattern (24h-SLA submit-poll-fetch) that doesn't compose with the existing per-page async pipeline. This issue tracks that follow-up.

Gemini exposes a Batch API that runs at ~50% of real-time pricing with a 24h SLA. For a one-shot full-corpus run this is the right knob — we don't need real-time turnaround. Per-page billing drops by half on top of whatever cache savings PR A delivers.
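
As a rough illustration of the pricing claim above, the arithmetic can be sketched as follows. The per-million-token rates are placeholders rather than real Gemini prices, and `Usage` and `estimate_cost` are hypothetical names that do not exist in the codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Usage:
    """Token counts for a workload (hypothetical helper type)."""
    input_tokens: int
    output_tokens: int


def estimate_cost(
    usage: Usage,
    input_rate_per_mtok: float,
    output_rate_per_mtok: float,
    batch: bool = False,
    batch_discount: float = 0.5,  # the ~50% figure quoted in this issue
) -> float:
    """Return the estimated cost in the same currency unit as the rates."""
    realtime = (
        usage.input_tokens * input_rate_per_mtok
        + usage.output_tokens * output_rate_per_mtok
    ) / 1_000_000
    # Batch mode is modeled as a flat discount on the real-time price.
    return realtime * (1 - batch_discount) if batch else realtime
```

Calling `estimate_cost` twice on the same `Usage`, once with `batch=True`, gives the ratio the acceptance criterion compares against. Cache savings from PR A would change the input rate, not the discount.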

End state

A new opt-in flag (--batch on flowsheets process) routes the run through Gemini's Batch API. The flow:

  1. Submit: build a list of pending pages, write them to a batch request, call client.batches.create (or the SDK's equivalent).
  2. Poll: a flowsheets batch-status <batch_id> subcommand checks state.
  3. Fetch: when the batch is SUCCEEDED, walk the result payload, map each response to its (pdf_path, page_number), write the JSON result, and mark the job completed.

Real-time mode stays the default; the batch path is wholly opt-in so dev iteration isn't affected.
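
The submit/poll/fetch flow above can be sketched against a small adapter interface. `BatchBackend`, `InMemoryBackend`, `submit`, and `fetch` are hypothetical names for illustration. A real backend would wrap the SDK's batch calls, and the in-memory one only simulates a batch that finishes on its second status check.

```python
from typing import Optional, Protocol


class BatchBackend(Protocol):
    """Hypothetical adapter over the SDK's batch calls."""

    def create(self, requests: list[dict]) -> str: ...
    def state(self, batch_id: str) -> str: ...
    def results(self, batch_id: str) -> list[dict]: ...


class InMemoryBackend:
    """Stand-in backend: reports SUCCEEDED from the second poll onward."""

    def __init__(self) -> None:
        self._batches: dict[str, dict] = {}

    def create(self, requests: list[dict]) -> str:
        batch_id = f"batches/{len(self._batches) + 1}"
        self._batches[batch_id] = {"requests": list(requests), "polls": 0}
        return batch_id

    def state(self, batch_id: str) -> str:
        batch = self._batches[batch_id]
        batch["polls"] += 1
        return "RUNNING" if batch["polls"] < 2 else "SUCCEEDED"

    def results(self, batch_id: str) -> list[dict]:
        # Echo each request back, preserving submission order.
        return [{"echo": r} for r in self._batches[batch_id]["requests"]]


def submit(backend: BatchBackend, pages: list[tuple[str, int]]) -> str:
    """Step 1: build one request per pending page and return the batch id."""
    requests = [{"pdf_path": p, "page_number": n} for p, n in pages]
    return backend.create(requests)


def fetch(backend: BatchBackend, batch_id: str) -> Optional[list[dict]]:
    """Steps 2 and 3: check state once; return results only on SUCCEEDED."""
    if backend.state(batch_id) != "SUCCEEDED":
        return None
    return backend.results(batch_id)
```

Neither function sleeps or loops, which matches the intent that the CLI submits and returns, and the user checks back later.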

Where

  • core/gemini.py — add submit_batch(jobs) and fetch_batch_results(batch_id) methods. Keep them independent of extract_page so the per-page sync path doesn't carry batch concerns.
  • core/jobs.py — add a submitted job status with batch_id and within_batch_index columns so we can map responses back when the batch returns.
  • core/pipeline.py — new submit_pending_batch and fetch_batch_completions functions, parallel to process_pending.
  • cli.py: add the flowsheets process --batch flag and the flowsheets batch-status <id> and flowsheets batch-fetch <id> subcommands.
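
One possible shape for the core/jobs.py change is sketched below with sqlite3. The existing jobs schema is not shown in this issue, so the table layout here is an assumption. Only the submitted status and the batch_id and within_batch_index columns come from the text above, and `mark_submitted` and `jobs_for_batch` are hypothetical helper names.

```python
import sqlite3

# Assumed minimal schema; the real jobs table likely has more columns.
SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
    pdf_path           TEXT    NOT NULL,
    page_number        INTEGER NOT NULL,
    status             TEXT    NOT NULL DEFAULT 'pending',
    batch_id           TEXT,
    within_batch_index INTEGER,
    PRIMARY KEY (pdf_path, page_number)
);
"""


def mark_submitted(conn, batch_id, pages):
    """Record the batch id and submission order for each submitted page."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "UPDATE jobs SET status = 'submitted', batch_id = ?, "
            "within_batch_index = ? WHERE pdf_path = ? AND page_number = ?",
            [(batch_id, i, pdf, page) for i, (pdf, page) in enumerate(pages)],
        )


def jobs_for_batch(conn, batch_id):
    """Return (index, pdf_path, page_number) rows in submission order."""
    return conn.execute(
        "SELECT within_batch_index, pdf_path, page_number FROM jobs "
        "WHERE batch_id = ? ORDER BY within_batch_index",
        (batch_id,),
    ).fetchall()
```

Because the order is persisted at submit time, a later batch-fetch run in a fresh process can rebuild the index-to-page mapping from jobs.db alone.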

Constraints

  • Per the issue body of PR A (Batch mode + context caching in core/gemini.py, #40): real-time stays the default; batch is opt-in.
  • 24h SLA — don't block dev iteration on it. The CLI should submit and return; the user runs flowsheets batch-fetch later.
  • Don't conflate caching tests with batch tests — they're independent.
  • Existing 34 corpus JSONs must stay valid (no schema mutation).
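
The CLI surface implied by these constraints could look roughly like the argparse sketch below. The project's actual CLI framework is not stated in this issue, so argparse is an assumption and `build_parser` is a hypothetical name. The command and flag names are the ones listed in this issue.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the opt-in batch flag and two batch subcommands."""
    parser = argparse.ArgumentParser(prog="flowsheets")
    sub = parser.add_subparsers(dest="command", required=True)

    process = sub.add_parser("process", help="process pending pages")
    # store_true keeps real-time mode as the default when the flag is absent.
    process.add_argument("--batch", action="store_true",
                         help="submit via the Batch API and return immediately")
    process.add_argument("--limit", type=int, default=None,
                         help="maximum number of pages to process")

    status = sub.add_parser("batch-status", help="show the state of a batch")
    status.add_argument("batch_id")

    fetch = sub.add_parser("batch-fetch", help="write results for a finished batch")
    fetch.add_argument("batch_id")
    return parser
```

With this layout, `flowsheets process` without `--batch` behaves as it does today, and the two batch subcommands are the only new entry points.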

Acceptance criteria

  • flowsheets process --batch --limit 10 submits 10 pages and returns immediately with a batch id.
  • flowsheets batch-status <id> shows the batch state (PENDING, RUNNING, SUCCEEDED, FAILED).
  • flowsheets batch-fetch <id> walks results, writes per-page JSONs, marks jobs completed.
  • Submit/poll/fetch covered by unit tests (mocked SDK).
  • Billed input + output cost on a small batch run lands at ~50% of real-time on the same workload.
  • All existing tests pass.
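
A mocked-SDK unit test for the submit step might look like the sketch below. `submit_batch` here is a simplified, hypothetical free function rather than the planned method, and the `model=` and `src=` keyword names reflect one reading of the google-genai SDK that should be verified against the installed version.

```python
from unittest.mock import MagicMock


def submit_batch(client, model, requests):
    """Hypothetical helper: submit inline requests and return the batch name."""
    job = client.batches.create(model=model, src=requests)
    return job.name


def test_submit_batch_returns_id_without_polling():
    # MagicMock stands in for the SDK client, so no network call is made.
    client = MagicMock()
    client.batches.create.return_value.name = "batches/abc"

    batch_id = submit_batch(client, "some-model", [{"contents": "page 1"}])

    assert batch_id == "batches/abc"
    client.batches.create.assert_called_once_with(
        model="some-model", src=[{"contents": "page 1"}]
    )
    # Submitting must not block on status checks.
    client.batches.get.assert_not_called()
```

The same pattern extends to the poll and fetch steps by setting return values on the mocked status call and asserting on the rows written.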

Notes for implementer

  • The Gemini SDK's batch entry-point is client.batches.create (or client.aio.batches.create for async). Request body is a list of GenerateContentRequest-shaped items; each one carries its own model, contents, config. Caching (PR A) composes — submit each batch item with cached_content=<cache_name> and you get both discounts.
  • Map response back to job rows via order index (batch responses are returned in submission order). Store the order in jobs.db when submitting so an out-of-band batch-fetch invocation can reconcile.
  • A partially failed batch needs a per-item error path. Surface the failed items as failed job rows, the same as real-time errors, so retry semantics work uniformly.
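
The order-index mapping and per-item error path from these notes can be sketched as a pure function. `ItemResult` and `reconcile` are hypothetical names. The sketch assumes each SDK response has already been normalized to either a payload or an error string, and that results arrive in submission order as stated above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ItemResult:
    """Normalized view of one batch item: a payload or an error message."""
    payload: Optional[dict] = None
    error: Optional[str] = None


def reconcile(job_rows, results):
    """Split a batch into completed and failed pages.

    job_rows: (within_batch_index, pdf_path, page_number) tuples from jobs.db.
    results:  ItemResult list in submission order.
    """
    if len(job_rows) != len(results):
        raise ValueError(
            f"expected {len(job_rows)} results, got {len(results)}"
        )
    completed, failed = [], []
    for index, pdf_path, page_number in job_rows:
        item = results[index]
        key = (pdf_path, page_number)
        if item.error is not None:
            # Same shape a real-time failure would take, so retries are uniform.
            failed.append((key, item.error))
        else:
            completed.append((key, item.payload))
    return completed, failed
```

Raising on a length mismatch is a design choice in this sketch: a batch whose result count differs from the stored rows cannot be mapped by index safely, so it stops before any JSON is written.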


Labels: enhancement
