@@ -52,13 +52,18 @@ public override async Task<IndividualDetectorScanResult> ExecuteDetectorAsync(Sc
5252 /// <summary>
5353 /// Pre-filters scan work to values files that are co-located with a Helm chart.
5454 ///
55- /// This is intentionally implemented as a streaming pipeline (instead of
56- /// materializing all matching files with ToList) to reduce peak memory usage and
57- /// start emitting work earlier on large repositories.
55+ /// This is intentionally implemented as a streaming pipeline with per-directory
56+ /// buffering (not a two-pass approach that materializes all files first).
57+ /// Instead of collecting all matching files upfront, this observable pipeline:
58+ /// 1. Streams file matches as they arrive from the file walker
59+ /// 2. Buffers values files per directory until a Chart.yaml is observed
60+ /// 3. Immediately emits Chart files to mark their directory as a valid Helm chart
61+ /// 4. Releases buffered values files for directories that now have charts
5862 ///
59- /// Enumeration order is not guaranteed, so values files may be seen before the
60- /// corresponding Chart.yaml. To preserve correctness, values files are buffered
61- /// per directory until a chart file for that directory is observed, then released.
63+ /// This approach reduces peak memory usage and starts emitting work earlier on
64+ /// large repositories compared to a full materialization pass. Enumeration order
65+ /// is not guaranteed, so values files may be seen before Chart.yaml; per-directory
66+ /// buffering ensures correctness regardless of enumeration order.
6267 /// </summary>
6368 /// <returns>
6469 /// A prepared observable that emits only values-file requests belonging to
@@ -81,47 +86,59 @@ protected override Task<IObservable<ProcessRequest>> OnPrepareDetectionAsync(
8186 var fileName = Path . GetFileName ( request . ComponentStream . Location ) ;
8287 var directory = Path . GetDirectoryName ( request . ComponentStream . Location ) ?? string . Empty ;
8388
89+ var requestsToEmit = ( List < ProcessRequest > ? ) null ;
90+ var emitCurrentRequest = false ;
91+
8492 // Protect shared state because IObservable callbacks may arrive concurrently.
93+ // Decide what to emit inside the lock, then emit outside to avoid blocking
94+ // other concurrent operations and reduce risk of deadlocks in Rx pipelines.
8595 lock ( gate )
8696 {
8797 if ( IsChartFile ( fileName ) )
8898 {
8999 // Mark this directory as a Helm chart directory.
90100 this . helmChartDirectories . TryAdd ( directory , true ) ;
91101
92- // Release any values files that arrived earlier for this directory.
102+ // Capture any values files that arrived earlier for this directory
103+ // to be emitted after releasing the lock.
93104 if ( pendingValuesByDirectory . Remove ( directory , out var pendingRequests ) )
94105 {
95- foreach ( var pendingRequest in pendingRequests )
96- {
97- observer . OnNext ( pendingRequest ) ;
98- }
106+ requestsToEmit = pendingRequests ;
99107 }
100-
101- return ;
102108 }
103-
104- if ( ! IsValuesFile ( fileName ) )
109+ else if ( IsValuesFile ( fileName ) )
105110 {
106- // Ignore non-values files (Chart files are handled above).
107- return ;
108- }
111+ if ( this . helmChartDirectories . ContainsKey ( directory ) )
112+ {
113+ // Directory is already known to be a chart; mark for immediate emission.
114+ emitCurrentRequest = true ;
115+ }
116+ else
117+ {
118+ // Chart file not seen yet for this directory; queue for later release.
119+ if ( ! pendingValuesByDirectory . TryGetValue ( directory , out var pendingRequestsForDirectory ) )
120+ {
121+ pendingRequestsForDirectory = [ ] ;
122+ pendingValuesByDirectory [ directory ] = pendingRequestsForDirectory ;
123+ }
109124
110- if ( this . helmChartDirectories . ContainsKey ( directory ) )
111- {
112- // Directory is already known to be a chart; emit immediately.
113- observer . OnNext ( request ) ;
114- return ;
125+ pendingRequestsForDirectory . Add ( request ) ;
126+ }
115127 }
128+ }
116129
117- // Chart file not seen yet for this directory; queue for later release.
118- if ( ! pendingValuesByDirectory . TryGetValue ( directory , out var pendingRequestsForDirectory ) )
130+ // Emit outside the lock to avoid blocking other concurrent operations.
131+ if ( requestsToEmit != null )
132+ {
133+ foreach ( var pendingRequest in requestsToEmit )
119134 {
120- pendingRequestsForDirectory = [ ] ;
121- pendingValuesByDirectory [ directory ] = pendingRequestsForDirectory ;
135+ observer . OnNext ( pendingRequest ) ;
122136 }
137+ }
123138
124- pendingRequestsForDirectory . Add ( request ) ;
139+ if ( emitCurrentRequest )
140+ {
141+ observer . OnNext ( request ) ;
125142 }
126143 } ,
127144 observer . OnError ,
0 commit comments