11package org .hypertrace .core .kafkastreams .framework .punctuators ;
22
3- import io .micrometer .core .instrument .Gauge ;
43import io .micrometer .core .instrument .MeterRegistry ;
4+ import io .micrometer .core .instrument .Tags ;
55import java .time .Clock ;
66import java .util .ArrayList ;
77import java .util .Collections ;
1010import java .util .List ;
1111import java .util .Map ;
1212import java .util .Optional ;
13+ import java .util .concurrent .ConcurrentHashMap ;
1314import java .util .concurrent .atomic .AtomicInteger ;
1415import lombok .extern .slf4j .Slf4j ;
1516import org .apache .kafka .streams .KeyValue ;
2021
2122@ Slf4j
2223public abstract class AbstractThrottledPunctuator <T > implements Punctuator {
24+ private static final ConcurrentHashMap <String , AtomicInteger > totalEventCountGauge =
25+ new ConcurrentHashMap <>();
26+ private static final String TOTAL_EVENT_COUNT_GAUGE_NAME =
27+ "abstract.throttled.punctuator.total.events.count" ;
2328 private final Clock clock ;
2429 private final KeyValueStore <Long , List <T >> eventStore ;
2530 private final ThrottledPunctuatorConfig config ;
26- private final CustomMetrics customMetrics ;
27-
28- public AbstractThrottledPunctuator (
29- Clock clock , ThrottledPunctuatorConfig config , KeyValueStore <Long , List <T >> eventStore ) {
30- this .clock = clock ;
31- this .config = config ;
32- this .eventStore = eventStore ;
33- this .customMetrics = null ;
34- }
31+ private final MeterRegistry meterRegistry ;
3532
3633 public AbstractThrottledPunctuator (
3734 Clock clock ,
3835 ThrottledPunctuatorConfig config ,
3936 KeyValueStore <Long , List <T >> eventStore ,
40- MeterRegistry meterRegistry ,
41- String name ) {
37+ MeterRegistry meterRegistry ) {
4238 this .clock = clock ;
4339 this .config = config ;
4440 this .eventStore = eventStore ;
45- this .customMetrics = new CustomMetrics ( meterRegistry , name ) ;
41+ this .meterRegistry = meterRegistry ;
4642 }
4743
4844 public void scheduleTask (long scheduleMs , T event ) {
@@ -85,6 +81,7 @@ public final void punctuate(long timestamp) {
8581 long startTime = clock .millis ();
8682 int totalProcessedWindows = 0 ;
8783 int totalProcessedTasks = 0 ;
84+ int totalEventCount = 0 ;
8885
8986 log .debug (
9087 "Processing tasks with throttling yield of {} until timestamp {}" ,
@@ -98,11 +95,11 @@ public final void punctuate(long timestamp) {
9895 totalProcessedWindows ++;
9996 List <T > events = kv .value ;
10097 long windowMs = kv .key ;
98+ totalEventCount += events .size ();
10199 // collect all tasks to be rescheduled by key to perform bulk reschedules
102100 Map <Long , List <T >> rescheduledTasks = new HashMap <>();
103101 // loop through all events for this key until yield timeout is reached
104102 int i = 0 ;
105- int tasksBefore = totalProcessedTasks ;
106103 for (; i < events .size () && !shouldYieldNow (startTime ); i ++) {
107104 T event = events .get (i );
108105 totalProcessedTasks ++;
@@ -115,10 +112,6 @@ public final void punctuate(long timestamp) {
115112 .computeIfAbsent (normalize (rescheduleTimestamp ), (t ) -> new ArrayList <>())
116113 .add (event ));
117114 }
118- if (customMetrics != null ) {
119- customMetrics .setTaskProcessingStatus (
120- totalProcessedTasks > tasksBefore ? 0 : 1 ); // 0 if at least one task processed
121- }
122115 // process all reschedules
123116 rescheduledTasks .forEach (
124117 (newWindowMs , rescheduledEvents ) -> {
@@ -146,14 +139,16 @@ public final void punctuate(long timestamp) {
146139 }
147140 }
148141 }
149- if (customMetrics != null ) {
150- customMetrics .setEventStoreSize (currentTotalEventCount ());
151- }
142+ long timeTakenMs = clock .millis () - startTime ;
143+ boolean yielded = shouldYieldNow (startTime );
144+ updateEventCountGauge (totalEventCount , yielded );
145+
152146 log .debug (
153- "processed windows: {}, processed tasks: {}, time taken: {}" ,
147+ "processed windows: {}, processed tasks: {}, total events: {}, time taken: {}ms " ,
154148 totalProcessedWindows ,
155149 totalProcessedTasks ,
156- clock .millis () - startTime );
150+ totalEventCount ,
151+ timeTakenMs );
157152 }
158153
159154 protected abstract TaskResult executeTask (long punctuateTimestamp , T object );
@@ -174,45 +169,22 @@ private long normalize(long timestamp) {
174169 return timestamp - (timestamp % config .getWindowMs ());
175170 }
176171
177- private int currentTotalEventCount () {
178- int count = 0 ;
179- try (KeyValueIterator <Long , List <T >> it = eventStore .all ()) {
180- while (it .hasNext ()) {
181- count += Optional .ofNullable (it .next ().value ).map (List ::size ).orElse (0 );
182- }
172+ private void updateEventCountGauge (int totalEventCount , boolean yielded ) {
173+ if (meterRegistry == null ) {
174+ return ;
183175 }
184- return count ;
185- }
186176
187- protected final class CustomMetrics {
188- private final AtomicInteger eventStoreSize ;
189- private final AtomicInteger noTasksProcessed ;
190-
191- public CustomMetrics (MeterRegistry meterRegistry , String name ) {
192- this .noTasksProcessed = new AtomicInteger (1 ); // default to 1 = no tasks processed
193- this .eventStoreSize = new AtomicInteger (0 );
194- if (meterRegistry != null ) {
195- Gauge .builder ("abstract.throttled.punctuator.task.size" , eventStoreSize , AtomicInteger ::get )
196- .description ("Total number of scheduled tasks across all windows" )
197- .tag ("name" , name )
198- .register (meterRegistry );
199- Gauge .builder (
200- "abstract.throttled.punctuator.no.tasks.processed" ,
201- noTasksProcessed ,
202- AtomicInteger ::get )
203- .description (
204- "1 if no tasks processed in last punctuate call, 0 if at least one processed" )
205- .tag ("name" , name )
206- .register (meterRegistry );
207- }
208- }
177+ String tagValue = String .valueOf (yielded );
209178
210- public void setTaskProcessingStatus (int value ) {
211- this .noTasksProcessed .set (value );
212- }
179+ AtomicInteger gauge =
180+ totalEventCountGauge .computeIfAbsent (
181+ tagValue ,
182+ key -> {
183+ AtomicInteger newGauge = new AtomicInteger (0 );
184+ meterRegistry .gauge (TOTAL_EVENT_COUNT_GAUGE_NAME , Tags .of ("yielded" , key ), newGauge );
185+ return newGauge ;
186+ });
213187
214- public void setEventStoreSize (int value ) {
215- this .eventStoreSize .set (value );
216- }
188+ gauge .set (totalEventCount );
217189 }
218190}
0 commit comments