Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 157 additions & 29 deletions curvine-client/src/block/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use crate::block::block_reader::ReaderAdapter::{Hole, Local, Remote};
use crate::block::{BlockReaderHole, BlockReaderLocal, BlockReaderRemote};
use crate::file::FsContext;
use crate::file::{FsContext, ReadChunkKey};
use crate::p2p::ChunkId;
use bytes::Bytes;
use curvine_common::state::{ClientAddress, ExtendedBlock, LocatedBlock, WorkerAddress};
use curvine_common::FsResult;
use log::warn;
Expand All @@ -24,6 +26,7 @@ use orpc::runtime::{RpcRuntime, Runtime};
use orpc::sys::DataSlice;
use orpc::{err_box, CommonResult};
use std::sync::Arc;
use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard};

enum ReaderAdapter {
Local(BlockReaderLocal),
Expand Down Expand Up @@ -110,14 +113,22 @@ pub struct BlockReader {
inner: ReaderAdapter,
locs: Vec<WorkerAddress>,
block: ExtendedBlock,
file_id: i64,
file_version_epoch: i64,
file_mtime: i64,
fs_context: Arc<FsContext>,
}

/// A held "flight" for one read chunk: the shared async mutex obtained from
/// `FsContext::read_chunk_flight_lock` paired with the owned guard that keeps
/// it locked. `release_read_chunk_flight` drops the guard and passes the mutex
/// handle back to `FsContext::cleanup_read_chunk_flight`.
type ReadChunkFlight = (Arc<AsyncMutex<()>>, OwnedMutexGuard<()>);

impl BlockReader {
pub async fn new(
fs_context: Arc<FsContext>,
located: LocatedBlock,
off: i64,
file_id: i64,
file_version_epoch: i64,
file_mtime: i64,
) -> CommonResult<Self> {
let len = located.block.len;

Expand All @@ -134,6 +145,9 @@ impl BlockReader {
inner: adapter,
locs,
block: located.block,
file_id,
file_version_epoch,
file_mtime,
fs_context,
};

Expand Down Expand Up @@ -220,37 +234,43 @@ impl BlockReader {
return Ok(DataSlice::empty());
}

loop {
match self.inner.read().await {
Ok(v) => return Ok(v),
let read_key = ReadChunkKey::new(
self.file_id,
self.file_version_epoch,
self.block.id,
self.pos(),
);
if let Some(chunk) = self.try_read_chunk_cache(&read_key)? {
return Ok(chunk);
}

Err(e) => {
// For Hole readers or when all workers exhausted, fail immediately
if matches!(&self.inner, Hole(_)) || self.locs.is_empty() {
return Err(e.ctx(format!(
"failed to read block on {}",
self.inner.worker_address()
)));
}
let mut flight = self.acquire_read_chunk_flight(&read_key).await;
if let Some(chunk) = self.try_read_chunk_cache(&read_key)? {
self.release_read_chunk_flight(&read_key, &mut flight);
return Ok(chunk);
}

warn!(
"read data error block id {}, addr {}: {}",
self.block_id(),
self.inner.worker_address(),
e
);
self.locs.retain(|x| x != self.inner.worker_address());
self.inner = Self::get_reader(
&self.locs,
self.block.clone(),
self.fs_context.clone(),
self.pos(),
self.len(),
)
.await?;
}
}
let chunk_id = ChunkId::with_version(
self.file_id,
self.file_version_epoch,
self.block.id,
self.pos(),
);
let expect_len = self
.remaining()
.min(self.fs_context.read_chunk_size() as i64)
.max(0) as usize;
if let Some(chunk) = self
.try_read_from_p2p(chunk_id, expect_len, &read_key)
.await?
{
self.release_read_chunk_flight(&read_key, &mut flight);
return Ok(chunk);
}

let chunk = self.read_from_worker(&read_key, chunk_id).await;
self.release_read_chunk_flight(&read_key, &mut flight);
chunk
}

pub fn blocking_read(&mut self, rt: &Runtime) -> FsResult<DataSlice> {
Expand Down Expand Up @@ -295,4 +315,112 @@ impl BlockReader {
/// Returns the block id as reported by the inner reader adapter.
pub fn block_id(&self) -> i64 {
self.inner.block_id()
}

async fn acquire_read_chunk_flight(&self, read_key: &ReadChunkKey) -> Option<ReadChunkFlight> {
if !self.fs_context.read_chunk_cache_enabled() {
return None;
}
let lock = self.fs_context.read_chunk_flight_lock(read_key.clone());
let guard = lock.clone().lock_owned().await;
Some((lock, guard))
}

/// Releases a flight previously returned by `acquire_read_chunk_flight`.
///
/// Takes the value out of `flight` (leaving `None` behind), so calling this
/// again with the same slot is a no-op. The guard is dropped before the mutex
/// handle is handed to `FsContext::cleanup_read_chunk_flight` for `read_key`.
fn release_read_chunk_flight(
    &self,
    read_key: &ReadChunkKey,
    flight: &mut Option<ReadChunkFlight>,
) {
    let Some((flight_lock, held)) = flight.take() else {
        return;
    };
    // Unlock first; cleanup only receives the (now unlocked) mutex handle.
    drop(held);
    self.fs_context
        .cleanup_read_chunk_flight(read_key, &flight_lock);
}

/// Tries to serve the chunk identified by `read_key` from the read chunk cache.
///
/// On a hit the inner reader is seeked forward by the cached length and the
/// cached bytes are returned as a `DataSlice`. On a miss returns `Ok(None)`
/// and leaves the reader position untouched.
///
/// # Errors
/// Propagates the error from seeking the inner reader.
fn try_read_chunk_cache(&mut self, read_key: &ReadChunkKey) -> FsResult<Option<DataSlice>> {
    let lookup = self.fs_context.get_read_chunk_cache(read_key);
    match lookup {
        None => Ok(None),
        Some(hit) => {
            // Keep the reader position in step with the bytes handed out.
            self.advance_cached_position(hit.len())?;
            Ok(Some(DataSlice::bytes(hit)))
        }
    }
}

/// Tries to fetch the chunk `chunk_id` through the p2p service.
///
/// Returns `Ok(None)` — meaning "not served here" — when `expect_len` is zero,
/// when the inner reader is not the `Remote` variant, when no p2p service is
/// configured, or when the fetch misses and the service configuration has
/// `fallback_worker_on_fail` set.
///
/// On a successful fetch the inner reader is seeked forward by the fetched
/// length, the bytes are stored in the read chunk cache under `read_key`, and
/// they are returned as a `DataSlice`.
///
/// # Errors
/// Fails when the fetch misses and `fallback_worker_on_fail` is not set, or
/// when seeking the inner reader fails.
async fn try_read_from_p2p(
    &mut self,
    chunk_id: ChunkId,
    expect_len: usize,
    read_key: &ReadChunkKey,
) -> FsResult<Option<DataSlice>> {
    if expect_len == 0 {
        return Ok(None);
    }
    if !matches!(&self.inner, Remote(_)) {
        return Ok(None);
    }
    let service = match self.fs_context.p2p_service() {
        Some(svc) => svc,
        None => return Ok(None),
    };

    let fetched = service
        .fetch_chunk(chunk_id, expect_len, Some(self.file_mtime))
        .await;
    match fetched {
        Some(bytes) => {
            // Advance before caching: a failed seek must not populate the cache.
            self.advance_cached_position(bytes.len())?;
            self.fs_context
                .put_read_chunk_cache(read_key.clone(), bytes.clone());
            Ok(Some(DataSlice::bytes(bytes)))
        }
        None if service.conf().fallback_worker_on_fail => Ok(None),
        None => err_box!("p2p read miss and worker fallback is disabled"),
    }
}

/// Reads the next chunk from the inner reader, switching to another worker
/// location when a read fails.
///
/// A non-empty chunk is copied into `Bytes` and reported to
/// `FsContext::on_worker_chunk_read` together with `read_key`, `chunk_id` and
/// the file mtime before being returned. Empty chunks are returned as-is.
///
/// On a read error the failing worker address is removed from `locs`, a new
/// inner reader is built via `get_reader` at the current position, and the
/// read is retried.
///
/// # Errors
/// Returns the read error (with added context) when the inner reader is a
/// `Hole` reader or `locs` is already empty, and propagates any failure from
/// building the replacement reader.
async fn read_from_worker(
    &mut self,
    read_key: &ReadChunkKey,
    chunk_id: ChunkId,
) -> FsResult<DataSlice> {
    loop {
        // Success returns straight away; only failures reach the code below.
        let failure = match self.inner.read().await {
            Ok(data) => {
                if !data.is_empty() {
                    self.fs_context.on_worker_chunk_read(
                        read_key.clone(),
                        chunk_id,
                        Bytes::copy_from_slice(data.as_slice()),
                        self.file_mtime,
                    );
                }
                return Ok(data);
            }
            Err(e) => e,
        };

        // Nothing to fail over to: hole readers and an empty location list.
        let no_alternative = matches!(&self.inner, Hole(_)) || self.locs.is_empty();
        if no_alternative {
            let context = format!("failed to read block on {}", self.inner.worker_address());
            return Err(failure.ctx(context));
        }

        warn!(
            "read data error block id {}, addr {}: {}",
            self.block_id(),
            self.inner.worker_address(),
            failure
        );

        // Forget the failing worker and rebuild the reader at the current offset.
        self.locs.retain(|x| x != self.inner.worker_address());
        let replacement = Self::get_reader(
            &self.locs,
            self.block.clone(),
            self.fs_context.clone(),
            self.pos(),
            self.len(),
        )
        .await?;
        self.inner = replacement;
    }
}

/// Moves the inner reader forward by `len` bytes from its current position.
///
/// Used after a chunk was served from somewhere other than the inner reader,
/// so the reader position matches the bytes already returned to the caller.
///
/// # Errors
/// Propagates the error from seeking the inner reader.
fn advance_cached_position(&mut self, len: usize) -> FsResult<()> {
    let consumed = len as i64;
    let target = self.pos() + consumed;
    self.inner.seek(target)?;
    Ok(())
}
}
Loading
Loading