There are components named as “Data Flow Components” in .NET 4.5 which enables the capability of in process message passing for coarse grained and pipelining tasks.
Data Flow components are used to build the channel or pipes, through which messages are passed to process further.
Below are few default data flow components introduced and a sample code follows.
1) Action Block
2) Buffer Block
3) Transform Block
4) TransformManyBlock
5) BatchBlock
6) JoinBlock
7) BatchedJoinBlock
8) WriteOnceBlock
9) BroadCast Block
//action block
var actionBlock = new ActionBlock<int>((a) =>
{
Console.WriteLine("Message recieved : {0}", a);
});
IObserver<Int32> observer = actionBlock.AsObserver();
observer.OnNext(10000);
Task<bool> wait = actionBlock.SendAsync(100);
actionBlock.Post(10);
//buffer block
var bufferBlock = new BufferBlock<int>();
IObservable<int> bObservable = bufferBlock.AsObservable<int>();
IObserver<int> bObserver = bufferBlock.AsObserver<int>();
//linking currnet buffer to an another buffer block
var bufferBlock2 = new BufferBlock<int>();
bufferBlock.LinkTo(bufferBlock2, new DataflowLinkOptions() { Append = true });
bObservable.Subscribe(bObserver);
bObserver.OnNext(20);
bufferBlock.Post<int>(200);
int a1 = 0;
bool b1 = bufferBlock.TryReceive(out a1);
bool b2 = bufferBlock.TryReceive(out a1);
bool b11 = bufferBlock2.TryReceive(out a1);
bool b21 = bufferBlock2.TryReceive(out a1);
//broad cast block
var broadCast = new BroadcastBlock<int>((a) => a + 10);
//linking current broadcast block to a buffer block
broadCast.LinkTo(bufferBlock2);
broadCast.Post(10);
broadCast.Post(20);
Int32 bd1 = broadCast.Receive<int>();
broadCast.Post(30);
broadCast.Post(40);
bd1 = broadCast.Receive<int>();
//write once block
var writeOnceBlock = new WriteOnceBlock<int>((a) => a + 100);
writeOnceBlock.Post(400);
Int32 wr1 = writeOnceBlock.Receive();
writeOnceBlock.Post(200);
wr1 = writeOnceBlock.Receive();
writeOnceBlock.Post(500);
wr1 = writeOnceBlock.Receive();
//transform block
var transformBlock = new TransformBlock<float, float>((a) => a);
transformBlock.Post(180f);
float f1 = transformBlock.Receive();
transformBlock.Post(280f);
float f2 = transformBlock.Receive();
//transform many block
var transformmanyBlock = new TransformManyBlock<int, int>(a => Enumerable.Range(0, a).ToList());
transformmanyBlock.Post(100);
Int32 b3 = 0;
while (transformmanyBlock.TryReceive(out b3))
{
Console.WriteLine(b3);
}
//batch block
var batchBlock = new BatchBlock<int>(1000);
Enumerable.Repeat<int>(5, 100)
.ToList()
.ForEach(i => batchBlock.Post(i));
batchBlock.Post(1);
Task<int[]> batchData = batchBlock.ReceiveAsync();
//join block
var joinBlock = new JoinBlock<int, int, int>();
joinBlock.Target1.Post(1);
joinBlock.Target2.Post(2);
joinBlock.Target3.Post(3);
joinBlock.Target1.Post(4);
joinBlock.Target2.Post(5);
joinBlock.Target3.Post(6);
Tuple<int, int, int> tup1 = joinBlock.Receive();
//batched join block
var bacthedJoinBlock = new BatchedJoinBlock<int, int, int>(2);
bacthedJoinBlock.Target1.Post(1);
bacthedJoinBlock.Target2.Post(2);
bacthedJoinBlock.Target3.Post(3);
bacthedJoinBlock.Target1.Post(4);
bacthedJoinBlock.Target2.Post(5);
bacthedJoinBlock.Target3.Post(6);
Tuple<IList<Int32>, IList<Int32>, IList<Int32>> tup2 = bacthedJoinBlock.Receive();
Beside the provided default data flow blocks, custom data blocks can also be implemented.
No comments:
Post a Comment