Skip to content

Commit bcaf860

Browse files
committed
poolmanager: filter staging pools by zone
Motivation: When zones are enabled and configured, staged files should land in a pool in the same zone as the client that requested it. Modification: refactor selectStagePool with new filterStagePool helper method in PoolMonitorV5. Result: Best effort ("soft") filtering of stage pool lists by zone. Fallback to full unfiltered list, if no eligible pool in correct zone was found. if configured, p2p should trigger and copy to a pool in clients zone when selcted stage pool is not a read pool (no extra modification needed). Target: master Require-book: no
1 parent 32cd45a commit bcaf860

2 files changed

Lines changed: 72 additions & 12 deletions

File tree

modules/dcache/src/main/java/diskCacheV111/poolManager/PoolMonitorV5.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -521,25 +521,48 @@ public Partition.P2pPair selectPool2Pool(String poolGroup,
521521

522522
@Override
523523
public SelectedPool selectStagePool(Optional<PoolInfo> previous)
524-
throws CacheException {
524+
throws CacheException {
525525
Collection<String> locations = filteredFileLocations();
526526
LOGGER.debug("[stage] Existing locations of the file: {}", locations);
527527

528+
if (_zone.isPresent()) {
529+
SelectedPool pool = filterStagePool(locations, previous, true);
530+
if (pool != null) {
531+
return pool;
532+
}
533+
}
534+
535+
SelectedPool pool = filterStagePool(locations, previous, false);
536+
if (pool != null) {
537+
return pool;
538+
}
539+
540+
throw new CacheException(149, "No pool candidates available/configured/left for stage");
541+
}
542+
543+
public SelectedPool filterStagePool(Collection<String> locations,
544+
Optional<PoolInfo> previous,
545+
boolean filterByZone) throws CacheException{
546+
528547
CostException costException = null;
529548
for (PoolPreferenceLevel level : match(DirectionType.CACHE)) {
530549
try {
531550
List<PoolInfo> pools =
532-
level.getPoolList().stream()
533-
.filter(pool -> !locations.contains(pool))
534-
.map(_costModule::getPoolInfo)
535-
.filter(Objects::nonNull)
536-
.collect(toList());
551+
level.getPoolList().stream()
552+
.filter(pool -> !locations.contains(pool))
553+
.map(_costModule::getPoolInfo)
554+
.filter(Objects::nonNull)
555+
.collect(toList());
556+
if (filterByZone) {
557+
pools = filterByZone(pools);
558+
}
559+
537560
if (!pools.isEmpty()) {
538561
LOGGER.debug("[stage] Online stage candidates: {}", pools);
539562
Partition partition =
540-
_partitionManager.getPartition(level.getTag());
563+
_partitionManager.getPartition(level.getTag());
541564
return partition.selectStagePool(_costModule, pools,
542-
previous, _fileAttributes);
565+
previous, _fileAttributes);
543566
}
544567
} catch (CostException e) {
545568
costException = e;
@@ -549,15 +572,15 @@ public SelectedPool selectStagePool(Optional<PoolInfo> previous)
549572
}
550573
}
551574

552-
if (costException != null) {
575+
if (costException != null && !filterByZone) {
553576
SelectedPool pool = costException.getPool();
554577
if (pool != null) {
555578
return pool;
556579
}
557580
throw costException;
558581
}
559582

560-
throw new CacheException(149, "No pool candidates available/configured/left for stage");
583+
return null;
561584
}
562585

563586
// FIXME: There is a fair amount of overlap between this method

modules/dcache/src/test/java/org/dcache/tests/poolmanager/PoolMonitorTest.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,39 @@ public void testWritePoolFallBackAllPoolsInZoneFull() throws Exception {
223223

224224
assertEquals("pool2", selector.selectWritePool(0).name());
225225
}
226+
@Test
227+
public void testStagePoolZonePreference() throws Exception {
228+
_partitionManager.setProperties(null, Map.of("fallback-onspace", "yes"));
229+
prepareCostModule(false, true, false, false);
230+
231+
FileAttributes attributes = FileAttributes.of()
232+
.pnfsId(_pnfsId)
233+
.locations(Collections.emptyList())
234+
.build();
235+
StorageInfos.injectInto(_storageInfo, attributes);
236+
237+
PoolSelector selector = _poolMonitor.getPoolSelector(attributes, _protocolInfo,
238+
null, Optional.of("1"), Collections.EMPTY_SET);
239+
240+
assertEquals("pool1", selector.selectStagePool(Optional.empty()).name());
241+
}
242+
243+
@Test
244+
public void testStagePoolZoneFallback() throws Exception {
245+
_partitionManager.setProperties(null, Map.of("fallback-onspace", "yes"));
246+
prepareCostModule(false, true, false, true);
247+
248+
FileAttributes attributes = FileAttributes.of()
249+
.pnfsId(_pnfsId)
250+
.locations(Collections.emptyList())
251+
.build();
252+
StorageInfos.injectInto(_storageInfo, attributes);
253+
254+
PoolSelector selector = _poolMonitor.getPoolSelector(attributes, _protocolInfo,
255+
null, Optional.of("1"), Collections.EMPTY_SET);
256+
257+
assertEquals("pool2", selector.selectStagePool(Optional.empty()).name());
258+
}
226259

227260
private void prepareCostModule(boolean linkPerPool) throws Exception {
228261
prepareCostModule(linkPerPool, false);
@@ -255,7 +288,7 @@ private void prepareCostModule(boolean linkPerPool, boolean withZones, boolean p
255288
}
256289

257290
PoolManagerPoolUpMessage pool1UpMessage = new PoolManagerPoolUpMessage("pool1",
258-
serialId, poolMode, poolCost1);
291+
serialId, poolMode, poolCost1);
259292
pool1UpMessage.setHostName(_localhost);
260293

261294
if (withZones) {
@@ -266,21 +299,25 @@ private void prepareCostModule(boolean linkPerPool, boolean withZones, boolean p
266299
envelope1.addSourceAddress(new CellAddressCore("pool1"));
267300
_costModule.messageArrived(envelope1, pool1UpMessage);
268301

302+
_selectionUnit.getPool("pool1").setHsmInstances(Set.of("osm"));
303+
269304
if (!pool2Offline) {
270305
PoolCostInfo poolCost2 = new PoolCostInfo("pool2", IoQueueManager.DEFAULT_QUEUE);
271306
poolCost2.setSpaceUsage(100, 20, 30, 50);
272307
PoolManagerPoolUpMessage pool2UpMessage = new PoolManagerPoolUpMessage("pool2",
273-
serialId, poolMode, poolCost2);
308+
serialId, poolMode, poolCost2);
274309
pool2UpMessage.setHostName(_localhost);
275310
if (withZones) {
276311
pool2UpMessage.setTagMap(Map.of("zone", "2"));
277312
}
278313
CellMessage envelope2 = new CellMessage(new CellAddressCore("PoolManager"), null);
279314
envelope2.addSourceAddress(new CellAddressCore("pool2"));
280315
_costModule.messageArrived(envelope2, pool2UpMessage);
316+
_selectionUnit.getPool("pool2").setHsmInstances(Set.of("osm"));
281317
}
282318
}
283319

320+
284321
private PoolSelector prepareHostExclusion() throws Exception {
285322
/*
286323
* Emulate the poolup message arrival on the PoolManager

0 commit comments

Comments
 (0)