Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
22531fe
naive poison message handler
davidmrdavid Apr 12, 2024
748b279
incorporate feedback
davidmrdavid Apr 13, 2024
40a00dd
add suffix, change to terminated
davidmrdavid Apr 15, 2024
b1b7fba
more changes to get poison message handling working E2E. It's hackier…
davidmrdavid Apr 16, 2024
b1808a1
simplify implementation
davidmrdavid Apr 16, 2024
45d523b
remove commented out code
davidmrdavid Apr 16, 2024
82e3531
remove csproj changes
davidmrdavid Apr 16, 2024
adf4579
undo change in message manager deps
davidmrdavid Apr 16, 2024
d20bb7e
undo csproj changeS
davidmrdavid Apr 16, 2024
40baca0
add activity pmh as well
davidmrdavid Apr 16, 2024
f896364
make configurable
davidmrdavid Apr 16, 2024
cef1410
move poison message handler to superclass
davidmrdavid Apr 16, 2024
eeea159
remove unecessary imports
davidmrdavid Apr 16, 2024
961d64b
remove unecessary import
davidmrdavid Apr 16, 2024
5dfe896
simplify code a bit
davidmrdavid Apr 16, 2024
4a25c5b
remove unused variable
davidmrdavid Apr 16, 2024
8afbfc2
simplify and unify guidance
davidmrdavid Apr 16, 2024
9057bfd
improve guidance
davidmrdavid Apr 16, 2024
6866828
call out backend-specificness
davidmrdavid Apr 16, 2024
b0d739c
clean up PR
davidmrdavid Apr 16, 2024
71e0b36
clean up csproj
davidmrdavid Apr 16, 2024
5934076
indent csproj comment
davidmrdavid Apr 16, 2024
a94cc4e
remove unused import
davidmrdavid Apr 16, 2024
37dbac4
have valid table-naming scheme
davidmrdavid Apr 18, 2024
865aa20
add log
davidmrdavid Apr 18, 2024
57bb966
add comments
davidmrdavid Apr 18, 2024
6c3bb79
create valid serializable activity failure
davidmrdavid Apr 18, 2024
b15dbb5
handle de-serialization errors as well
davidmrdavid Jun 14, 2024
cbb8274
add version suffix
davidmrdavid Jun 25, 2024
2acadbe
resolve conflicts
davidmrdavid Jun 25, 2024
16f38f1
rev patch
davidmrdavid Jun 25, 2024
74dc0f7
add dtfx.core
davidmrdavid Jun 25, 2024
584cf8d
merge mixed deserializtion hotfix
davidmrdavid Jun 27, 2024
51978a0
add imports
davidmrdavid Jun 27, 2024
65c29c4
pass nullable analysis
davidmrdavid Jun 27, 2024
de7e46b
make hotfix always occur
davidmrdavid Jun 27, 2024
a8b24e5
move nullable analysis
davidmrdavid Jun 27, 2024
a746b1e
make hotfix conditional on setting
davidmrdavid Jun 27, 2024
d219ffa
match diffs
davidmrdavid Jun 28, 2024
b2e1f0c
make hotfix always run
davidmrdavid Jun 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
<MajorVersion>1</MajorVersion>
<MinorVersion>17</MinorVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<VersionSuffix>pmh.2</VersionSuffix>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<FileVersion Condition="'$(FileVersionRevision)' != ''">$(VersionPrefix).$(FileVersionRevision)</FileVersion>
<!-- The assembly version is only the major/minor pair, making it easier to do in-place upgrades -->
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
37 changes: 37 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ namespace DurableTask.AzureStorage.Messaging
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;
using Microsoft.WindowsAzure.Storage.Table;

