/// <summary>
/// Builds a processor whose stage list holds, in order, a
/// <c>ValidationStage</c>, a <c>TransformationStage</c> and an
/// <c>EnrichmentStage</c>.
/// </summary>
/// <param name="parallelismLevel">
/// Value stored in <c>_parallelismLevel</c>; defaults to 3 when omitted.
/// NOTE(review): how this value limits concurrency is decided by code
/// outside this constructor - confirm the accepted range with its consumer.
/// </param>
public ParallelPipelineProcessor(int parallelismLevel = 3)
{
    // Stages are appended one by one; list order is the construction order.
    var pipeline = new List<IPipelineStage<WorkItem, WorkItem>>();
    pipeline.Add(new ValidationStage());
    pipeline.Add(new TransformationStage());
    pipeline.Add(new EnrichmentStage());

    _stages = pipeline;
    _parallelismLevel = parallelismLevel;
}
This feature demonstrates parallel processing, task coordination, and cancellation tokens. Complete Implementation
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelDataPipeline {
    // Custom data entity
    /// <summary>
    /// Mutable data carrier for the pipeline: the stages in this file are
    /// declared as <c>IPipelineStage&lt;WorkItem, WorkItem&gt;</c>, so a
    /// <c>WorkItem</c> is both the input and the output type of each stage.
    /// </summary>
    public class WorkItem {
        /// <summary>Identifier of the item; stages print it in their console messages.</summary>
        public int Id { get; set; }

        /// <summary>Input text of the item. NOTE(review): presumably the raw, unprocessed payload - confirm with the producer.</summary>
        public string InputData { get; set; }

        /// <summary>Text rewritten by the stages (the enrichment stage, for example, replaces it with an "[ENRICHED] ..." string).</summary>
        public string ProcessedData { get; set; }

        /// <summary>NOTE(review): presumably the time processing of this item began; the code that sets it is not visible here.</summary>
        public DateTime StartTime { get; set; }

        /// <summary>NOTE(review): presumably the time processing of this item finished; the code that sets it is not visible here.</summary>
        public DateTime EndTime { get; set; }

        /// <summary>Validity flag. NOTE(review): presumably written by the validation stage - confirm.</summary>
        public bool IsValid { get; set; }
    }
net fx 4.0
// Stage 1: Data Validation public class ValidationStage : IPipelineStage<WorkItem, WorkItem> { public string StageName => "Validation"; { new ValidationStage()
// Parallel Pipeline Processor public class ParallelPipelineProcessor { private readonly List<IPipelineStage<WorkItem, WorkItem>> _stages; private readonly int _parallelismLevel; new EnrichmentStage() }
// Simulate enrichment (e.g., database lookup, API call) await Task.Delay(80, token); input.ProcessedData = $"[ENRICHED] {input.ProcessedData} - Length: {input.ProcessedData.Length}"; Console.WriteLine($"[{StageName}] Item {input.Id}: Enriched data"); return input; } }
// Stage 3: Data Enrichment public class EnrichmentStage : IPipelineStage<WorkItem, WorkItem> { public string StageName => "Enrichment";