Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/cluster-client/zisk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["time"] }
tonic.workspace = true
tonic-prost.workspace = true
tracing.workspace = true

# Local dependencies
ere-compiler-core.workspace = true
Expand Down
84 changes: 39 additions & 45 deletions crates/cluster-client/zisk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ere_verifier_zisk::{
use serde::Deserialize;
use tokio::time::{Instant, sleep, timeout, timeout_at};
use tonic::{Code, transport::Channel};
use tracing::warn;

use crate::{
api::{
Expand Down Expand Up @@ -62,32 +63,10 @@ impl ZiskClusterClient {
/// Submits a prove job and returns its `job_id` immediately, without waiting for completion.
pub async fn create_prove_job(&self, input: &Input) -> Result<String, Error> {
let mut client = self.client.clone();
let job = JobKind {
kind: Some(job_kind::Kind::Prove(ProveRequest {
hash_id: self.hash_id.clone(),
input: Some(InputKind {
kind: Some(input_kind::Kind::Inline(InputChunk {
data: framed_stdin(input.stdin()),
})),
}),
proof_dest: ProofKind::StarkMinimal as i32,
proof_timeout: None,
hints: None,
})),
};
let req = JobRequestMessage {
job_kind: Some(job),
};
let job_id = match client.job_request(req.clone()).await {
Ok(res) => res.into_inner().job_id,
Err(err)
if err.code() == Code::Unavailable && err.message().contains("setup not done") =>
{
setup(&mut client, self.elf.clone()).await?;
client.job_request(req).await?.into_inner().job_id
}
Err(err) => Err(err)?,
job_kind: Some(prove_job(&self.hash_id, input)),
};
let job_id = client.job_request(req).await?.into_inner().job_id;
Ok(job_id)
}

Expand Down Expand Up @@ -120,15 +99,11 @@ impl ZiskClusterClient {
Ok(cancelled)
}

/// Submits a prove job, wait up to `timeout` for completion, cancel the job on deadline.
/// Submits a prove job, wait for completion, cancel the job on deadline.
///
/// Returns `Error::ProveTimeout` if the deadline expires before the job terminates.
///
/// Retries prove job submission until deadline if cluster response the unavailable errors:
///
/// - `setup not done`
/// - `no workers connected`
/// - `workers are setting up`
/// Retries prove job submission on every 5 seconds until deadline.
///
/// Returns `Error::ClusterUnavailable` if the deadline expires before the job submission.
pub async fn prove(
Expand All @@ -137,23 +112,26 @@ impl ZiskClusterClient {
deadline: Instant,
) -> Result<(ZiskProof, Duration), Error> {
let fut = async {
let mut client = self.client.clone();
let job = prove_job(&self.hash_id, input);
loop {
let result = self.create_prove_job(input).await;
if let Err(Error::Grpc(status)) = &result
&& status.code() == Code::Unavailable
{
if status.message().contains("setup not done") {
setup(&mut self.client.clone(), self.elf.clone()).await?;
continue;
}
if status.message().contains("no workers connected")
|| status.message().contains("workers are setting up")
{
sleep(Duration::from_secs(5)).await;
continue;
let req = JobRequestMessage {
job_kind: Some(job.clone()),
};
let status = match client.job_request(req).await {
Ok(res) => return Ok::<_, Error>(res.into_inner().job_id),
Err(status) => {
if status.code() == Code::Unavailable
&& status.message().contains("setup not done")
{
setup(&mut client, self.elf.clone()).await?;
continue;
}
status
}
}
return result;
};
warn!(?status, "job submission failed, retrying...");
sleep(Duration::from_secs(5)).await;
}
};

Expand Down Expand Up @@ -246,6 +224,22 @@ async fn wait_job(
}
}

fn prove_job(hash_id: &str, input: &Input) -> JobKind {
JobKind {
kind: Some(job_kind::Kind::Prove(ProveRequest {
hash_id: hash_id.to_string(),
input: Some(InputKind {
kind: Some(input_kind::Kind::Inline(InputChunk {
data: framed_stdin(input.stdin()),
})),
}),
proof_dest: ProofKind::StarkMinimal as i32,
proof_timeout: None,
hints: None,
})),
}
}

/// Returns `data` with a LE u64 length prefix and padding to multiple of 8.
///
/// The length prefix and padding is expected by ZisK emulator/prover runtime.
Expand Down
Loading