使用 .NET 的 BlockingCollection<T> 包装一个 ConcurrentQueue<T>,以实现类似 Go 语言(golang)的 channel。
代码如下:
/// <summary>
/// A Go-style channel: a bounded FIFO queue built on a
/// <see cref="BlockingCollection{T}"/> that wraps a <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the items passed through the channel.</typeparam>
public class Channel<T>
{
    // Assigned once in the constructor; all operations delegate to this collection.
    private readonly BlockingCollection<T> _buffer;

    /// <summary>Creates a channel with a capacity of one item.</summary>
    public Channel() : this(1)
    {
    }

    /// <summary>Creates a channel that buffers at most <paramref name="size"/> items.</summary>
    /// <param name="size">Bounded capacity handed to the underlying <see cref="BlockingCollection{T}"/>.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown by <see cref="BlockingCollection{T}"/> when <paramref name="size"/> is not positive.
    /// </exception>
    public Channel(int size)
    {
        _buffer = new BlockingCollection<T>(new ConcurrentQueue<T>(), size);
    }

    /// <summary>
    /// Adds an item to the channel, blocking while the channel is at capacity.
    /// </summary>
    /// <param name="t">The item to send.</param>
    /// <returns><c>true</c> if the item was enqueued; <c>false</c> if the channel has been closed.</returns>
    public bool Send(T t)
    {
        try
        {
            _buffer.Add(t);
        }
        catch (InvalidOperationException)
        {
            // Thrown by Add once CompleteAdding has been called. A pre-check on
            // IsAddingCompleted would still race with Close, so the catch stays.
            return false;
        }

        return true;
    }

    /// <summary>
    /// Removes the next item from the channel, blocking until one is available
    /// or the channel is closed and drained.
    /// </summary>
    /// <param name="val">The received item, or <c>default(T)</c> when nothing could be received.</param>
    /// <returns><c>true</c> if an item was received; <c>false</c> if the channel is closed and empty.</returns>
    public bool Receive(out T val)
    {
        // An infinite-timeout TryTake reports "closed and empty" by returning false
        // (leaving val at default(T)) instead of throwing the way Take() does, so
        // the normal end-of-stream path no longer costs a thrown exception.
        return _buffer.TryTake(out val, System.Threading.Timeout.Infinite);
    }

    /// <summary>
    /// Closes the channel: later <see cref="Send"/> calls return <c>false</c>, while
    /// items that are already buffered can still be received.
    /// </summary>
    public void Close()
    {
        _buffer.CompleteAdding();
    }

    /// <summary>
    /// Lazily yields items received from the channel until it is closed and drained.
    /// Each yielded item is removed from the channel.
    /// </summary>
    /// <returns>A consuming sequence over the channel's items.</returns>
    public IEnumerable<T> Range()
    {
        return _buffer.GetConsumingEnumerable();
    }
}
测试程序
/// <summary>
/// Single-producer / single-consumer throughput benchmark: for each iteration one
/// task sends <c>numItems</c> integers through a fresh channel and closes it, while
/// a second task drains the channel into a list. Prints the average time per
/// iteration and per item.
/// </summary>
[TestCase]
public void TestSPSC_Performance()
{
    // The elapsed time is measured in milliseconds but the per-item figure is
    // labelled "ns"; 1 ms = 1,000,000 ns (the previous factor of 1000 gave µs).
    const double NanosecondsPerMillisecond = 1000000.0;

    int numItems = 10000000;
    int numIterations = 10;

    var stopWatch = new Stopwatch();
    stopWatch.Start();
    for (int i = 0; i < numIterations; ++i)
    {
        var channel = new Channel<int>(100);

        // Producer: send 1..numItems, then close so the consumer's loop terminates.
        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var num in Enumerable.Range(1, numItems))
            {
                channel.Send(num);
            }
            channel.Close();
        });

        // Consumer: collect everything until the channel is closed and drained.
        var reader = Task.Factory.StartNew<List<int>>(() =>
        {
            var res = new List<int>(numItems);
            foreach (var num in channel.Range())
            {
                res.Add(num);
            }
            return res;
        });

        Task.WaitAll(writer, reader);
    }
    stopWatch.Stop();

    var elapsedMs = stopWatch.Elapsed.TotalMilliseconds;
    Console.WriteLine("SPSC N = {0}: {1:.00}ms/iteration, {2:.00}ns/item (tx+rx)",
        numItems,
        elapsedMs / numIterations,
        elapsedMs * NanosecondsPerMillisecond / numItems / numIterations);
}
有疑问加站长微信联系(非本文作者)