joeloudjinz/InzPipeline

InzPipeline - Fluent Pipeline Processing Library

Overview

InzPipeline is a .NET 9.0 library that implements a fluent pipeline pattern for processing data through a series of configurable, composable, and resilient steps (pipes). Each pipe performs one specific transformation or operation on the data as it flows through the pipeline.

The architecture follows the pipeline pattern with a fluent builder interface that allows for sequential, parallel, conditional, and sub-pipeline execution. InzPipeline provides comprehensive error handling, performance monitoring, cancellation support, and resource sharing capabilities to create robust and maintainable data processing workflows.

The solution is organized into three primary projects:

  • InzPipeline.Core: Core pipeline implementation with interfaces and pipeline orchestration
  • InzPipeline.Sample: Example demonstrating how to use the pipeline library
  • InzPipeline.Tests: Comprehensive test suite with unit and integration tests

Key Features

Sequential Execution

Execute pipes one after another in a specified order with full control over data flow:

var pipeline = new PipelineBuilder<InputData, OutputData>();
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .AttachPipe(new StepThreePipe())
    .Flush();

Parallel Execution

Execute multiple pipes concurrently to improve performance for independent operations:

await pipeline
    .AttachParallelPipes(
        new StepOnePipe(),
        new StepTwoPipe(),
        new StepThreePipe()
    )
    .Flush();

Conditional Execution

Execute pipes based on runtime conditions, allowing for dynamic pipeline behavior:

await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachConditionalPipe(new StepTwoPipe(), context => context.Input.CanExecutePipeTwo)
    .AttachConditionalPipe(new StepThreePipe(), context => !context.Input.CanExecutePipeTwo)
    .Flush();

Sub-Pipelines

Create reusable pipeline components for better composability and maintainability:

var dataProcessingPipeline = new SubPipeline<InputData, OutputData>(subBuilder =>
    subBuilder
        .AttachPipe(new ValidateInputPipe())
        .AttachPipe(new TransformDataPipe())
        .AttachPipe(new EnrichDataPipe())
);

await pipeline
    .AttachSubPipeline(dataProcessingPipeline)
    .Flush();

Resource Sharing and Context Management

Share data between pipes through the context repository with thread-safe operations:

// In one pipe
context.AddResource("userId", 123);

// In another pipe
var userId = context.GetResource<int>("userId");

// Thread-safe operations
context.TryAddResource("cacheKey", "value"); // Returns false if key exists
context.UpdateResource("existingKey", "newValue");
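
The thread-safe semantics above can be pictured with a small stand-alone sketch. It assumes a ConcurrentDictionary-backed store, as the API Reference describes for the context; the ResourceStore class and its exact exception behavior are hypothetical and are not the library's implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the context's resource repository (not a library type).
public sealed class ResourceStore
{
    private readonly ConcurrentDictionary<string, object> _resources = new();

    // Adds a resource only if the key is free; returns false when it already exists.
    public bool TryAddResource<T>(string key, T value) where T : notnull
        => _resources.TryAdd(key, value);

    // Replaces the value of an existing key; throws if the key was never added.
    public void UpdateResource<T>(string key, T value) where T : notnull
    {
        // Compare-and-swap loop: retry if another thread changed the value in between.
        while (true)
        {
            if (!_resources.TryGetValue(key, out var current))
                throw new KeyNotFoundException($"Resource '{key}' does not exist.");
            if (_resources.TryUpdate(key, value, current))
                return;
        }
    }

    // Reads a resource and casts it to the requested type.
    public T GetResource<T>(string key)
        => _resources.TryGetValue(key, out var value)
            ? (T)value
            : throw new KeyNotFoundException($"Resource '{key}' does not exist.");
}
```

A second TryAddResource call with the same key returns false instead of throwing, which is the behavior the comment on TryAddResource above describes.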

Advanced Error Handling

Comprehensive error handling with multiple policies and strategies for resilience:

  • Policies (applied during execution):

    • Retry Policy: Automatically retry failed operations during execution
    • Circuit Breaker Policy: Prevent cascading failures by temporarily blocking requests when failures exceed a threshold
    • Fallback Policy: Execute alternative operations when the primary pipe fails during execution
  • Recovery Strategies (applied after failure occurs):

    • Circuit Breaker Strategy: Reactive circuit breaker for post-failure recovery
    • Retry With Backoff Strategy: Retry failed operations with configurable delays and exponential backoff after failure occurs

For more detailed information on implementing these error handling techniques, see the Error Handling Guide.
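
As a rough illustration of the circuit breaker idea (reject calls for a while once failures reach a threshold), here is a minimal stand-alone sketch. SimpleCircuitBreaker, its constructor parameters, and the injectable clock are assumptions made for this example; the library's own policy and strategy types may behave differently.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical minimal circuit breaker; not thread-safe and not the library's implementation.
public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    private readonly Func<DateTimeOffset> _clock;
    private int _consecutiveFailures;
    private DateTimeOffset? _openedAt;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan openDuration)
        : this(failureThreshold, openDuration, () => DateTimeOffset.UtcNow) { }

    // The clock is injectable so the open/closed transitions can be tested without waiting.
    public SimpleCircuitBreaker(int failureThreshold, TimeSpan openDuration, Func<DateTimeOffset> clock)
    {
        _failureThreshold = failureThreshold;
        _openDuration = openDuration;
        _clock = clock;
    }

    // True while calls are rejected without running the operation.
    public bool IsOpen => _openedAt is { } openedAt && _clock() - openedAt < _openDuration;

    public async Task ExecuteAsync(Func<Task> operation)
    {
        if (IsOpen)
            throw new InvalidOperationException("Circuit is open; call rejected.");

        try
        {
            await operation();
            // A success closes the circuit and resets the failure count.
            _consecutiveFailures = 0;
            _openedAt = null;
        }
        catch
        {
            // Open the circuit once the number of consecutive failures reaches the threshold.
            if (++_consecutiveFailures >= _failureThreshold)
                _openedAt = _clock();
            throw;
        }
    }
}
```

Once the open duration has elapsed, the next call is allowed through as a trial; if it fails, the circuit opens again immediately because the failure count is still at the threshold.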

Pipeline Validation

Comprehensive validation system to catch configuration issues early:

// Attach a custom validator (optional - if not attached, default validation is used)
var validator = new DefaultPipelineValidator<InputData, OutputData>();
await pipeline.AttachValidator(validator);

// Validate the pipeline configuration
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .ValidateConfiguration(); // This will throw an exception if validation fails

// Or get validation results without throwing
var (errors, warnings) = await pipeline.ValidateConfigurationForResult();
if (errors.Any())
{
    // Handle errors
    Console.WriteLine($"Validation errors: {string.Join(", ", errors)}");
}

// Pipes can declare their resource dependencies to enable validation:
public class MyPipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Pipe implementation
        await Task.CompletedTask;
    }
    
    // Declare required resources
    public IEnumerable<string> GetRequiredResources() => ["user_id", "session_token"];
    
    // Declare provided resources
    public IEnumerable<string> GetProvidedResources() => ["processed_data"];
}
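
One way such declarations can be checked is to walk the steps in order and confirm that every required resource was provided by an earlier step. The sketch below is a stand-alone illustration of that idea; StepResources and ResourceDependencyCheck are hypothetical names, and whether the library's validator applies exactly this rule is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical description of a step's declared resources (not a library type).
public sealed record StepResources(string Name, string[] Required, string[] Provided);

public static class ResourceDependencyCheck
{
    // Returns one message per required resource that no earlier step provides.
    public static List<string> FindMissing(IEnumerable<StepResources> steps)
    {
        var available = new HashSet<string>();
        var errors = new List<string>();

        foreach (var step in steps)
        {
            foreach (var key in step.Required)
            {
                if (!available.Contains(key))
                    errors.Add($"{step.Name} requires '{key}', which no earlier step provides.");
            }

            // Resources become available only to the steps that run afterwards.
            available.UnionWith(step.Provided);
        }

        return errors;
    }
}
```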

Performance Monitoring and Metrics

Detailed performance tracking with comprehensive metrics collection:

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

Console.WriteLine(context.GetPerformanceMetricsSummary());

// Or access individual metrics
Console.WriteLine($"Total Duration: {context.PerformanceMetrics?.TotalDurationMs:F2} ms");
Console.WriteLine($"Initial Memory: {context.PerformanceMetrics?.MemoryMetrics?.InitialMemoryBytes} bytes");
Console.WriteLine($"Final Memory: {context.PerformanceMetrics?.MemoryMetrics?.FinalMemoryBytes} bytes");

Enhanced Cancellation Support

Cancellation requests propagate through the pipeline, including sub-pipelines, and pipes can implement ICancellablePipe to run cleanup when cancellation is requested:

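using var cancellationTokenSource = new CancellationTokenSource();

// Request cancellation before (or while) Flush runs; cancelling after the
// awaited call has completed has no effect. Here the token is cancelled
// automatically after 5 seconds. Another thread could call Cancel() instead.
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));

// Use cancellation with pipeline execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush(cancellationTokenSource.Token);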

For pipes that need to handle cancellation more gracefully and perform cleanup operations, implement ICancellablePipe:

public class MyCancellablePipe : ICancellablePipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Check for cancellation periodically during long-running operations
        for (int i = 0; i < 100; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Delay(100, cancellationToken);
            // Perform work
        }
    }
    
    public async Task HandleCancellation(IPipelineContext<InputData, OutputData> context, CancellationToken cancellationToken = default)
    {
        // Perform cleanup operations when cancellation is requested
        // This method is called before the pipe execution is terminated
        // allowing for proper resource cleanup and state management
        Console.WriteLine("Performing cleanup before cancellation...");
        await Task.CompletedTask;
    }
}
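
The cleanup-on-cancellation flow can be sketched without the library as a small wrapper that runs a cleanup callback when the work is cancelled. CancellationCleanup is a hypothetical helper for illustration only; how the pipeline actually invokes HandleCancellation is not shown here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationCleanup
{
    // Runs `work`; if it is cancelled, awaits `cleanup` and then rethrows the cancellation.
    public static async Task RunAsync(
        Func<CancellationToken, Task> work,
        Func<Task> cleanup,
        CancellationToken cancellationToken)
    {
        try
        {
            await work(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cleanup runs before the cancellation reaches the caller.
            await cleanup();
            throw;
        }
    }
}
```

Rethrowing after cleanup keeps the cancellation visible to the caller, so callers can still distinguish a cancelled run from a completed one.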

Type Safety and Async Operations

Full support for strongly-typed generic pipelines with async/await patterns:

public class PipelineBuilder<TIn, TOut> where TOut : class
{
    // Strongly typed pipeline with compile-time safety
}

Installation

The InzPipeline library is available as a NuGet package:

Package Manager

Install-Package InzSoftwares.NetPipeline

.NET CLI

dotnet add package InzSoftwares.NetPipeline

Package Reference

<PackageReference Include="InzSoftwares.NetPipeline" Version="0.0.3" />

Getting Started

Prerequisites

  • .NET 9.0 SDK or later

Quick Start

  1. Install the NuGet package into your project as shown above.

  2. Define your data models:

public class InputData
{
    public string Name { get; set; } = string.Empty;
    public int Age { get; set; }
    public string Email { get; set; } = string.Empty;
}

public class OutputData
{
    public bool IsValid { get; set; }
    public string ProcessedName { get; set; } = string.Empty;
    public string HashedEmail { get; set; } = string.Empty;
    public List<string> Warnings { get; set; } = new();
}

  3. Create a pipeline context:

using InzPipeline.Core;

public class ProcessingContext : PipelineContext<InputData, OutputData>
{
    public ProcessingContext()
    {
        Input = new InputData();
        Output = new OutputData();
    }
}

  4. Implement your pipes:

using InzPipeline.Core.Contracts;

public class ValidateInputPipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, 
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(context.Input.Name))
        {
            context.Output.Warnings.Add("Name is empty");
        }
        
        context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
        
        await Task.CompletedTask;
    }
}

public class ProcessNamePipe : IPipe<InputData, OutputData>
{
    public async Task Handle(IPipelineContext<InputData, OutputData> context, 
        CancellationToken cancellationToken = default)
    {
        context.Output.ProcessedName = context.Input.Name.ToUpper();
        await Task.CompletedTask;
    }
}

  5. Build and execute your pipeline:

using InzPipeline.Core;

var pipeline = new PipelineBuilder<InputData, OutputData>();
var context = new ProcessingContext();

await pipeline.SetSource(new InputData { Name = "John Doe", Email = "john@example.com", Age = 30 })
    .AttachContext(context)
    .AttachPipe(new ValidateInputPipe())
    .AttachPipe(new ProcessNamePipe())
    .Flush();

Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

Usage Examples

Here's a complete example of using the InzPipeline library:

1. Define your input and output data models:

public class UserInput
{
    public string Name { get; set; } = string.Empty;
    public int Age { get; set; }
    public string Email { get; set; } = string.Empty;
}

public class UserOutput 
{
    public bool IsValid { get; set; }
    public string ProcessedName { get; set; } = string.Empty;
    public string HashedEmail { get; set; } = string.Empty;
    public List<string> Warnings { get; set; } = new();
}

2. Create a pipeline context:

public class UserProcessingContext : PipelineContext<UserInput, UserOutput>
{
    public UserProcessingContext()
    {
        Input = new UserInput();
        Output = new UserOutput();
    }
}

3. Implement your pipes:

public class ValidateUserPipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(context.Input.Name))
        {
            context.Output.Warnings.Add("Name is empty");
        }
        
        context.Output.IsValid = !string.IsNullOrEmpty(context.Input.Email) && context.Input.Age >= 0;
        
        await Task.CompletedTask;
    }
}

public class ProcessUserNamePipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
        context.Output.ProcessedName = context.Input.Name.ToUpper();
        await Task.CompletedTask;
    }
}

public class HashEmailPipe : IPipe<UserInput, UserOutput>
{
    public async Task Handle(IPipelineContext<UserInput, UserOutput> context, 
        CancellationToken cancellationToken = default)
    {
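        // SHA-256 hash of the email, hex-encoded
        // (requires using System.Security.Cryptography and System.Text).
        var emailBytes = Encoding.UTF8.GetBytes(context.Input.Email);
        context.Output.HashedEmail = Convert.ToHexString(SHA256.HashData(emailBytes));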
        await Task.CompletedTask;
    }
}

4. Build and execute your pipeline:

var pipeline = new PipelineBuilder<UserInput, UserOutput>();
var context = new UserProcessingContext();

await pipeline.SetSource(new UserInput { Name = "John Doe", Email = "john@example.com", Age = 30 })
    .AttachContext(context)
    .AttachPipe(new ValidateUserPipe())
    .AttachPipe(new ProcessUserNamePipe())
    .AttachPipe(new HashEmailPipe())
    .Flush();

Console.WriteLine($"Result: {context.Output.ProcessedName}, Valid: {context.Output.IsValid}");

Advanced Features

Error Handling with Policies

// Retry Policy - automatically retry failed operations during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithRetryPolicy(new ExternalServicePipe(), maxAttempts: 3, delay: TimeSpan.FromSeconds(1))
    .Flush();

// Circuit Breaker Policy - prevent cascading failures during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithCircuitBreakerPolicy(new DatabasePipe(), failureThreshold: 5, timeout: TimeSpan.FromMinutes(2))
    .Flush();

// Fallback Policy - execute alternative operations when primary fails during execution
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithFallbackPolicy(new PrimaryServicePipe(), new FallbackServicePipe())
    .Flush();

Recovery Strategies (After Failure)

// Circuit Breaker Strategy - reactive circuit breaker for post-failure recovery
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithCircuitBreakerStrategy(new UnreliableServicePipe(), failureThreshold: 2, timeout: TimeSpan.FromMinutes(1))
    .Flush();

// Retry With Backoff Strategy - retry failed operations after failure occurs
await pipeline.SetSource(input)
    .AttachContext(context)
    .AttachPipeWithRetryStrategy(new NetworkOperationPipe(), maxAttempts: 3, initialDelay: TimeSpan.FromSeconds(1))
    .Flush();

// Global Recovery Strategy - apply recovery strategy to all pipes in the pipeline
await pipeline.SetSource(input)
    .AttachContext(context)
    .WithRecoveryStrategy(new RetryWithBackoffStrategy<InputData, OutputData>(maxAttempts: 2))
    .AttachPipe(new FlakyOperationPipe())
    .Flush();
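
To make the backoff behavior concrete, here is a stand-alone sketch of retrying with exponentially growing delays. BackoffRetry is a hypothetical helper, and the doubling factor is an assumption; the library's RetryWithBackoffStrategy may compute its delays differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackoffRetry
{
    // Delay before the retry that follows failed attempt `attempt` (1-based):
    // initialDelay, then 2x, 4x, and so on.
    public static TimeSpan DelayFor(int attempt, TimeSpan initialDelay)
        => TimeSpan.FromTicks(initialDelay.Ticks * (1L << (attempt - 1)));

    public static async Task RunAsync(
        Func<CancellationToken, Task> operation,
        int maxAttempts,
        TimeSpan initialDelay,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await operation(cancellationToken);
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
            {
                // Wait, then loop for the next attempt; the last failure is not caught here.
                await Task.Delay(DelayFor(attempt, initialDelay), cancellationToken);
            }
        }
    }
}
```

With maxAttempts: 3 and an initial delay of one second, this sketch waits one second after the first failure and two seconds after the second, then lets a third failure propagate to the caller.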

Performance Metrics and Monitoring

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

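// PerformanceMetrics may be null if metrics were not enabled, so use the null-conditional operator
Console.WriteLine($"Total Duration: {context.PerformanceMetrics?.TotalDurationMs}ms");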

API Reference

PipelineBuilder<TIn, TOut>

The main entry point for building pipelines. Provides methods to attach pipes, set source data, and configure various pipeline behaviors including sequential, parallel, conditional, and sub-pipeline execution. Supports all error handling mechanisms and performance metrics collection.

PipelineContext<TIn, TOut>

Manages data flow between pipes, contains methods for resource sharing using a ConcurrentDictionary for thread-safe operations, error handling with detailed error tracking, and performance metrics collection. Provides methods for adding, getting, updating, and removing resources from the shared repository.

IPipe<TIn, TOut>

Interface that all pipe implementations must implement. Contains a Handle method that performs the pipe's operation and methods for declaring resource dependencies via GetRequiredResources() and GetProvidedResources().

SubPipeline<TIn, TOut>

Allows for creating reusable pipeline components that can be attached to parent pipelines, enabling composition and reusability of pipeline segments.

ICancellablePipe<TIn, TOut>

Specialized interface for pipes that need to handle cancellation more gracefully by implementing a HandleCancellation method to perform cleanup operations when cancellation is requested.

Configuration Options

  • PipeConfiguration<TIn, TOut>: Fine-grained configuration for individual pipes
  • ErrorHandlingOptions<TIn, TOut>: Global error handling configuration
  • PerformanceMetrics: Detailed performance tracking and metrics collection
  • IPipelineValidator<TIn, TOut>: Pipeline validation before execution

Best Practices

Pipeline Design

  • Keep pipes focused on a single responsibility to maintain testability and maintainability
  • Design pipes to be stateless and idempotent when possible to ensure predictable behavior
  • Use meaningful names for your pipe classes that clearly describe their function
  • Implement ICancellablePipe for long-running operations to allow proper resource cleanup

Error Handling

  • Use retry policies for transient failures (network timeouts, temporary service unavailability)
  • Use circuit breakers for unreliable external services to prevent cascading failures
  • Use fallback policies for graceful degradation when primary functionality is unavailable
  • Set context.ContinueOnFailure = true if you want to continue after an error
  • Use the PipelineErrors collection to access detailed error information
  • Implement proper logging within your pipes
  • Configure appropriate retry attempts and delays to avoid overwhelming failing systems
  • Use exception filtering to target specific error types for different handling strategies
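
Exception filtering can be illustrated with plain C# exception filters (the when clause). The sketch below is independent of the library; TransientErrors is a hypothetical helper, and treating TimeoutException and IOException as transient is an assumption chosen for the example.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class TransientErrors
{
    // Assumption for illustration: timeouts and I/O failures count as transient.
    public static bool IsTransient(Exception ex)
        => ex is TimeoutException or IOException;

    // Runs the primary operation; falls back only when the failure is transient.
    public static async Task<T> WithFallbackAsync<T>(Func<Task<T>> primary, Func<Task<T>> fallback)
    {
        try
        {
            return await primary();
        }
        catch (Exception ex) when (IsTransient(ex))
        {
            // Non-transient exceptions skip this handler and propagate unchanged.
            return await fallback();
        }
    }
}
```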

Performance

  • Use parallel execution for independent operations to improve throughput
  • Implement cancellation token checks in long-running operations to allow early termination
  • Monitor pipeline metrics to identify bottlenecks and optimize performance
  • Enable performance metrics collection during development and in production environments
  • Consider using sub-pipelines for better organization and testability of complex pipelines
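
The throughput benefit of running independent operations in parallel comes from starting all of them before waiting on any. A minimal stand-alone sketch using Task.WhenAll follows; ParallelSteps is a hypothetical helper and does not describe how AttachParallelPipes is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSteps
{
    // Starts every step, then waits for all of them to finish.
    // A failure in any step surfaces when the returned task is awaited.
    public static Task RunAllAsync(
        IEnumerable<Func<CancellationToken, Task>> steps,
        CancellationToken cancellationToken = default)
        => Task.WhenAll(steps.Select(step => step(cancellationToken)));
}
```

This only helps when the steps do not depend on each other's results; steps that write to shared state still need thread-safe access.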

Resource Management

  • Use the context's resource repository for sharing data between pipes efficiently
  • Always use unique keys when adding resources to avoid conflicts in the shared repository
  • Clean up resources when no longer needed to prevent memory leaks
  • Use TryAddResource when you want to avoid exceptions if a resource already exists
  • Prefer resource sharing over complex data structures when pipes need to communicate

Validation and Testing

  • Implement custom validators for complex pipeline validation rules
  • Use GetRequiredResources and GetProvidedResources to enable automatic dependency validation
  • Test individual pipes in isolation as well as complete pipeline flows
  • Validate pipeline configuration before execution in production environments

Project Structure

InzPipeline/
├── .github/workflows/           # GitHub Actions workflows
├── Documents/                   # Documentation files
│   ├── Error Handling Guide.md  # Comprehensive guide on error handling
│   ├── CSProj PropertyGroup Tags Explanation.md  # Project configuration details
│   └── InzPipeline Improvement Roadmap.md        # Future development plans
├── InzPipeline.sln             # Solution file
├── README.md                   # Main documentation
├── .gitignore                  # Git ignore patterns
├── InzPipeline.Core/           # Core pipeline implementation
│   ├── InzPipeline.Core.csproj # Project file with package details
│   ├── PipelineBuilder.cs      # Main pipeline orchestration
│   ├── PipelineContext.cs      # Pipeline context management
│   ├── SubPipeline.cs          # Sub-pipeline functionality
│   ├── Cancellation/           # Cancellation support
│   ├── Configuration/          # Pipeline configuration options
│   ├── Contracts/              # Interface definitions
│   ├── ErrorHandling/          # Error handling policies and strategies
│   ├── Models/                 # Data models used internally
│   ├── Steps/                  # Pipeline step implementations
│   ├── Validation/             # Pipeline validation components
│   └── LICENSE                 # License file
├── InzPipeline.Sample/         # Sample application demonstrating usage
│   ├── InzPipeline.Sample.csproj
│   ├── Program.cs              # Entry point
│   ├── DemoPipeline.cs         # Complete pipeline example
│   ├── Models/                 # Sample data models
│   ├── Pipes/                  # Sample pipe implementations
│   └── Utilities.cs            # Helper utilities
└── InzPipeline.Tests/          # Unit and integration tests

Performance Monitoring

InzPipeline provides comprehensive performance monitoring capabilities:

  • Correlation IDs: Track requests across distributed systems
  • Execution Time Tracking: Monitor individual pipe and overall pipeline execution times
  • Memory Metrics: Track memory usage before and after pipeline execution
  • Custom Metrics: Add your own metrics for business-specific monitoring
  • Detailed Summaries: Get comprehensive performance metrics summaries

Example Usage:

await pipeline.SetSource(input)
    .AttachContext(context)
    .EnablePerformanceMetrics("my-correlation-id")
    .AttachPipe(new StepOnePipe())
    .AttachPipe(new StepTwoPipe())
    .Flush();

// Get a summary of all metrics
Console.WriteLine(context.GetPerformanceMetricsSummary());

// Access specific metrics
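// PerformanceMetrics may be null if metrics were not enabled, so use the null-conditional operator
Console.WriteLine($"Total Duration: {context.PerformanceMetrics?.TotalDurationMs}ms");
Console.WriteLine($"Memory Increase: {context.PerformanceMetrics?.MemoryMetrics?.MemoryIncrease} bytes");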
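
For intuition about what execution-time and memory metrics involve, here is a stand-alone sketch that measures one asynchronous step with Stopwatch and GC.GetTotalMemory. StepMeasurement and StepTimer are hypothetical names; the library's PerformanceMetrics type may collect its numbers differently.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical result shape; not the library's PerformanceMetrics type.
public sealed record StepMeasurement(string Name, double DurationMs, long MemoryDeltaBytes);

public static class StepTimer
{
    public static async Task<StepMeasurement> MeasureAsync(string name, Func<Task> step)
    {
        // Managed heap size before the step, without forcing a garbage collection.
        var memoryBefore = GC.GetTotalMemory(forceFullCollection: false);
        var stopwatch = Stopwatch.StartNew();

        await step();

        stopwatch.Stop();
        var memoryAfter = GC.GetTotalMemory(forceFullCollection: false);

        return new StepMeasurement(name, stopwatch.Elapsed.TotalMilliseconds, memoryAfter - memoryBefore);
    }
}
```

The memory delta is approximate and can be negative if a garbage collection runs during the step, so it is better read as a trend across runs than as an exact allocation count.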

Testing and Quality Assurance

The InzPipeline library includes comprehensive test coverage to ensure reliability:

  • Unit Tests: Comprehensive coverage of individual components
  • Integration Tests: Tests for complex pipeline scenarios combining all features
  • Error Handling Tests: Validation of all error handling mechanisms
  • Cancellation Tests: Verification of proper cancellation behavior
  • Performance Tests: Benchmarks to ensure optimal performance
  • Validation Tests: Verification of pipeline validation functionality

Run all tests with:

dotnet test

For more detailed test output, use:

dotnet test --logger "console;verbosity=detailed"

To run tests in a specific test project:

dotnet test InzPipeline.Tests/InzPipeline.Tests.csproj

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Create your feature branch: git checkout -b feature/AmazingFeature
  2. Make your changes
  3. Run tests: dotnet test
  4. Commit your changes: git commit -m 'Add some AmazingFeature'
  5. Push to the branch: git push origin feature/AmazingFeature
  6. Open a Pull Request

Code Standards

  • Follow C# coding conventions and .NET best practices
  • Write comprehensive unit tests for all new functionality
  • Update documentation as needed
  • Ensure all tests pass before submitting a pull request

Versioning

We use Semantic Versioning (SemVer) for versioning. For the versions available, see the tags on this repository.

Support & Feedback

If you have questions, issues, or suggestions about InzPipeline, please:

  1. Check the documentation in this repository
  2. Open an issue in the GitHub repository if you encounter bugs or have feature requests
  3. Create a pull request if you'd like to contribute improvements
  4. For detailed API documentation, see the code documentation comments

License

This project is licensed under the MIT License - see the LICENSE file for details.


InzPipeline - A powerful and flexible pipeline processing library for .NET applications
