diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java index 8975a9ec84e..ceb5878c79c 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMap.java @@ -222,12 +222,14 @@ public void clear() { size = 0; } + // Walks each bucket back-to-front so the action can remove the entry it's currently on (as metric + // collection does to drop stale series) without the next entry being skipped. @Override public void forEach(BiConsumer action) { for (int j = 0; j < table.length; j++) { ArrayList> bucket = table[j]; if (bucket != null) { - for (int i = 0; i < bucket.size(); i++) { + for (int i = bucket.size() - 1; i >= 0; i--) { Entry entry = bucket.get(i); action.accept(entry.key, entry.value); } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java index e16c61fbb34..de6a29d76de 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorageTest.java @@ -39,6 +39,9 @@ import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -413,6 +416,42 @@ void collect_CumulativeSeriesDisappearsAndReappears(MemoryMode memoryMode) { .hasValue(7))); } + @ParameterizedTest + @EnumSource(MemoryMode.class) + void collect_CumulativeRetainsLiveSeriesWhenStaleSeriesAreRemoved(MemoryMode memoryMode) { + setup(memoryMode); + + // Enough series to share buckets, but under the cardinality limit so there's no overflow + // series. + int seriesCount = 20; + + // First collection reports everything. + testClock.advance(Duration.ofSeconds(10)); + for (int i = 0; i < seriesCount; i++) { + longCounterStorage.record(Attributes.builder().put("key", "v" + i).build(), 1); + } + longCounterStorage.collect(resource, scope, testClock.now()); + registeredReader.setLastCollectEpochNanos(testClock.now()); + + // Next time only the even series report; the odd ones go stale and get removed while iterating. + // None of the live series should be skipped. + testClock.advance(Duration.ofSeconds(10)); + Set expected = new LinkedHashSet<>(); + for (int i = 0; i < seriesCount; i += 2) { + Attributes attributes = Attributes.builder().put("key", "v" + i).build(); + longCounterStorage.record(attributes, 2); + expected.add(attributes); + } + + MetricData metricData = longCounterStorage.collect(resource, scope, testClock.now()); + Set collected = + metricData.getLongSumData().getPoints().stream() + .map(PointData::getAttributes) + .collect(Collectors.toCollection(LinkedHashSet::new)); + + assertThat(collected).isEqualTo(expected); + } + @ParameterizedTest @EnumSource(MemoryMode.class) void collect_DeltaComputesDiff(MemoryMode memoryMode) { diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java index 0e4a5ab0328..8c68988a44a 100644 --- a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/internal/state/PooledHashMapTest.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -73,4 +74,46 @@ void forEachTest() { assertThat(actualMap).containsOnlyKeys("One", "Two").containsValues(1, 2); } + + @Test + void forEachAllowsRemovalOfCurrentEntry() { + // Keys with the same hashcode always share a bucket (independent of capacity), mirroring + // collection removing a stale series while iterating the others. + PooledHashMap collidingMap = new PooledHashMap<>(); + SameBucketKey first = new SameBucketKey("first"); + SameBucketKey second = new SameBucketKey("second"); + collidingMap.put(first, 1); + collidingMap.put(second, 2); + + Map visited = new HashMap<>(); + collidingMap.forEach( + (key, value) -> { + visited.put(key, value); + if (visited.size() == 1) { + collidingMap.remove(key); // drop the first one we see + } + }); + + assertThat(visited).containsOnlyKeys(first, second).containsValues(1, 2); + assertThat(collidingMap.size()).isEqualTo(1); + } + + /** Key whose hashcode is constant, so all instances fall in the same bucket. */ + private static final class SameBucketKey { + private final String name; + + SameBucketKey(String name) { + this.name = name; + } + + @Override + public int hashCode() { + return 1; + } + + @Override + public boolean equals(@Nullable Object o) { + return o instanceof SameBucketKey && name.equals(((SameBucketKey) o).name); + } + } }