Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions cmd/workflow/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ package workflow

import (
"errors"
"slices"
"strings"

"github.com/dapr/cli/pkg/workflow"
"github.com/dapr/kit/signals"
"github.com/spf13/cobra"
)

var (
flagPurgeOlderThan string
flagPurgeAll bool
flagPurgeConn *connFlag
flagPurgeForce bool
schedulerNamespace string
flagPurgeOlderThan string
flagPurgeAll bool
flagPurgeConn *connFlag
flagPurgeForce bool
flagPurgeFilterStatus string
schedulerNamespace string
)

var PurgeCmd = &cobra.Command{
Expand All @@ -41,6 +44,9 @@ var PurgeCmd = &cobra.Command{
return errors.New("no arguments are accepted when using purge all flags")
}
default:
if cmd.Flags().Changed("all-filter-status") {
return errors.New("--all-filter-status can only be used with --all-older-than")
}
if len(args) == 0 {
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
}
Expand Down Expand Up @@ -75,14 +81,44 @@ var PurgeCmd = &cobra.Command{
}
}

if cmd.Flags().Changed("all-filter-status") {
opts.AllFilterStatus = &flagPurgeFilterStatus
}

return workflow.Purge(ctx, opts)
},
}

var purgeFilterStatuses = []string{
"RUNNING",
"COMPLETED",
"CONTINUED_AS_NEW",
"FAILED",
"CANCELED",
"TERMINATED",
"PENDING",
"SUSPENDED",
}

func init() {
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
PurgeCmd.Flags().StringVar(&flagPurgeFilterStatus, "all-filter-status", "", "Filter purge to only workflow instances with the given runtime status. Must be used with --all-older-than. One of "+strings.Join(purgeFilterStatuses, ", "))
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
PurgeCmd.MarkFlagsMutuallyExclusive("all-filter-status", "all")

pre := PurgeCmd.PreRunE
PurgeCmd.PreRunE = func(cmd *cobra.Command, args []string) error {
if cmd.Flags().Changed("all-filter-status") {
if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) {
return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", "))
}
}
if pre != nil {
return pre(cmd, args)
}
return nil
}
PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.")

PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")
Expand Down
58 changes: 58 additions & 0 deletions cmd/workflow/purge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPurgeFilterStatuses(t *testing.T) {
expected := []string{
"RUNNING",
"COMPLETED",
"CONTINUED_AS_NEW",
"FAILED",
"CANCELED",
"TERMINATED",
"PENDING",
"SUSPENDED",
}
assert.Equal(t, expected, purgeFilterStatuses)
}

func TestPurgeCmdFlags(t *testing.T) {
t.Run("all-filter-status flag is registered", func(t *testing.T) {
f := PurgeCmd.Flags().Lookup("all-filter-status")
assert.NotNil(t, f)
assert.Equal(t, "string", f.Value.Type())
assert.Contains(t, f.Usage, "Must be used with --all-older-than")
})

t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) {
// The mutual exclusivity is registered via MarkFlagsMutuallyExclusive.
// We verify the flag group exists by checking that the command
// has both flags and that they are correctly configured.
allFlag := PurgeCmd.Flags().Lookup("all")
assert.NotNil(t, allFlag)
filterFlag := PurgeCmd.Flags().Lookup("all-filter-status")
assert.NotNil(t, filterFlag)
})

