
Implementing a Concurrent Data Flow in C#

Modern applications that do intensive amounts of processing very commonly rely on parallel processing. Such work is often organized as a chain of interlinked tasks, so that data passes through a series of stages (phases). In that case you do not want to wait until all of the raw input has gone through the initial transformation before the subsequent steps can start; each item should move on to the next stage as soon as the previous stage has finished with it. Such processing pipelines can also be considerably faster, because different stages are likely to use different kinds of resources in the execution environment (for example, network I/O for downloading and disk I/O for saving).

[Figure: workflow model]

To implement a parallel pipeline workflow, we can use Microsoft's TPL Dataflow library (namespace System.Threading.Tasks.Dataflow), which is available as a NuGet package.
[Screenshot: the TPL Dataflow package in NuGet]

The library lets you define blocks of code that can be linked together and run concurrently as a workflow. The best way to understand it is through a sample implementation, such as the simulated image download workflow in the rest of this post.
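A minimal sketch of the idea, assuming a C# 9+ console project (top-level statements) with System.Threading.Tasks.Dataflow available: a TransformBlock feeds an ActionBlock, and squaring numbers is only a placeholder for real work. LinkTo connects the blocks, Complete() tells the first block that no more input is coming, and awaiting the last block's Completion waits until every item has drained through the chain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stage 1: transform each input (placeholder for real work such as a download).
var square = new TransformBlock<int, int>(n => n * n);

// Stage 2: consume each result (placeholder for saving it somewhere).
var results = new ConcurrentQueue<int>();
var save = new ActionBlock<int>(n => results.Enqueue(n));

// Link the stages; PropagateCompletion forwards Complete() (or a fault) downstream.
square.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 1; i <= 5; i++)
    square.Post(i);

// Signal that no more input is coming, then wait for the last stage to finish.
square.Complete();
await save.Completion;

Console.WriteLine(string.Join(", ", results)); // prints "1, 4, 9, 16, 25"
```

Both blocks use the default MaxDegreeOfParallelism of 1 here, so results arrive in input order; the image example below raises it to run several items at once.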

Problem:
Sample Workflow
Imagine an application that routinely downloads images from an external provider using the following steps. To make the whole process faster, we want to run the steps concurrently.
1. Read the download URL of an image from a data source
2. Download the image from the remote source
3. After each image is downloaded, save it to disk and update the database
Solution:
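With the TPL Dataflow library, we can model these three steps as separate dataflow blocks and link them together into a parallel processing pipeline, as shown below: step 1 maps to a BufferBlock, step 2 to a TransformBlock that downloads up to four images at once, and step 3 to an ActionBlock. (The actual image download code inside each block is omitted for clarity.)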


using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using static System.Console;

var cts = new CancellationTokenSource();

// Press 'c' at any time to cancel the whole workflow.
Task.Run(() =>
{
    if (ReadKey().KeyChar == 'c')
        cts.Cancel();
});

// Step 1: buffer of download requests (holds at most 5 items at a time).
var inputBlock = new BufferBlock<DownloadInput>(
    new DataflowBlockOptions
    {
        BoundedCapacity = 5,
        CancellationToken = cts.Token
    });

// Step 2: download up to 4 images concurrently.
var downloadBlock = new TransformBlock<DownloadInput, DownloadResult>(
    n =>
    {
        // Carry the URL forward so the next stage knows which image it is handling.
        var result = new DownloadResult { DownloadUrl = n.DownloadUrl };
        Console.WriteLine("Downloading {0} image on thread id {1}", n.DownloadUrl, Thread.CurrentThread.ManagedThreadId);
        // do the actual download here
        Thread.Sleep(TimeSpan.FromMilliseconds(2000)); // image download simulation
        return result;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        CancellationToken = cts.Token
    });

// Step 3: save each image and update the database, up to 4 at a time.
var outputBlock = new ActionBlock<DownloadResult>(
    s =>
    {
        // do other work here, such as flagging the image as downloaded in the database for maintenance processes
        Thread.Sleep(TimeSpan.FromMilliseconds(200)); // simulation of other work
        Console.WriteLine("Saving image to database {0} on thread id {1}", s.DownloadUrl, Thread.CurrentThread.ManagedThreadId);
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4,
        CancellationToken = cts.Token
    });

// Link the stages. PropagateCompletion passes Complete() (or a fault) on to the next block.
inputBlock.LinkTo(downloadBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

downloadBlock.LinkTo(outputBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});
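To see what MaxDegreeOfParallelism = 4 does on the download block, the sketch below uses integers and Task.Delay as hypothetical stand-ins for URLs and real downloads (a C# 9+ console project with System.Threading.Tasks.Dataflow available is assumed). It counts how many delegates overlap and records the order in which results leave the block. Even though later items finish first, a TransformBlock by default still emits results in input order (the EnsureOrdered option, which is true by default).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int running = 0, maxRunning = 0;

// Hypothetical stand-in for downloadBlock: item n "downloads" for (100 - 5n) ms,
// so among items running together, later ones finish before earlier ones.
var download = new TransformBlock<int, int>(async n =>
{
    lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
    await Task.Delay(100 - 5 * n);
    lock (gate) { running--; }
    return n;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// Collect results in the order the block emits them.
var received = new List<int>();
var collect = new ActionBlock<int>(n => received.Add(n));
download.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 12; i++)
    download.Post(i);
download.Complete();
await collect.Completion;

Console.WriteLine($"max concurrent downloads: {maxRunning}"); // never more than 4
Console.WriteLine(string.Join(", ", received));              // input order: 0, 1, ..., 11
```

For an async delegate, an item counts against MaxDegreeOfParallelism until its returned Task completes, so the limit applies to whole asynchronous downloads, not just to the synchronous part of the lambda.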

Finally, I used the following code to test-run the workflow.

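try
{
    // Feed 20 simulated download requests into the pipeline from up to 4 producer threads.
    Parallel.For(0, 20, new ParallelOptions
    {
        MaxDegreeOfParallelism = 4,
        CancellationToken = cts.Token
    },
    i =>
    {
        var downloadInput = new DownloadInput();
        downloadInput.DownloadUrl = string.Format("http://myimagesite.com/{0}", i);
        Console.WriteLine("added {0} to source data on thread id {1}", downloadInput.DownloadUrl, Thread.CurrentThread.ManagedThreadId);
        // SendAsync waits while the bounded input block is full instead of dropping the item.
        inputBlock.SendAsync(downloadInput).GetAwaiter().GetResult();
    });

    // No more input: completion propagates through the linked blocks.
    inputBlock.Complete();
    await outputBlock.Completion;
    Console.WriteLine("Press ENTER to exit.");
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation has been canceled! Press ENTER to exit.");
}
Console.ReadLine();

// Minimal message types, consistent with how the code above uses them.
class DownloadInput
{
    public string DownloadUrl { get; set; } = string.Empty;
}

class DownloadResult
{
    public string DownloadUrl { get; set; } = string.Empty;
}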
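Because inputBlock has BoundedCapacity = 5, producers can find it full; SendAsync is what lets them wait instead of losing items. The sketch below, assuming a C# 9+ console project with System.Threading.Tasks.Dataflow available, uses a capacity of 1 (an arbitrary value chosen to make the effect visible) and no linked consumer, to contrast Post, which returns false when the block is full, with SendAsync, whose task completes only once the block has room.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A buffer that accepts at most one item and has no consumer linked.
var buffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 1 });

bool firstPost = buffer.Post("a");   // accepted: the buffer was empty
bool secondPost = buffer.Post("b");  // declined immediately: the buffer is full

// SendAsync does not drop the item; its task stays pending until space frees up.
Task<bool> pendingSend = buffer.SendAsync("c");
bool completedWhileFull = pendingSend.IsCompleted;

// Taking "a" out makes room, so the postponed "c" is accepted.
string taken = await buffer.ReceiveAsync();
bool thirdAccepted = await pendingSend;

Console.WriteLine($"{firstPost} {secondPost} {completedWhileFull} {taken} {thirdAccepted}");
```

Note that in the workflow above downloadBlock keeps the default unbounded capacity, so items leave inputBlock almost immediately; setting BoundedCapacity on the downstream blocks as well would be needed to throttle the whole chain.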

I tested this code in a console project; the output with the simulated work looked like this:
[Screenshot: console output]

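TPL Dataflow blocks also accept cancellation tokens. This is extremely useful when you need to cancel the entire workflow, which would otherwise be difficult to coordinate across all the stages. In the sample, every block and the Parallel.For loop share cts.Token, so pressing 'c' cancels everything.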
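A minimal sketch of what cancellation does to a single block, assuming a C# 9+ console project with System.Threading.Tasks.Dataflow available; the 100 items and the 50 ms delay are made-up values. Once the token is canceled, the block finishes any item already running, drops the rest of its queue, and its Completion task ends in the Canceled state.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var cts = new CancellationTokenSource();
var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };

var processed = 0;
var worker = new ActionBlock<int>(async n =>
{
    await Task.Delay(50);               // simulated work per item
    Interlocked.Increment(ref processed);
}, options);

for (int i = 0; i < 100; i++)
    worker.Post(i);

cts.Cancel();                           // queued items that have not started are dropped

bool canceled = false;
try
{
    await worker.Completion;
}
catch (OperationCanceledException)
{
    canceled = true;                    // awaiting a canceled Completion throws
}

Console.WriteLine($"canceled: {canceled}, status: {worker.Completion.Status}, processed: {processed} of 100");
```

Awaiting a canceled Completion throws TaskCanceledException, which derives from OperationCanceledException; that is why the single catch (OperationCanceledException) in the test code also covers a cancellation that happens after Parallel.For has already finished.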


Posted on April 24, 2017 in .NET, C#

 
