22namespace Microsoft . ComponentDetection . Detectors . Helm ;
33
44using System ;
5- using System . Collections . Concurrent ;
65using System . Collections . Generic ;
76using System . IO ;
7+ using System . Linq ;
88using System . Reactive . Linq ;
99using System . Threading ;
1010using System . Threading . Tasks ;
@@ -17,8 +17,6 @@ namespace Microsoft.ComponentDetection.Detectors.Helm;
1717
1818public class HelmComponentDetector : FileComponentDetector , IDefaultOffComponentDetector
1919{
20- private readonly ConcurrentDictionary < string , bool > helmChartDirectories = new ( StringComparer . OrdinalIgnoreCase ) ;
21-
2220 public HelmComponentDetector (
2321 IComponentStreamEnumerableFactory componentStreamEnumerableFactory ,
2422 IObservableDirectoryWalkerFactory walkerFactory ,
@@ -43,120 +41,28 @@ public HelmComponentDetector(
4341
4442 public override IEnumerable < string > Categories => [ nameof ( DetectorClass . Helm ) ] ;
4543
46- public override async Task < IndividualDetectorScanResult > ExecuteDetectorAsync ( ScanRequest request , CancellationToken cancellationToken = default )
47- {
48- this . helmChartDirectories . Clear ( ) ;
49- return await base . ExecuteDetectorAsync ( request , cancellationToken ) ;
50- }
51-
5244 /// <summary>
53- /// Pre-filters scan work to values files that are co-located with a Helm chart.
54- ///
55- /// This is intentionally implemented as a streaming pipeline with per-directory
56- /// buffering (not a two-pass approach that materializes all files first).
57- /// Instead of collecting all matching files upfront, this observable pipeline:
58- /// 1. Streams file matches as they arrive from the file walker
59- /// 2. Buffers values files per directory until a Chart.yaml is observed
60- /// 3. Immediately emits Chart files to mark their directory as a valid Helm chart
61- /// 4. Releases buffered values files for directories that now have charts
62- ///
63- /// This approach reduces peak memory usage and starts emitting work earlier on
64- /// large repositories compared to a full materialization pass. Enumeration order
65- /// is not guaranteed, so values files may be seen before Chart.yaml; per-directory
66- /// buffering ensures correctness regardless of enumeration order.
45+ /// Pre-filters scan work to only values files co-located with a Chart.yaml/Chart.yml.
46+ /// Materializes all matched files, identifies Helm chart directories, then filters.
6747 /// </summary>
68- /// <returns>
69- /// A prepared observable that emits only values-file requests belonging to
70- /// directories that contain a Chart.yaml/Chart.yml.
71- /// </returns>
72- protected override Task < IObservable < ProcessRequest > > OnPrepareDetectionAsync (
48+ /// <returns>An observable of only the values-file requests in Helm chart directories.</returns>
49+ protected override async Task < IObservable < ProcessRequest > > OnPrepareDetectionAsync (
7350 IObservable < ProcessRequest > processRequests ,
7451 IDictionary < string , string > detectorArgs ,
7552 CancellationToken cancellationToken = default )
7653 {
77- return Task . FromResult ( Observable . Create < ProcessRequest > ( observer =>
78- {
79- // Buffer only values files whose directory has not yet produced a Chart file.
80- var pendingValuesByDirectory = new Dictionary < string , List < ProcessRequest > > ( StringComparer . OrdinalIgnoreCase ) ;
81- var gate = new object ( ) ;
82-
83- var subscription = processRequests . Subscribe (
84- request =>
85- {
86- var fileName = Path . GetFileName ( request . ComponentStream . Location ) ;
87- var directory = Path . GetDirectoryName ( request . ComponentStream . Location ) ?? string . Empty ;
88-
89- var requestsToEmit = ( List < ProcessRequest > ? ) null ;
90- var emitCurrentRequest = false ;
91-
92- // Protect shared state because IObservable callbacks may arrive concurrently.
93- // Decide what to emit inside the lock, then emit outside to avoid blocking
94- // other concurrent operations and reduce risk of deadlocks in Rx pipelines.
95- lock ( gate )
96- {
97- if ( IsChartFile ( fileName ) )
98- {
99- // Mark this directory as a Helm chart directory.
100- this . helmChartDirectories . TryAdd ( directory , true ) ;
101-
102- // Capture any values files that arrived earlier for this directory
103- // to be emitted after releasing the lock.
104- if ( pendingValuesByDirectory . Remove ( directory , out var pendingRequests ) )
105- {
106- requestsToEmit = pendingRequests ;
107- }
108- }
109- else if ( IsValuesFile ( fileName ) )
110- {
111- if ( this . helmChartDirectories . ContainsKey ( directory ) )
112- {
113- // Directory is already known to be a chart; mark for immediate emission.
114- emitCurrentRequest = true ;
115- }
116- else
117- {
118- // Chart file not seen yet for this directory; queue for later release.
119- if ( ! pendingValuesByDirectory . TryGetValue ( directory , out var pendingRequestsForDirectory ) )
120- {
121- pendingRequestsForDirectory = [ ] ;
122- pendingValuesByDirectory [ directory ] = pendingRequestsForDirectory ;
123- }
124-
125- pendingRequestsForDirectory . Add ( request ) ;
126- }
127- }
128- }
129-
130- // Emit outside the lock to avoid blocking other concurrent operations.
131- if ( requestsToEmit != null )
132- {
133- foreach ( var pendingRequest in requestsToEmit )
134- {
135- observer . OnNext ( pendingRequest ) ;
136- }
137- }
138-
139- if ( emitCurrentRequest )
140- {
141- observer . OnNext ( request ) ;
142- }
143- } ,
144- observer . OnError ,
145- observer . OnCompleted ) ;
146-
147- var cancellationRegistration = cancellationToken . Register ( ( ) =>
148- {
149- // Stop forwarding events if detection is cancelled.
150- subscription . Dispose ( ) ;
151- observer . OnCompleted ( ) ;
152- } ) ;
153-
154- return ( ) =>
155- {
156- cancellationRegistration . Dispose ( ) ;
157- subscription . Dispose ( ) ;
158- } ;
159- } ) ) ;
54+ var allRequests = await processRequests . ToList ( ) ;
55+
56+ var chartDirectories = new HashSet < string > (
57+ allRequests
58+ . Where ( r => IsChartFile ( Path . GetFileName ( r . ComponentStream . Location ) ) )
59+ . Select ( r => Path . GetDirectoryName ( r . ComponentStream . Location ) ?? string . Empty ) ,
60+ StringComparer . OrdinalIgnoreCase ) ;
61+
62+ return allRequests
63+ . Where ( r => IsValuesFile ( Path . GetFileName ( r . ComponentStream . Location ) )
64+ && chartDirectories . Contains ( Path . GetDirectoryName ( r . ComponentStream . Location ) ?? string . Empty ) )
65+ . ToObservable ( ) ;
16066 }
16167
16268 protected override async Task OnFileFoundAsync ( ProcessRequest processRequest , IDictionary < string , string > detectorArgs , CancellationToken cancellationToken = default )
0 commit comments