t.Run("all-older-than flag is registered", func(t *testing.T) {
f := PurgeCmd.Flags().Lookup("all-older-than")
assert.NotNil(t, f)
})
}
13 changes: 10 additions & 3 deletions pkg/workflow/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type PurgeOptions struct {
AppID string
InstanceIDs []string
AllOlderThan *time.Time
AllFilterStatus *string
All bool
Force bool

Expand All @@ -45,6 +46,14 @@ func Purge(ctx context.Context, opts PurgeOptions) error {
if len(opts.InstanceIDs) > 0 {
toPurge = opts.InstanceIDs
} else {
filter := Filter{
Terminal: true,
}
if opts.AllFilterStatus != nil {
filter.Terminal = false
filter.Status = opts.AllFilterStatus
}

var list []*ListOutputWide
var err error
list, err = ListWide(ctx, ListOptions{
Expand All @@ -53,9 +62,7 @@ func Purge(ctx context.Context, opts PurgeOptions) error {
AppID: opts.AppID,
ConnectionString: opts.ConnectionString,
TableName: opts.TableName,
Filter: Filter{
Terminal: true,
},
Filter: filter,
})
if err != nil {
return err
Expand Down
124 changes: 124 additions & 0 deletions pkg/workflow/purge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"testing"
"time"

"github.com/dapr/kit/ptr"
"github.com/stretchr/testify/assert"
)

func TestPurgeOptions_AllFilterStatus(t *testing.T) {
t.Run("AllFilterStatus sets filter status instead of terminal", func(t *testing.T) {
opts := PurgeOptions{
AllOlderThan: ptr.Of(time.Now()),
AllFilterStatus: ptr.Of("COMPLETED"),
}

assert.NotNil(t, opts.AllFilterStatus)
assert.Equal(t, "COMPLETED", *opts.AllFilterStatus)
assert.NotNil(t, opts.AllOlderThan)
})

t.Run("nil AllFilterStatus defaults to terminal filtering", func(t *testing.T) {
opts := PurgeOptions{
AllOlderThan: ptr.Of(time.Now()),
}

assert.Nil(t, opts.AllFilterStatus)
})

t.Run("AllFilterStatus with various statuses", func(t *testing.T) {
statuses := []string{
"RUNNING", "COMPLETED", "CONTINUED_AS_NEW",
"FAILED", "CANCELED", "TERMINATED",
"PENDING", "SUSPENDED",
}

for _, status := range statuses {
t.Run(status, func(t *testing.T) {
opts := PurgeOptions{
AllOlderThan: ptr.Of(time.Now()),
AllFilterStatus: ptr.Of(status),
}
assert.Equal(t, status, *opts.AllFilterStatus)
})
}
})
}

func TestPurgeFilterBuildLogic(t *testing.T) {
// Tests the filter construction logic that Purge uses internally.
// When AllFilterStatus is set, Terminal should be false and Status should
// be the provided value. When AllFilterStatus is nil, Terminal should be true.

t.Run("without AllFilterStatus uses terminal filter", func(t *testing.T) {
opts := PurgeOptions{
All: true,
}

filter := Filter{Terminal: true}
if opts.AllFilterStatus != nil {
filter.Terminal = false
filter.Status = opts.AllFilterStatus
}

assert.True(t, filter.Terminal)
assert.Nil(t, filter.Status)
})

t.Run("with AllFilterStatus uses status filter", func(t *testing.T) {
opts := PurgeOptions{
AllOlderThan: ptr.Of(time.Now()),
AllFilterStatus: ptr.Of("FAILED"),
}

filter := Filter{Terminal: true}
if opts.AllFilterStatus != nil {
filter.Terminal = false
filter.Status = opts.AllFilterStatus
}

assert.False(t, filter.Terminal)
assert.NotNil(t, filter.Status)
assert.Equal(t, "FAILED", *filter.Status)
})

t.Run("AllOlderThan filters by created time", func(t *testing.T) {
now := time.Now()
cutoff := now.Add(-1 * time.Hour)
opts := PurgeOptions{
AllOlderThan: &cutoff,
AllFilterStatus: ptr.Of("COMPLETED"),
}

// Simulate the filtering logic from Purge
list := []*ListOutputWide{
{InstanceID: "old-1", Created: now.Add(-2 * time.Hour), RuntimeStatus: "COMPLETED"},
{InstanceID: "new-1", Created: now.Add(-30 * time.Minute), RuntimeStatus: "COMPLETED"},
{InstanceID: "old-2", Created: now.Add(-3 * time.Hour), RuntimeStatus: "COMPLETED"},
}

var toPurge []string
for _, w := range list {
if w.Created.Before(*opts.AllOlderThan) {
toPurge = append(toPurge, w.InstanceID)
}
}

assert.Equal(t, []string{"old-1", "old-2"}, toPurge)
})
}
87 changes: 87 additions & 0 deletions tests/e2e/standalone/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,93 @@ func TestWorkflowPurge(t *testing.T) {
assert.NotContains(t, output, "purge-older")
})

t.Run("purge older than with filter status only purges matching status", func(t *testing.T) {
// Create one workflow that will complete (SimpleWorkflow) and one that
// will be terminated (LongWorkflow) so they have different statuses.
output, err := cmdWorkflowRun(appID, "SimpleWorkflow",
"--instance-id=filter-completed")
require.NoError(t, err, output)

output, err = cmdWorkflowRun(appID, "LongWorkflow",
"--instance-id=filter-terminated")
require.NoError(t, err, output)

time.Sleep(3 * time.Second)

// Terminate one so we have two different terminal statuses.
_, err = cmdWorkflowTerminate(appID, "filter-terminated")
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Purge only COMPLETED instances older than 1s.
output, err = cmdWorkflowPurge(appID, redisConnString,
"--all-older-than", "1s", "--all-filter-status", "COMPLETED")
require.NoError(t, err, output)
assert.Contains(t, output, `Purged workflow instance "filter-completed"`)
assert.NotContains(t, output, "filter-terminated")

// Verify filter-terminated still exists.
output, err = cmdWorkflowList(appID, "-o", "json", redisConnString)
require.NoError(t, err, output)
assert.NotContains(t, output, "filter-completed")
assert.Contains(t, output, "filter-terminated")

// Clean up the remaining instance.
_, _ = cmdWorkflowPurge(appID, "filter-terminated")
})

t.Run("purge older than with filter status TERMINATED", func(t *testing.T) {
output, err := cmdWorkflowRun(appID, "SimpleWorkflow",
"--instance-id=fs-completed")
require.NoError(t, err, output)

output, err = cmdWorkflowRun(appID, "LongWorkflow",
"--instance-id=fs-terminated")
require.NoError(t, err, output)

time.Sleep(3 * time.Second)

_, err = cmdWorkflowTerminate(appID, "fs-terminated")
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Purge only TERMINATED instances older than 1s.
output, err = cmdWorkflowPurge(appID, redisConnString,
"--all-older-than", "1s", "--all-filter-status", "TERMINATED")
require.NoError(t, err, output)
assert.Contains(t, output, `Purged workflow instance "fs-terminated"`)
assert.NotContains(t, output, "fs-completed")

// Verify fs-completed still exists.
output, err = cmdWorkflowList(appID, "-o", "json", redisConnString)
require.NoError(t, err, output)
assert.Contains(t, output, "fs-completed")
assert.NotContains(t, output, "fs-terminated")

// Clean up.
_, _ = cmdWorkflowPurge(appID, "fs-completed")
})

t.Run("all-filter-status without all-older-than errors", func(t *testing.T) {
_, err := cmdWorkflowPurge(appID, redisConnString,
"--all-filter-status", "COMPLETED")
require.Error(t, err)
})

t.Run("all-filter-status with invalid value errors", func(t *testing.T) {
_, err := cmdWorkflowPurge(appID, redisConnString,
"--all-older-than", "1s", "--all-filter-status", "INVALID")
require.Error(t, err)
})

t.Run("all-filter-status with all flag errors", func(t *testing.T) {
_, err := cmdWorkflowPurge(appID, redisConnString,
"--all", "--all-filter-status", "COMPLETED")
require.Error(t, err)
})

t.Run("also purge scheduler", func(t *testing.T) {
output, err := cmdWorkflowRun(appID, "EventWorkflow",
"--instance-id=also-sched")
Expand Down
Loading