-
Notifications
You must be signed in to change notification settings - Fork 292
Cleanup MTP async data consumer #7423
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,14 +12,9 @@ internal sealed class AsyncConsumerDataProcessor : IAsyncConsumerDataProcessor | |
| { | ||
| private readonly ITask _task; | ||
| private readonly CancellationToken _cancellationToken; | ||
| private readonly SingleConsumerUnboundedChannel<(IDataProducer DataProducer, IData Data)> _channel = new(); | ||
|
|
||
| // This is needed to avoid a possible race condition between draining and _totalPayloadProcessed. | ||
| // This is the "logical" consume workflow state. | ||
| private readonly TaskCompletionSource<object> _consumerState = new(); | ||
| private readonly Task _consumeTask; | ||
| private long _totalPayloadReceived; | ||
| private long _totalPayloadProcessed; | ||
| private SingleConsumerUnboundedChannel<(IDataProducer DataProducer, IData Data)> _channel = new(); | ||
| private Task _consumeTask; | ||
|
|
||
| public AsyncConsumerDataProcessor(IDataConsumer dataConsumer, ITask task, CancellationToken cancellationToken) | ||
| { | ||
|
|
@@ -34,7 +29,6 @@ public AsyncConsumerDataProcessor(IDataConsumer dataConsumer, ITask task, Cancel | |
| public Task PublishAsync(IDataProducer dataProducer, IData data) | ||
| { | ||
| _cancellationToken.ThrowIfCancellationRequested(); | ||
| Interlocked.Increment(ref _totalPayloadReceived); | ||
| _channel.Write((dataProducer, data)); | ||
| return Task.CompletedTask; | ||
| } | ||
|
|
@@ -47,58 +41,22 @@ private async Task ConsumeAsync() | |
| { | ||
| while (_channel.TryRead(out (IDataProducer DataProducer, IData Data) item)) | ||
| { | ||
| try | ||
| // We don't enqueue the data if the consumer is the producer of the data. | ||
| // We could optimize this check and fetch by type / all-but-producer, but it | ||
| // could be over-engineering. | ||
| if (item.DataProducer.Uid == DataConsumer.Uid) | ||
| { | ||
| // We don't enqueue the data if the consumer is the producer of the data. | ||
| // We could optimize this check and fetch by type / all-but-producer, but it | ||
| // could be over-engineering. | ||
| if (item.DataProducer.Uid == DataConsumer.Uid) | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| await DataConsumer.ConsumeAsync(item.DataProducer, item.Data, _cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| // We let the catch below to handle the graceful cancellation of the process | ||
| catch (Exception ex) when (ex is not OperationCanceledException) | ||
| { | ||
| // If we're draining, then before incrementing _totalPayloadProcessed we need to signal that we should throw because | ||
| // it's possible we have a race condition where the payload check at line 106 return false and the current task is not yet in a | ||
| // "faulted state". | ||
| _consumerState.SetException(ex); | ||
|
|
||
| // We let current task to move to fault state, checked inside CompleteAddingAsync. | ||
| throw; | ||
| } | ||
| } | ||
| finally | ||
| { | ||
| Interlocked.Increment(ref _totalPayloadProcessed); | ||
| continue; | ||
| } | ||
|
|
||
| await DataConsumer.ConsumeAsync(item.DataProducer, item.Data, _cancellationToken).ConfigureAwait(false); | ||
| } | ||
| } | ||
| } | ||
| catch (OperationCanceledException oc) when (oc.CancellationToken == _cancellationToken) | ||
| { | ||
| // Ignore we're shutting down | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| // For all other exception we signal the state if not already faulted | ||
| if (!_consumerState.Task.IsFaulted) | ||
| { | ||
| _consumerState.SetException(ex); | ||
| } | ||
|
|
||
| // let the exception bubble up | ||
| throw; | ||
| } | ||
|
|
||
| // We're exiting gracefully, signal the correct state. | ||
| _consumerState.SetResult(new object()); | ||
| } | ||
|
|
||
| public async Task CompleteAddingAsync() | ||
|
|
@@ -111,43 +69,13 @@ public async Task CompleteAddingAsync() | |
| await _consumeTask.ConfigureAwait(false); | ||
| } | ||
|
|
||
| public async Task<long> DrainDataAsync() | ||
| public async Task DrainDataAsync() | ||
| { | ||
| // We go volatile because we race with Interlocked.Increment in PublishAsync | ||
| long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed); | ||
| long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived); | ||
| const int minDelayTimeMs = 25; | ||
| int currentDelayTimeMs = minDelayTimeMs; | ||
| while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed) | ||
| { | ||
| // When we cancel we throw inside ConsumeAsync and we won't drain anymore any data | ||
| if (_cancellationToken.IsCancellationRequested) | ||
| { | ||
| break; | ||
| } | ||
|
|
||
| await _task.Delay(currentDelayTimeMs).ConfigureAwait(false); | ||
| currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200); | ||
|
|
||
| if (_consumerState.Task.IsFaulted) | ||
| { | ||
| // Rethrow the exception | ||
| await _consumerState.Task.ConfigureAwait(false); | ||
| } | ||
|
|
||
| // Wait for the consumer to complete the current enqueued items | ||
| totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed); | ||
| totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived); | ||
| } | ||
|
|
||
| // It's possible that we fail and we have consumed the item | ||
| if (_consumerState.Task.IsFaulted) | ||
| { | ||
| // Rethrow the exception | ||
| await _consumerState.Task.ConfigureAwait(false); | ||
| } | ||
| _channel.Complete(); | ||
| await _consumeTask.ConfigureAwait(false); | ||
|
|
||
| return _totalPayloadReceived; | ||
| _channel = new(); | ||
| _consumeTask = _task.Run(ConsumeAsync, _cancellationToken); | ||
| } | ||
|
Comment on lines
+72
to
79
|
||
|
|
||
| public void Dispose() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical race condition: When DrainDataAsync completes the channel writer (line 72) and before creating a new channel (line 75), any concurrent calls to PublishAsync will throw ChannelClosedException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.