@@ -252,18 +252,45 @@ public SelectedPool selectWritePool(long preallocated)
252252 .toLowerCase ()));
253253 }
254254
255+ if (_zone .isPresent ()) {
256+ SelectedPool pool = filterWritePool (levels , true , preallocated );
257+ if (pool != null ) {
258+ return pool ;
259+ }
260+ }
261+
262+ SelectedPool pool = filterWritePool (levels , false , preallocated );
263+ if (pool != null ) {
264+ return pool ;
265+ }
266+
267+ throw new CacheException (CacheException .NO_POOL_ONLINE ,
268+ noOnlinePoolsErrorMessage (DirectionType .WRITE .name ()
269+ .toLowerCase ()));
270+ }
271+
272+ @ Nullable
273+ private SelectedPool filterWritePool (PoolPreferenceLevel [] levels ,
274+ boolean filterByZone ,
275+ long preallocated ) throws CacheException {
255276 CostException fallback = null ;
277+
256278 for (PoolPreferenceLevel level : levels ) {
257279 List <PoolInfo > pools =
258- level .getPoolList ().stream ()
259- .map (_costModule ::getPoolInfo )
260- .filter (Objects ::nonNull )
261- .collect (toList ());
280+ level .getPoolList ().stream ()
281+ .map (_costModule ::getPoolInfo )
282+ .filter (Objects ::nonNull )
283+ .collect (toList ());
284+
285+ if (filterByZone ){
286+ pools = filterByZone (pools );
287+ }
288+
262289 if (!pools .isEmpty ()) {
263290 Partition partition = _partitionManager .getPartition (level .getTag ());
264291 try {
265292 return partition .selectWritePool (_costModule , pools , _fileAttributes ,
266- preallocated );
293+ preallocated );
267294 } catch (CostException e ) {
268295 if (!e .shouldFallBack ()) {
269296 throw e ;
@@ -273,16 +300,11 @@ public SelectedPool selectWritePool(long preallocated)
273300 }
274301 }
275302
276- /* We were asked to fall back, but all available links were
277- * exhausted. Let the caller deal with it.
278- */
279- if (fallback != null ) {
303+ if (!filterByZone && fallback != null ) {
280304 throw fallback ;
281305 }
282306
283- throw new CacheException (CacheException .NO_POOL_ONLINE ,
284- noOnlinePoolsErrorMessage (DirectionType .WRITE .name ()
285- .toLowerCase ()));
307+ return null ;
286308 }
287309
288310 @ Override
@@ -316,12 +338,37 @@ public SelectedPool selectReadPool()
316338 .toLowerCase ()));
317339 }
318340
341+
342+ if (_zone .isPresent ()) {
343+ SelectedPool pool = filterReadPool (level , onlinePoolsWithFile , true );
344+ if (pool != null ) {
345+ return pool ;
346+ }
347+ }
348+
349+ SelectedPool pool = filterReadPool (level , onlinePoolsWithFile , false );
350+ if (pool != null ) {
351+ return pool ;
352+ }
353+
354+ /* None of the pools we were allowed to read from were
355+ * online or had the file.
356+ */
357+ throw new PermissionDeniedCacheException (
358+ "File is online, but not in read-allowed pool" );
359+ }
360+
361+ @ Nullable
362+ private SelectedPool filterReadPool (PoolPreferenceLevel [] level ,
363+ Map <String , PoolInfo > onlinePoolsWithFile ,
364+ boolean filterByZone ) throws CacheException {
365+
319366 CostException costException = null ;
320367
321368 for (int prio = 0 ; prio < level .length ; prio ++) {
322369 List <String > poolsInCurrentLevel = level [prio ].getPoolList ();
323370 LOGGER .debug ("[read] Allowed pools at level {}: {}" ,
324- prio , poolsInCurrentLevel );
371+ prio , poolsInCurrentLevel );
325372
326373 if (poolsInCurrentLevel .isEmpty ()) {
327374 // No pools in this level....skip it.
@@ -339,7 +386,11 @@ public SelectedPool selectReadPool()
339386 }
340387 }
341388 LOGGER .debug ("[read] Available pools at level {}: {}" ,
342- prio , pools );
389+ prio , pools );
390+
391+ if (filterByZone ) {
392+ pools = filterByZone (pools );
393+ }
343394
344395 /* If allowed, fallback to next link if current link doesn't point
345396 * to any pool holding the file.
@@ -356,14 +407,14 @@ public SelectedPool selectReadPool()
356407 * to select a pool.
357408 */
358409 _partition =
359- _partitionManager .getPartition (level [prio ].getTag ());
410+ _partitionManager .getPartition (level [prio ].getTag ());
360411
361412 /* The actual pool selection is delegated to the
362413 * Partition.
363414 */
364415 try {
365416 return _partition .selectReadPool (_costModule , pools ,
366- _fileAttributes );
417+ _fileAttributes );
367418 } catch (CostException e ) {
368419 costException = e ;
369420 if (!e .shouldFallBack ()) {
@@ -372,25 +423,24 @@ public SelectedPool selectReadPool()
372423 }
373424 }
374425
375- /* If we have a CostException where a pool was selected and
376- * shouldTryAlternatives not set then we return that pool anyway.
377- * REVISIT: consider updating partitions so they don't throw
378- * an exception in this case.
379- */
426+ if (filterByZone ){
427+ return null ;
428+ }
380429 if (costException != null ) {
381430 if (costException .getPool () != null
382- && !costException .shouldTryAlternatives ()) {
431+ && !costException .shouldTryAlternatives ()) {
383432 return costException .getPool ();
384433 }
385-
386434 throw costException ;
387435 }
436+ return null ;
437+ }
388438
389- /* None of the pools we were allowed to read from were
390- * online or had the file.
391- */
392- throw new PermissionDeniedCacheException (
393- "File is online, but not in read-allowed pool" ) ;
439+ private List < PoolInfo > filterByZone ( List < PoolInfo > pools ) {
440+ pools = pools . stream ()
441+ . filter ( p -> Objects . equals ( p . getTags (). get ( "zone" ), _zone . get ()))
442+ . toList ();
443+ return pools ;
394444 }
395445
396446 @ Nullable
@@ -459,6 +509,7 @@ public Partition.P2pPair selectPool2Pool(String poolGroup,
459509 Lists .newArrayList (sources .values ()),
460510 pools ,
461511 _fileAttributes ,
512+ _zone ,
462513 force );
463514 }
464515 }
@@ -470,25 +521,48 @@ public Partition.P2pPair selectPool2Pool(String poolGroup,
470521
471522 @ Override
472523 public SelectedPool selectStagePool (Optional <PoolInfo > previous )
473- throws CacheException {
524+ throws CacheException {
474525 Collection <String > locations = filteredFileLocations ();
475526 LOGGER .debug ("[stage] Existing locations of the file: {}" , locations );
476527
528+ if (_zone .isPresent ()) {
529+ SelectedPool pool = filterStagePool (locations , previous , true );
530+ if (pool != null ) {
531+ return pool ;
532+ }
533+ }
534+
535+ SelectedPool pool = filterStagePool (locations , previous , false );
536+ if (pool != null ) {
537+ return pool ;
538+ }
539+
540+ throw new CacheException (149 , "No pool candidates available/configured/left for stage" );
541+ }
542+
543+ public SelectedPool filterStagePool (Collection <String > locations ,
544+ Optional <PoolInfo > previous ,
545+ boolean filterByZone ) throws CacheException {
546+
477547 CostException costException = null ;
478548 for (PoolPreferenceLevel level : match (DirectionType .CACHE )) {
479549 try {
480550 List <PoolInfo > pools =
481- level .getPoolList ().stream ()
482- .filter (pool -> !locations .contains (pool ))
483- .map (_costModule ::getPoolInfo )
484- .filter (Objects ::nonNull )
485- .collect (toList ());
551+ level .getPoolList ().stream ()
552+ .filter (pool -> !locations .contains (pool ))
553+ .map (_costModule ::getPoolInfo )
554+ .filter (Objects ::nonNull )
555+ .collect (toList ());
556+ if (filterByZone ) {
557+ pools = filterByZone (pools );
558+ }
559+
486560 if (!pools .isEmpty ()) {
487561 LOGGER .debug ("[stage] Online stage candidates: {}" , pools );
488562 Partition partition =
489- _partitionManager .getPartition (level .getTag ());
563+ _partitionManager .getPartition (level .getTag ());
490564 return partition .selectStagePool (_costModule , pools ,
491- previous , _fileAttributes );
565+ previous , _fileAttributes );
492566 }
493567 } catch (CostException e ) {
494568 costException = e ;
@@ -498,15 +572,15 @@ public SelectedPool selectStagePool(Optional<PoolInfo> previous)
498572 }
499573 }
500574
501- if (costException != null ) {
575+ if (costException != null && ! filterByZone ) {
502576 SelectedPool pool = costException .getPool ();
503577 if (pool != null ) {
504578 return pool ;
505579 }
506580 throw costException ;
507581 }
508582
509- throw new CacheException ( 149 , "No pool candidates available/configured/left for stage" ) ;
583+ return null ;
510584 }
511585
512586 // FIXME: There is a fair amount of overlap between this method
0 commit comments