class ControlQueue : TaskHubQueue, IDisposable
{
Expand All @@ -46,6 +50,36 @@ public ControlQueue(

protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout;

private async Task HandleIfPoisonMessageAsync(MessageData messageData)
{
var isPoison = false;
var queueMessage = messageData.OriginalQueueMessage;

// if deuque count is large, just flag it as poison. Don't even deserialize it!
if (queueMessage.DequeueCount > 5) // TODO: make configurable
{
var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
{
Properties =
{
["TheFullMessage"] = new EntityProperty(queueMessage.Message)
}
};

// add to poison table
var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable");
await poisonMessagesTable.CreateIfNotExistsAsync();
await poisonMessagesTable.InsertAsync(poisonMessage);

// delete from queue so it doesn't get processed again.
await this.storageQueue.DeleteMessageAsync(queueMessage);

// since isPoison is `true`, we'll override the deserialized message w/ a suspend event
isPoison = true;
}
messageData.TaskMessage.Event.IsPoison = isPoison;
}

public async Task<IReadOnlyList<MessageData>> GetMessagesAsync(CancellationToken cancellationToken)
{
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken))
Expand Down Expand Up @@ -108,6 +142,9 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

await this.HandleIfPoisonMessageAsync(messageData);

}
catch (Exception e)
{
Expand Down
34 changes: 34 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace DurableTask.AzureStorage.Messaging
using System.Threading;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Storage;
using Microsoft.WindowsAzure.Storage.Table;

class WorkItemQueue : TaskHubQueue
{
Expand All @@ -30,6 +31,37 @@ public WorkItemQueue(

protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout;

private async Task HandleIfPoisonMessageAsync(MessageData messageData)
{
// TODO: put in superclass?
var isPoison = false;
var queueMessage = messageData.OriginalQueueMessage;

// if deuque count is large, just flag it as poison. Don't even deserialize it!
if (queueMessage.DequeueCount > 5) // TODO: make configurable
{
var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
{
Properties =
{
["TheFullMessage"] = new EntityProperty(queueMessage.Message)
}
};

// add to poison table
var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable");
await poisonMessagesTable.CreateIfNotExistsAsync();
await poisonMessagesTable.InsertAsync(poisonMessage);

// delete from queue so it doesn't get processed again.
await this.storageQueue.DeleteMessageAsync(queueMessage);

// since isPoison is `true`, we'll override the deserialized message w/ a suspend event
isPoison = true;
}
messageData.TaskMessage.Event.IsPoison = isPoison;
}

public async Task<MessageData> GetMessageAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
Expand All @@ -44,9 +76,11 @@ public async Task<MessageData> GetMessageAsync(CancellationToken cancellationTok
continue;
}

// TODO: maybe the message manager should handle the poison?
MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
await this.HandleIfPoisonMessageAsync(data);

this.backoffHelper.Reset();
return data;
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Core/DurableTask.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<MinorVersion>16</MinorVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>pmh.2</VersionSuffix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<FileVersion Condition="'$(FileVersionRevision)' != ''">$(VersionPrefix).$(FileVersionRevision)</FileVersion>
Expand Down
5 changes: 5 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,10 @@ protected HistoryEvent(int eventId)
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
public ExtensionDataObject? ExtensionData { get; set; }

/// <summary>
/// Gets or sets whether this is a poison message.
/// </summary>
public bool IsPoison { get; set; } = false;
}
}
6 changes: 6 additions & 0 deletions src/DurableTask.Core/TaskActivityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>

try
{
if (scheduledEvent.IsPoison)
{
var exception = new TaskFailureException("poison activity message detected!", details: "poison activity message detected!");
throw exception;
}

string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
}
Expand Down
11 changes: 11 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,17 @@ void ProcessEvents(IEnumerable<HistoryEvent> events)

void ProcessEvent(HistoryEvent historyEvent)
{
if (historyEvent.IsPoison)
{
var terminationEvent = new ExecutionTerminatedEvent(-1, "detected poison!");
historyEvent = terminationEvent;

var taskCompletionSource = new TaskCompletionSource<string>();
taskCompletionSource.SetResult("");

this.result = taskCompletionSource.Task;
}

bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{
Expand Down