diff --git a/curvine-client/src/block/block_reader.rs b/curvine-client/src/block/block_reader.rs index 8ae08eb9e..0f7eeaea4 100644 --- a/curvine-client/src/block/block_reader.rs +++ b/curvine-client/src/block/block_reader.rs @@ -14,7 +14,9 @@ use crate::block::block_reader::ReaderAdapter::{Hole, Local, Remote}; use crate::block::{BlockReaderHole, BlockReaderLocal, BlockReaderRemote}; -use crate::file::FsContext; +use crate::file::{FsContext, ReadChunkKey}; +use crate::p2p::ChunkId; +use bytes::Bytes; use curvine_common::state::{ClientAddress, ExtendedBlock, LocatedBlock, WorkerAddress}; use curvine_common::FsResult; use log::warn; @@ -24,6 +26,7 @@ use orpc::runtime::{RpcRuntime, Runtime}; use orpc::sys::DataSlice; use orpc::{err_box, CommonResult}; use std::sync::Arc; +use tokio::sync::{Mutex as AsyncMutex, OwnedMutexGuard}; enum ReaderAdapter { Local(BlockReaderLocal), @@ -110,14 +113,22 @@ pub struct BlockReader { inner: ReaderAdapter, locs: Vec, block: ExtendedBlock, + file_id: i64, + file_version_epoch: i64, + file_mtime: i64, fs_context: Arc, } +type ReadChunkFlight = (Arc>, OwnedMutexGuard<()>); + impl BlockReader { pub async fn new( fs_context: Arc, located: LocatedBlock, off: i64, + file_id: i64, + file_version_epoch: i64, + file_mtime: i64, ) -> CommonResult { let len = located.block.len; @@ -134,6 +145,9 @@ impl BlockReader { inner: adapter, locs, block: located.block, + file_id, + file_version_epoch, + file_mtime, fs_context, }; @@ -220,37 +234,43 @@ impl BlockReader { return Ok(DataSlice::empty()); } - loop { - match self.inner.read().await { - Ok(v) => return Ok(v), + let read_key = ReadChunkKey::new( + self.file_id, + self.file_version_epoch, + self.block.id, + self.pos(), + ); + if let Some(chunk) = self.try_read_chunk_cache(&read_key)? { + return Ok(chunk); + } - Err(e) => { - // For Hole readers or when all workers exhausted, fail immediately - if matches!(&self.inner, Hole(_)) || self.locs.is_empty() { - return Err(e.ctx(format!( - "failed to read block on {}", - self.inner.worker_address() - ))); - } + let mut flight = self.acquire_read_chunk_flight(&read_key).await; + if let Some(chunk) = self.try_read_chunk_cache(&read_key)? { + self.release_read_chunk_flight(&read_key, &mut flight); + return Ok(chunk); + } - warn!( - "read data error block id {}, addr {}: {}", - self.block_id(), - self.inner.worker_address(), - e - ); - self.locs.retain(|x| x != self.inner.worker_address()); - self.inner = Self::get_reader( - &self.locs, - self.block.clone(), - self.fs_context.clone(), - self.pos(), - self.len(), - ) - .await?; - } - } + let chunk_id = ChunkId::with_version( + self.file_id, + self.file_version_epoch, + self.block.id, + self.pos(), + ); + let expect_len = self + .remaining() + .min(self.fs_context.read_chunk_size() as i64) + .max(0) as usize; + if let Some(chunk) = self + .try_read_from_p2p(chunk_id, expect_len, &read_key) + .await? + { + self.release_read_chunk_flight(&read_key, &mut flight); + return Ok(chunk); } + + let chunk = self.read_from_worker(&read_key, chunk_id).await; + self.release_read_chunk_flight(&read_key, &mut flight); + chunk } pub fn blocking_read(&mut self, rt: &Runtime) -> FsResult { @@ -295,4 +315,112 @@ impl BlockReader { pub fn block_id(&self) -> i64 { self.inner.block_id() } + + async fn acquire_read_chunk_flight(&self, read_key: &ReadChunkKey) -> Option { + if !self.fs_context.read_chunk_cache_enabled() { + return None; + } + let lock = self.fs_context.read_chunk_flight_lock(read_key.clone()); + let guard = lock.clone().lock_owned().await; + Some((lock, guard)) + } + + fn release_read_chunk_flight( + &self, + read_key: &ReadChunkKey, + flight: &mut Option, + ) { + if let Some((lock, guard)) = flight.take() { + drop(guard); + self.fs_context.cleanup_read_chunk_flight(read_key, &lock); + } + } + + fn try_read_chunk_cache(&mut self, read_key: &ReadChunkKey) -> FsResult> { + let Some(cached) = self.fs_context.get_read_chunk_cache(read_key) else { + return Ok(None); + }; + self.advance_cached_position(cached.len())?; + Ok(Some(DataSlice::bytes(cached))) + } + + async fn try_read_from_p2p( + &mut self, + chunk_id: ChunkId, + expect_len: usize, + read_key: &ReadChunkKey, + ) -> FsResult> { + if expect_len == 0 || !matches!(&self.inner, Remote(_)) { + return Ok(None); + } + let Some(service) = self.fs_context.p2p_service() else { + return Ok(None); + }; + if let Some(cached) = service + .fetch_chunk(chunk_id, expect_len, Some(self.file_mtime)) + .await + { + self.advance_cached_position(cached.len())?; + self.fs_context + .put_read_chunk_cache(read_key.clone(), cached.clone()); + return Ok(Some(DataSlice::bytes(cached))); + } + if service.conf().fallback_worker_on_fail { + Ok(None) + } else { + err_box!("p2p read miss and worker fallback is disabled") + } + } + + async fn read_from_worker( + &mut self, + read_key: &ReadChunkKey, + chunk_id: ChunkId, + ) -> FsResult { + loop { + match self.inner.read().await { + Ok(chunk) => { + if !chunk.is_empty() { + self.fs_context.on_worker_chunk_read( + read_key.clone(), + chunk_id, + Bytes::copy_from_slice(chunk.as_slice()), + self.file_mtime, + ); + } + return Ok(chunk); + } + Err(e) => { + if matches!(&self.inner, Hole(_)) || self.locs.is_empty() { + return Err(e.ctx(format!( + "failed to read block on {}", + self.inner.worker_address() + ))); + } + + warn!( + "read data error block id {}, addr {}: {}", + self.block_id(), + self.inner.worker_address(), + e + ); + self.locs.retain(|x| x != self.inner.worker_address()); + self.inner = Self::get_reader( + &self.locs, + self.block.clone(), + self.fs_context.clone(), + self.pos(), + self.len(), + ) + .await?; + } + } + } + } + + fn advance_cached_position(&mut self, len: usize) -> FsResult<()> { + let next_pos = self.pos() + len as i64; + self.inner.seek(next_pos)?; + Ok(()) + } } diff --git a/curvine-client/src/file/fs_context.rs b/curvine-client/src/file/fs_context.rs index d860fb00e..f638ddd2c 100644 --- a/curvine-client/src/file/fs_context.rs +++ b/curvine-client/src/file/fs_context.rs @@ -14,8 +14,10 @@ use crate::block::{BlockClient, BlockClientPool}; use crate::file::CurvineFileSystem; +use crate::p2p::ChunkId; use crate::p2p::P2pService; use crate::ClientMetrics; +use bytes::Bytes; use curvine_common::conf::ClusterConf; use curvine_common::proto::ClientAddressProto; use curvine_common::state::{ClientAddress, WorkerAddress}; @@ -31,12 +33,33 @@ use orpc::common::Utils; use orpc::io::net::NetUtils; use orpc::io::IOResult; use orpc::runtime::{RpcRuntime, Runtime}; +use orpc::sync::FastDashMap; use orpc::sys::CacheManager; use std::hash::BuildHasherDefault; use std::sync::Arc; +use tokio::sync::Mutex as AsyncMutex; static CLIENT_METRICS: OnceCell = OnceCell::new(); +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub(crate) struct ReadChunkKey { + pub(crate) file_id: i64, + pub(crate) version_epoch: i64, + pub(crate) block_id: i64, + pub(crate) off: i64, +} + +impl ReadChunkKey { + pub(crate) fn new(file_id: i64, version_epoch: i64, block_id: i64, off: i64) -> Self { + Self { + file_id, + version_epoch, + block_id, + off, + } + } +} + // The core feature of the file system is thread-safe, which can be shared between multiple threads through Arc. // 1. The cluster configuration file is saved. // 2. Create client. @@ -47,6 +70,8 @@ pub struct FsContext { pub(crate) client_addr: ClientAddress, pub(crate) os_cache: CacheManager, pub(crate) failed_workers: Cache>, + pub(crate) read_chunk_cache: Cache>, + pub(crate) read_chunk_flights: FastDashMap>>, pub(crate) block_pool: Arc, pub(crate) p2p_service: Option>, } @@ -86,6 +111,15 @@ impl FsContext { .time_to_live(conf.client.failed_worker_ttl) .eviction_policy(EvictionPolicy::lru()) .build_with_hasher(BuildHasherDefault::::default()); + let read_chunk_cache_capacity = (conf.client.p2p.cache_capacity + / conf.client.read_chunk_size.max(1) as u64) + .clamp(1, 65_536); + let read_chunk_cache = CacheBuilder::default() + .max_capacity(read_chunk_cache_capacity) + .time_to_live(conf.client.p2p.cache_ttl) + .eviction_policy(EvictionPolicy::lru()) + .build_with_hasher(BuildHasherDefault::::default()); + let read_chunk_flights = FastDashMap::default(); let block_pool = Arc::new(BlockClientPool::new( conf.client.enable_block_conn_pool, @@ -106,6 +140,8 @@ impl FsContext { client_addr, os_cache, failed_workers: exclude_workers, + read_chunk_cache, + read_chunk_flights, block_pool, p2p_service, }; @@ -188,6 +224,60 @@ impl FsContext { self.p2p_service.clone() } + pub(crate) fn read_chunk_cache_enabled(&self) -> bool { + self.p2p_service.is_some() + } + + pub(crate) fn get_read_chunk_cache(&self, key: &ReadChunkKey) -> Option { + if !self.read_chunk_cache_enabled() { + return None; + } + self.read_chunk_cache.get(key) + } + + pub(crate) fn put_read_chunk_cache(&self, key: ReadChunkKey, data: Bytes) { + if self.read_chunk_cache_enabled() && !data.is_empty() { + self.read_chunk_cache.insert(key, data); + } + } + + pub(crate) fn read_chunk_flight_lock(&self, key: ReadChunkKey) -> Arc> { + if let Some(lock) = self.read_chunk_flights.get(&key) { + return lock.clone(); + } + let lock = Arc::new(AsyncMutex::new(())); + self.read_chunk_flights + .entry(key) + .or_insert_with(|| lock.clone()) + .clone() + } + + pub(crate) fn cleanup_read_chunk_flight(&self, key: &ReadChunkKey, lock: &Arc>) { + if let Some(existing) = self.read_chunk_flights.get(key) { + let should_remove = Arc::ptr_eq(existing.value(), lock); + drop(existing); + if should_remove { + self.read_chunk_flights.remove(key); + } + } + } + + pub(crate) fn on_worker_chunk_read( + &self, + read_key: ReadChunkKey, + chunk_id: ChunkId, + data: Bytes, + mtime: i64, + ) { + if data.is_empty() { + return; + } + self.put_read_chunk_cache(read_key, data.clone()); + if let Some(service) = self.p2p_service() { + let _ = service.publish_chunk(chunk_id, data, mtime); + } + } + pub fn get_metrics<'a>() -> &'a ClientMetrics { CLIENT_METRICS.get().expect("client get metrics error!") } @@ -241,8 +331,9 @@ impl FsContext { #[cfg(test)] mod tests { - use super::FsContext; - use crate::p2p::P2pState; + use super::{FsContext, ReadChunkKey}; + use crate::p2p::{ChunkId, P2pState}; + use bytes::Bytes; use curvine_common::conf::ClusterConf; use std::sync::Arc; @@ -263,4 +354,31 @@ mod tests { let service = ctx.p2p_service().expect("p2p service should exist"); assert_eq!(service.state(), P2pState::Running); } + + #[test] + fn worker_chunk_read_populates_local_cache_and_p2p_registry() { + let mut conf = ClusterConf::default(); + conf.client.p2p.enable = true; + + let rt_a = Arc::new(conf.client_rpc_conf().create_runtime()); + let ctx_a = FsContext::with_rt(conf.clone(), rt_a).expect("fs context should build"); + let rt_b = Arc::new(conf.client_rpc_conf().create_runtime()); + let ctx_b = FsContext::with_rt(conf, rt_b).expect("fs context should build"); + + let service_a = ctx_a.p2p_service().expect("service should exist"); + let service_b = ctx_b.p2p_service().expect("service should exist"); + assert!(service_a.start()); + assert!(service_b.start()); + + let read_key = ReadChunkKey::new(11, 22, 33, 44); + let chunk_id = ChunkId::with_version(11, 22, 33, 44); + let data = Bytes::from_static(b"cached-worker-chunk"); + ctx_a.on_worker_chunk_read(read_key.clone(), chunk_id, data.clone(), 99); + + assert_eq!(ctx_a.get_read_chunk_cache(&read_key), Some(data.clone())); + + let rt = tokio::runtime::Runtime::new().expect("tokio runtime"); + let fetched = rt.block_on(service_b.fetch_chunk(chunk_id, data.len(), Some(99))); + assert_eq!(fetched, Some(data)); + } } diff --git a/curvine-client/src/file/fs_reader_base.rs b/curvine-client/src/file/fs_reader_base.rs index 92c9a1877..ab2a51261 100644 --- a/curvine-client/src/file/fs_reader_base.rs +++ b/curvine-client/src/file/fs_reader_base.rs @@ -192,8 +192,15 @@ impl FsReaderBase { } None => { - // Create a new block reader - BlockReader::new(self.fs_context.clone(), loc.clone(), block_off).await? + BlockReader::new( + self.fs_context.clone(), + loc.clone(), + block_off, + self.file_blocks.status.id, + self.file_blocks.status.version_epoch, + self.file_blocks.status.mtime, + ) + .await? } }; self.update_reader(Some(new_reader), false).await?; diff --git a/curvine-client/src/file/mod.rs b/curvine-client/src/file/mod.rs index aed6bb0a6..42b4a972b 100644 --- a/curvine-client/src/file/mod.rs +++ b/curvine-client/src/file/mod.rs @@ -26,6 +26,7 @@ pub use self::fs_writer::FsWriter; mod fs_context; pub use self::fs_context::FsContext; +pub(crate) use self::fs_context::ReadChunkKey; mod fs_reader_base; pub use self::fs_reader_base::FsReaderBase; diff --git a/curvine-client/src/p2p/service.rs b/curvine-client/src/p2p/service.rs index 0c0dd3442..88ff58b78 100644 --- a/curvine-client/src/p2p/service.rs +++ b/curvine-client/src/p2p/service.rs @@ -12,11 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::p2p::{CacheSnapshot, DiscoverySnapshot}; +use crate::p2p::{CacheSnapshot, ChunkId, DiscoverySnapshot}; +use bytes::Bytes; use curvine_common::conf::ClientP2pConf; -use std::sync::atomic::{AtomicU8, Ordering}; +use once_cell::sync::Lazy; +use orpc::sync::FastDashMap; +use std::sync::atomic::{AtomicU64, AtomicU8, Ordering}; use std::sync::Mutex; +#[derive(Debug, Clone)] +struct PublishedChunk { + owner_id: u64, + data: Bytes, + mtime: i64, +} + +static NEXT_SERVICE_ID: AtomicU64 = AtomicU64::new(1); +static PUBLISHED_CHUNKS: Lazy> = + Lazy::new(FastDashMap::default); + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum P2pState { Disabled = 0, @@ -44,6 +58,9 @@ pub struct P2pService { conf: ClientP2pConf, state: AtomicU8, stats: Mutex, + service_id: u64, + published_bytes: AtomicU64, + published_chunks: FastDashMap, } impl P2pService { @@ -57,6 +74,9 @@ impl P2pService { conf, state: AtomicU8::new(state as u8), stats: Mutex::new(P2pStatsSnapshot::default()), + service_id: NEXT_SERVICE_ID.fetch_add(1, Ordering::Relaxed), + published_bytes: AtomicU64::new(0), + published_chunks: FastDashMap::default(), } } @@ -72,6 +92,10 @@ impl P2pService { self.state() == P2pState::Running } + pub fn is_enabled(&self) -> bool { + self.conf.enable + } + pub fn start(&self) -> bool { if !self.conf.enable { return false; @@ -84,14 +108,23 @@ impl P2pService { if !self.conf.enable { return; } + self.clear_published_chunks(); self.state.store(P2pState::Stopped as u8, Ordering::Relaxed); } pub fn stats_snapshot(&self) -> P2pStatsSnapshot { - self.stats + let mut snapshot = self + .stats .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()) - .clone() + .clone(); + let usage = self.published_bytes.load(Ordering::Relaxed); + let capacity = self.conf.cache_capacity.max(1); + snapshot.cache.usage_bytes = usage; + snapshot.cache.capacity_bytes = capacity; + snapshot.cache.usage_ratio = usage as f64 / capacity as f64; + snapshot.cache.cached_chunks_count = self.published_chunks.len(); + snapshot } pub fn update_stats(&self, snapshot: P2pStatsSnapshot) { @@ -100,12 +133,89 @@ impl P2pService { .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()) = snapshot; } + + pub fn publish_chunk(&self, chunk_id: ChunkId, data: Bytes, mtime: i64) -> bool { + if !self.is_running() || data.is_empty() { + return false; + } + + let len = data.len(); + let previous_len = self.published_chunks.insert(chunk_id, len); + if let Some(previous_len) = previous_len { + self.published_bytes + .fetch_sub(previous_len as u64, Ordering::Relaxed); + } + self.published_bytes + .fetch_add(len as u64, Ordering::Relaxed); + PUBLISHED_CHUNKS.insert( + chunk_id, + PublishedChunk { + owner_id: self.service_id, + data, + mtime, + }, + ); + true + } + + pub async fn fetch_chunk( + &self, + chunk_id: ChunkId, + max_len: usize, + expected_mtime: Option, + ) -> Option { + if !self.is_running() || max_len == 0 { + return None; + } + + let chunk = PUBLISHED_CHUNKS.get(&chunk_id)?; + if expected_mtime.is_some_and(|mtime| mtime > 0 && chunk.mtime != mtime) { + return None; + } + + Some(chunk.data.slice(0..chunk.data.len().min(max_len))) + } + + fn clear_published_chunks(&self) { + let chunk_ids: Vec = self + .published_chunks + .iter() + .map(|entry| *entry.key()) + .collect(); + for chunk_id in chunk_ids { + if let Some(entry) = PUBLISHED_CHUNKS.get(&chunk_id) { + let owned = entry.owner_id == self.service_id; + drop(entry); + if owned { + PUBLISHED_CHUNKS.remove(&chunk_id); + } + } + } + self.published_chunks.clear(); + self.published_bytes.store(0, Ordering::Relaxed); + } +} + +impl Drop for P2pService { + fn drop(&mut self) { + self.clear_published_chunks(); + } } #[cfg(test)] mod tests { use super::{P2pService, P2pState}; + use crate::p2p::ChunkId; + use bytes::Bytes; use curvine_common::conf::ClientP2pConf; + use std::sync::atomic::{AtomicI64, Ordering}; + + static TEST_CHUNK_SEQ: AtomicI64 = AtomicI64::new(1); + + fn next_chunk_id() -> ChunkId { + let seq = TEST_CHUNK_SEQ.fetch_add(1, Ordering::Relaxed); + ChunkId::with_version(seq, seq, seq, 0) + } #[test] fn disabled_service_does_not_start() { @@ -127,4 +237,29 @@ mod tests { service.stop(); assert_eq!(service.state(), P2pState::Stopped); } + + #[test] + fn running_services_share_published_chunks_and_cleanup_on_stop() { + let conf = ClientP2pConf { + enable: true, + ..ClientP2pConf::default() + }; + let publisher = P2pService::new(conf.clone()); + let consumer = P2pService::new(conf); + assert!(publisher.start()); + assert!(consumer.start()); + + let chunk_id = next_chunk_id(); + let data = Bytes::from_static(b"peer-data"); + assert!(publisher.publish_chunk(chunk_id, data.clone(), 7)); + + let rt = tokio::runtime::Runtime::new().expect("tokio runtime"); + let fetched = rt.block_on(consumer.fetch_chunk(chunk_id, 4, Some(7))); + assert_eq!(fetched, Some(Bytes::from_static(b"peer"))); + + publisher.stop(); + + let after_stop = rt.block_on(consumer.fetch_chunk(chunk_id, 4, Some(7))); + assert!(after_stop.is_none()); + } }