I came across Channel class while working with SignalR which looks really interesting. By looking into NuGet packages (https://www.nuget.org/packages/System.Threading.Channels), it seems just 4 months old.
The Channel class provides infrastructure to have multiple reads and write simuletensely through it's Reader and Writer properties.
This is where it is handy in case of SignalR where data streaming needs to be done but is not just limited to that but wherever something needs to be read/write/combination of both in a multi-threading environment.
In my case with SignalR, I had to stream stock data at a regular interval of time.
The SignalR keeps return type of ChannelReader<StockData> open so that whatever written in Channel it would be transmitted data through ChannelReader.
The Channel.CreateUnbounded<StockData> creates an instance it also has overridden function to pass allowed number of items which can be stored in the channel which helps out to avoid a lot of buffer usage.
The Reader property of Channel is returned which does not die in SignalR, and whenever anything is written in Channel it is sent to SignalR consumers.
For Writer I am using delegate definition from OnStockData property, whenever the delegate is called from producer class I am calling channel.Writer.TryWrite to pass newly available data.
I have done a little experiment on a console application. Give it a try and check it out. Happy Coding!
The Channel class provides infrastructure to have multiple reads and write simuletensely through it's Reader and Writer properties.
This is where it is handy in case of SignalR where data streaming needs to be done but is not just limited to that but wherever something needs to be read/write/combination of both in a multi-threading environment.
In my case with SignalR, I had to stream stock data at a regular interval of time.
public ChannelReader<StockData> StreamStock()
{
var channel = Channel.CreateUnbounded<StockData>();
_stockManager.OnStockData = stockData =>
{
channel.Writer.TryWrite(stockData);
};
return channel.Reader;
}
The SignalR keeps return type of ChannelReader<StockData> open so that whatever written in Channel it would be transmitted data through ChannelReader.
The Channel.CreateUnbounded<StockData> creates an instance it also has overridden function to pass allowed number of items which can be stored in the channel which helps out to avoid a lot of buffer usage.
The Reader property of Channel is returned which does not die in SignalR, and whenever anything is written in Channel it is sent to SignalR consumers.
For Writer I am using delegate definition from OnStockData property, whenever the delegate is called from producer class I am calling channel.Writer.TryWrite to pass newly available data.
I have done a little experiment on a console application. Give it a try and check it out. Happy Coding!
public static class Example
{
public static async Task<ChannelReader<string>> RunExample()
{
const int maxMessagesToBuffer = 2;
Channel<string> channel = Channel.CreateBounded<string>(maxMessagesToBuffer);
var reader = channel.Reader;
var writer = channel.Writer;
var worker1 = Task.Run(() => ChannelReader(reader,"R1"));
var worker2 = Task.Run(() => ChannelReader(reader, "R2"));
var worker3 = Task.Run(() => ChannelReader(reader, "R3"));
var worker4 = Task.Run(() => ChannelReader(reader, "R4"));
await Task.WhenAll(new List<Task> {
Task.Run(() => WriteValue(writer, "W1")),
Task.Run(() => WriteValue(writer, "W2")),
Task.Run(() => WriteValue(writer, "W3")),
Task.Run(() => WriteValue(writer, "W4")),
Task.Run(() => WriteValue(writer, "W5")),
});
return reader;
}
static int index = 0;
static SemaphoreSlim @lock = new SemaphoreSlim(1, 1);
private async static Task WriteValue(ChannelWriter<string> writer, string writerId)
{
while (true)
{
try
{
await @lock.WaitAsync();
if (index < 50)
{
index++;
await writer.WriteAsync($"(index : {index}, Writer: {writerId})");
}
else
{
Console.WriteLine($"Writer completed: {writerId}");
break;
}
}
finally
{
@lock.Release();
}
}
}
private static async Task ChannelReader(ChannelReader<string> reader, string readerId)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out var message))
{
Console.WriteLine($"Listener: message : {message}, index dirty read : {index}, Reader: {readerId}");
await Task.Delay(250);
}
}
}
}
public static async Task Main(string[] args)
{
await Example.RunExample();
}
Comments
Post a Comment