using System; using System.Threading; using System.Threading.Tasks; using System.Collections.ObjectModel; using System.Collections.Concurrent; using System.Collections; using System.Collections.Generic; using System.Collections.Specialized; using System.Linq; namespace wasSharp.Collections.Specialized { /// /// The observable concurrent queue. /// /// /// The content type /// public sealed class ObservableConcurrentQueue : ConcurrentQueue, INotifyCollectionChanged { public event NotifyCollectionChangedEventHandler CollectionChanged; private new void Enqueue(T item) { EnqueueAsync(item).RunSynchronously(); } public Task EnqueueAsync(T item) => Task.Run(() => { base.Enqueue(item); OnCollectionChanged( new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Add, item)); }); private new bool TryDequeue(out T result) { result = DequeueAsync().Result; if (result.Equals(default(T))) return false; return true; } public Task DequeueAsync() => Task.Run(() => { if (base.IsEmpty || !base.TryDequeue(out T item)) return default(T); OnCollectionChanged( new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, item)); return item; }); public async Task PeekAsync() => await Task.Run(() => { if (!base.TryPeek(out T item)) return default(T); return item; }); private new bool TryPeek(out T result) { result = PeekAsync().Result; if (result.Equals(default(T))) return false; return true; } private void OnCollectionChanged(NotifyCollectionChangedEventArgs args) { CollectionChanged?.Invoke(this, args); } public async Task Clear() { while (!base.IsEmpty) await DequeueAsync(); } } }