diff --git a/cmd/run_unix.go b/cmd/run_unix.go index 6dcd68358..aed416d48 100644 --- a/cmd/run_unix.go +++ b/cmd/run_unix.go @@ -74,6 +74,13 @@ func killProcessGroup(process *os.Process) error { if errors.Is(err, syscall.ESRCH) { return nil // process group gone } + // EPERM can occur on macOS when the process group is in a + // dying/zombie state. Keep polling rather than returning an + // error so we can fall through to SIGKILL if needed. + if errors.Is(err, syscall.EPERM) { + time.Sleep(100 * time.Millisecond) + continue + } return fmt.Errorf("failed to check status of process group %d: %w", pgid, err) } // Grace period elapsed — force kill. diff --git a/pkg/version/version_test.go b/pkg/version/version_test.go index e49a71148..7ce2b6bac 100644 --- a/pkg/version/version_test.go +++ b/pkg/version/version_test.go @@ -14,12 +14,15 @@ limitations under the License. package version import ( + "context" "fmt" + "net" "net/http" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestGetVersionsGithub(t *testing.T) { @@ -209,7 +212,7 @@ func TestGetVersionsGithub(t *testing.T) { }, } m := http.NewServeMux() - s := http.Server{Addr: ":12345", Handler: m, ReadHeaderTimeout: time.Duration(5) * time.Second} + s := http.Server{Handler: m, ReadHeaderTimeout: time.Duration(5) * time.Second} for _, tc := range tests { body := tc.ResponseBody @@ -218,13 +221,19 @@ func TestGetVersionsGithub(t *testing.T) { }) } - go func() { - s.ListenAndServe() - }() + ln, err := net.Listen("tcp", ":0") + require.NoError(t, err) + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s.Shutdown(ctx) + }) + go func() { s.Serve(ln) }() + addr := fmt.Sprintf("http://localhost:%d", ln.Addr().(*net.TCPAddr).Port) for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { - version, err := GetLatestReleaseGithub("http://localhost:12345" + tc.Path) + version, err := 
GetLatestReleaseGithub(addr + tc.Path) assert.Equal(t, tc.ExpectedVer, version) if tc.ExpectedErr != "" { assert.EqualError(t, err, tc.ExpectedErr) @@ -233,9 +242,9 @@ func TestGetVersionsGithub(t *testing.T) { } t.Run("error on 404", func(t *testing.T) { - version, err := GetLatestReleaseGithub("http://localhost:12345/non-existant/path") + version, err := GetLatestReleaseGithub(addr + "/non-existant/path") assert.Equal(t, "", version) - assert.EqualError(t, err, "http://localhost:12345/non-existant/path - 404 Not Found") + assert.EqualError(t, err, addr+"/non-existant/path - 404 Not Found") }) t.Run("error on bad addr", func(t *testing.T) { @@ -243,8 +252,6 @@ func TestGetVersionsGithub(t *testing.T) { assert.Equal(t, "", version) assert.Error(t, err) }) - - s.Shutdown(t.Context()) } func TestGetVersionsHelm(t *testing.T) { @@ -318,7 +325,7 @@ entries: }, } m := http.NewServeMux() - s := http.Server{Addr: ":12346", Handler: m, ReadHeaderTimeout: time.Duration(5) * time.Second} + s := http.Server{Handler: m, ReadHeaderTimeout: time.Duration(5) * time.Second} for _, tc := range tests { body := tc.ResponseBody @@ -327,19 +334,23 @@ entries: }) } - go func() { - s.ListenAndServe() - }() + ln, err := net.Listen("tcp", ":0") + require.NoError(t, err) + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + s.Shutdown(ctx) + }) + go func() { s.Serve(ln) }() + addr := fmt.Sprintf("http://localhost:%d", ln.Addr().(*net.TCPAddr).Port) for _, tc := range tests { t.Run(tc.Name, func(t *testing.T) { - version, err := GetLatestReleaseHelmChart("http://localhost:12346" + tc.Path) + version, err := GetLatestReleaseHelmChart(addr + tc.Path) assert.Equal(t, tc.ExpectedVer, version) if tc.ExpectedErr != "" { assert.EqualError(t, err, tc.ExpectedErr) } }) } - - s.Shutdown(t.Context()) } diff --git a/tests/e2e/spawn/spawn.go b/tests/e2e/spawn/spawn.go index 68af6e68a..de0c55d07 100644 --- a/tests/e2e/spawn/spawn.go +++ 
b/tests/e2e/spawn/spawn.go @@ -17,6 +17,7 @@ import ( "bufio" "context" "os/exec" + "time" ) // CommandWithContext runs a command with its arguments in background. @@ -75,8 +76,14 @@ func Command(command string, arguments ...string) (string, error) { // CommandExecWithContext runs a command with its arguments, kills the command after context is done // and returns the combined stdout, stderr or the error. +// +// WaitDelay is set so that if child processes (e.g. the compiled binary spawned +// by `go run`) outlive the main process and keep its stdout/stderr pipes open, +// CombinedOutput will still return after the delay instead of blocking forever. +// This is critical on macOS where zombie process groups can hold pipes open. func CommandExecWithContext(ctx context.Context, command string, arguments ...string) (string, error) { cmd := exec.CommandContext(ctx, command, arguments...) + cmd.WaitDelay = 10 * time.Second b, err := cmd.CombinedOutput() return string(b), err } diff --git a/tests/e2e/standalone/init_negative_test.go b/tests/e2e/standalone/init_negative_test.go index adfa6549d..9fe056852 100644 --- a/tests/e2e/standalone/init_negative_test.go +++ b/tests/e2e/standalone/init_negative_test.go @@ -27,6 +27,10 @@ import ( func TestStandaloneInitNegatives(t *testing.T) { // Ensure a clean environment must(t, cmdUninstall, "failed to uninstall Dapr") + // Reinstall Dapr when done so subsequent tests still work. 
+ t.Cleanup(func() { + ensureDaprInstallation(t) + }) homeDir, err := os.UserHomeDir() require.NoError(t, err, "expected no error on querying for os home dir") diff --git a/tests/e2e/standalone/init_run_custom_path_test.go b/tests/e2e/standalone/init_run_custom_path_test.go index 37a358e4a..4c862ee2b 100644 --- a/tests/e2e/standalone/init_run_custom_path_test.go +++ b/tests/e2e/standalone/init_run_custom_path_test.go @@ -39,6 +39,10 @@ func echoTestAppArgs() []string { func TestStandaloneInitRunUninstallNonDefaultDaprPath(t *testing.T) { // Ensure a clean environment must(t, cmdUninstall, "failed to uninstall Dapr") + // Reinstall Dapr when done so subsequent tests still work. + t.Cleanup(func() { + ensureDaprInstallation(t) + }) t.Run("run with --runtime-path flag", func(t *testing.T) { daprPath, err := os.MkdirTemp("", "dapr-e2e-run-with-flag-*") assert.NoError(t, err) diff --git a/tests/e2e/standalone/init_test.go b/tests/e2e/standalone/init_test.go index 9f4a81e81..9b0bdf49a 100644 --- a/tests/e2e/standalone/init_test.go +++ b/tests/e2e/standalone/init_test.go @@ -41,8 +41,9 @@ func TestStandaloneInit(t *testing.T) { daprRuntimeVersion, daprDashboardVersion := common.GetVersionsFromEnv(t, false) t.Cleanup(func() { - // remove dapr installation after all tests in this function. - must(t, cmdUninstall, "failed to uninstall Dapr") + // Reinstall Dapr so subsequent tests still have a working installation. 
+ cmdUninstall() + ensureDaprInstallation(t) }) t.Run("init with invalid private registry", func(t *testing.T) { diff --git a/tests/e2e/standalone/invoke_test.go b/tests/e2e/standalone/invoke_test.go index c21ec212b..b8a25695a 100644 --- a/tests/e2e/standalone/invoke_test.go +++ b/tests/e2e/standalone/invoke_test.go @@ -57,7 +57,6 @@ func StartTestService(t *testing.T, port int) common.Service { func TestStandaloneInvoke(t *testing.T) { port := 9987 - ensureDaprInstallation(t) s := StartTestService(t, port) defer s.Stop() @@ -117,7 +116,6 @@ func TestStandaloneInvoke(t *testing.T) { func TestStandaloneInvokeWithAppChannel(t *testing.T) { port := 9988 - ensureDaprInstallation(t) s := StartTestService(t, port) defer s.Stop() diff --git a/tests/e2e/standalone/list_test.go b/tests/e2e/standalone/list_test.go index cb9116955..471eee0eb 100644 --- a/tests/e2e/standalone/list_test.go +++ b/tests/e2e/standalone/list_test.go @@ -31,7 +31,6 @@ import ( ) func TestStandaloneList(t *testing.T) { - ensureDaprInstallation(t) // Use a long-running app so we can test list and stop. Windows has no bash, so use cmd. runArgs := []string{"run", "--app-id", "dapr_e2e_list", "-H", "3555", "-G", "4555", "--"} if runtime.GOOS == "windows" { @@ -86,9 +85,13 @@ func TestStandaloneList(t *testing.T) { cmd := exec.Command(daprdPath, "--app-id", "daprd_e2e_list", "--dapr-http-port", "3555", "--dapr-grpc-port", "4555", "--app-port", "0") cmd.Start() - output, err := cmdList("") + // Wait for daprd to register and appear in the list. + var output string + require.Eventually(t, func() bool { + output, err = cmdList("") + return err == nil && !strings.Contains(output, "No Dapr instances found") + }, 30*time.Second, time.Second, "daprd instance did not appear in list") t.Log(output) - require.NoError(t, err, "dapr list failed with daprd instance") listOutputCheck(t, output, false) // TODO: remove this condition when `dapr stop` starts working for Windows. 
diff --git a/tests/e2e/standalone/main_test.go b/tests/e2e/standalone/main_test.go new file mode 100644 index 000000000..89f2f4019 --- /dev/null +++ b/tests/e2e/standalone/main_test.go @@ -0,0 +1,108 @@ +//go:build e2e || template + +/* +Copyright 2026 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package standalone_test + +import ( + "fmt" + "net" + "os" + "path/filepath" + "testing" + "time" +) + +// TestMain installs Dapr once for the entire test binary, removing the +// need for every test to call cmdUninstall/ensureDaprInstallation. +// Tests that need to test the install/uninstall lifecycle itself must +// reinstall Dapr in their t.Cleanup so subsequent tests still work. +func TestMain(m *testing.M) { + // Start from a clean slate. + cmdUninstall() + + if err := installDapr(); err != nil { + fmt.Fprintf(os.Stderr, "TestMain: failed to install Dapr: %v\n", err) + os.Exit(1) + } + + code := m.Run() + + cmdUninstall() + os.Exit(code) +} + +// installDapr performs a Dapr init for the test binary. This mirrors +// ensureDaprInstallation but does not require a *testing.T. 
+func installDapr() error { + daprRuntimeVersion, ok := os.LookupEnv("DAPR_RUNTIME_PINNED_VERSION") + if !ok { + return fmt.Errorf("env var DAPR_RUNTIME_PINNED_VERSION not set") + } + daprDashboardVersion, ok := os.LookupEnv("DAPR_DASHBOARD_PINNED_VERSION") + if !ok { + return fmt.Errorf("env var DAPR_DASHBOARD_PINNED_VERSION not set") + } + + if !isSlimMode() { + if err := waitForPortsFreeDirect(60*time.Second, 58080, 58081, 50005); err != nil { + return fmt.Errorf("waiting for container ports: %w", err) + } + } + + args := []string{ + "--runtime-version", daprRuntimeVersion, + "--dashboard-version", daprDashboardVersion, + } + output, err := cmdInit(args...) + if err != nil { + return fmt.Errorf("dapr init: %s: %w", output, err) + } + + if isSlimMode() { + homeDir, err := os.UserHomeDir() + if err != nil { + return fmt.Errorf("getting home dir: %w", err) + } + if err := createSlimComponents(filepath.Join(homeDir, ".dapr", "components")); err != nil { + return fmt.Errorf("creating slim components: %w", err) + } + } + + return nil +} + +// waitForPortsFreeDirect is a non-test variant of waitForPortsFree for +// use in TestMain where *testing.T is not available. 
+func waitForPortsFreeDirect(timeout time.Duration, ports ...int) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + allFree := true + for _, port := range ports { + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + allFree = false + break + } + ln.Close() + } + if allFree { + return nil + } + time.Sleep(time.Second) + } + return fmt.Errorf("ports %v not free within %v", ports, timeout) +} diff --git a/tests/e2e/standalone/publish_test.go b/tests/e2e/standalone/publish_test.go index 07d80c408..f9b86e39d 100644 --- a/tests/e2e/standalone/publish_test.go +++ b/tests/e2e/standalone/publish_test.go @@ -28,7 +28,6 @@ import ( ) func TestStandalonePublish(t *testing.T) { - ensureDaprInstallation(t) sub := &common.Subscription{ PubsubName: "pubsub", Topic: "sample", diff --git a/tests/e2e/standalone/run_template_test.go b/tests/e2e/standalone/run_template_test.go index 429fbaa36..46cace55c 100644 --- a/tests/e2e/standalone/run_template_test.go +++ b/tests/e2e/standalone/run_template_test.go @@ -18,8 +18,11 @@ limitations under the License. package standalone_test import ( + "context" + "encoding/json" "fmt" "io/ioutil" + "net/http" "path/filepath" "strings" "testing" @@ -29,6 +32,132 @@ import ( "github.com/stretchr/testify/require" ) +var httpClient = &http.Client{Timeout: 500 * time.Millisecond} + +// waitForDaprHealth polls the Dapr HTTP healthz endpoints until all +// sidecars report healthy. This confirms both the sidecar and its app +// are running, independent of log output timing. 
+func waitForDaprHealth(t *testing.T, timeout time.Duration, httpPorts ...int) { + t.Helper() + require.Eventually(t, func() bool { + for _, port := range httpPorts { + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/v1.0/healthz", port)) + if err != nil { + return false + } + resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return false + } + } + return true + }, timeout, 500*time.Millisecond, "dapr sidecars on ports %v not healthy within %v", httpPorts, timeout) +} + +// waitForAppHealthy polls dapr list to discover the HTTP port for the +// given appID, then health-checks it. Use this when the HTTP port is +// auto-assigned and not known in advance. +func waitForAppHealthy(t *testing.T, timeout time.Duration, appID string) { + t.Helper() + require.Eventually(t, func() bool { + output, err := cmdList("json") + if err != nil { + return false + } + var result []map[string]interface{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + return false + } + for _, entry := range result { + if entry["appId"] != appID { + continue + } + httpPort, _ := entry["httpPort"].(float64) + if httpPort <= 0 { + return false + } + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/v1.0/healthz", int(httpPort))) + if err != nil { + return false + } + resp.Body.Close() + return resp.StatusCode >= 200 && resp.StatusCode < 300 + } + return false + }, timeout, time.Second, "dapr app %q not healthy within %v", appID, timeout) +} + +// waitForAppsListed polls dapr list until all given appIDs are present with +// a non-zero HTTP port. Unlike waitForDaprHealth this does NOT check the +// healthz endpoint, so it works in slim mode where placement/scheduler are +// absent. It guarantees that daprd is up, listening, and has stored metadata — +// which is the prerequisite for `dapr stop -f` to locate the CLI process. 
+func waitForAppsListed(t *testing.T, timeout time.Duration, appIDs ...string) { + t.Helper() + require.Eventually(t, func() bool { + output, err := cmdList("json") + if err != nil { + return false + } + var result []map[string]interface{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + return false + } + found := 0 + for _, id := range appIDs { + for _, entry := range result { + if entry["appId"] == id { + httpPort, _ := entry["httpPort"].(float64) + if httpPort > 0 { + found++ + break + } + } + } + } + return found == len(appIDs) + }, timeout, time.Second, "dapr apps %v not listed within %v", appIDs, timeout) +} + +// waitForLogContent polls until the log file matching partialFileName in +// dirPath contains the expected substring. This is used to wait for slow +// app startup (e.g. `go run` compilation) before proceeding with the test. +func waitForLogContent(t *testing.T, dirPath, partialFileName, expected string, timeout time.Duration) { + t.Helper() + require.Eventually(t, func() bool { + fileName, err := lookUpFileFullName(dirPath, partialFileName) + if err != nil { + return false + } + contents, err := ioutil.ReadFile(filepath.Join(dirPath, fileName)) + if err != nil { + return false + } + return strings.Contains(string(contents), expected) + }, timeout, time.Second, "log file matching %q in %s did not contain %q within %v", partialFileName, dirPath, expected, timeout) +} + +// collectOutput waits for the CLI process output from outputCh. If the +// output does not arrive within timeout, the context is canceled (which +// SIGKILL's the CLI via exec.CommandContext) and we wait a further 20s +// for WaitDelay to close pipes and CombinedOutput to return. 
+func collectOutput(t *testing.T, outputCh <-chan string, cancel context.CancelFunc, timeout time.Duration) string { + t.Helper() + select { + case output := <-outputCh: + return output + case <-time.After(timeout): + cancel() + select { + case output := <-outputCh: + return output + case <-time.After(20 * time.Second): + t.Fatal("timed out waiting for run command to finish") + return "" + } + } +} + type AppTestOutput struct { appID string appLogContents []string @@ -40,49 +169,45 @@ type AppTestOutput struct { } func TestRunWithTemplateFile(t *testing.T) { - cmdUninstall() cleanUpLogs() - ensureDaprInstallation(t) - t.Cleanup(func() { - // remove dapr installation after all tests in this function. - must(t, cmdUninstall, "failed to uninstall Dapr") - }) // These tests are dependent on run template files in ../testdata/run-template-files folder. t.Run("invalid template file wrong emit metrics app run", func(t *testing.T) { runFilePath := "../testdata/run-template-files/wrong_emit_metrics_app_dapr.yaml" + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) + // Wait for the emit-metrics app to fail (wrong file name). The app + // log gets written quickly since `go run wrongappname.go` fails + // immediately. Then send stop so the CLI shuts down gracefully. 
+ waitForLogContent(t, "../../apps/emit-metrics/.dapr/logs", "app", "exit status 1", 60*time.Second) cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(25 * time.Second): - t.Fatal("timed out waiting for run command to finish") - } - - // Deterministic output for template file, so we can assert line by line - lines := strings.Split(output, "\n") - assert.GreaterOrEqual(t, len(lines), 4, "expected at least 4 lines in output of starting two apps") - assert.Contains(t, lines[1], "Started Dapr with app id \"processor\". HTTP Port: 3510.") - assert.Contains(t, lines[2], "Writing log files to directory") - assert.Contains(t, lines[2], "tests/apps/processor/.dapr/logs") - assert.Contains(t, lines[4], "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") - assert.Contains(t, lines[5], "Writing log files to directory") - assert.Contains(t, lines[5], "tests/apps/emit-metrics/.dapr/logs") + // Give the CLI time to gracefully shut down. The CLI must process + // the SIGTERM from stop, then kill daprd/app processes (up to 5s + // grace period each). 60s is generous. + output := collectOutput(t, outputCh, cancel, 60*time.Second) + + assert.Contains(t, output, "Started Dapr with app id \"processor\". HTTP Port: 3510.") + assert.Contains(t, output, "Writing log files to directory") + assert.Contains(t, output, "tests/apps/processor/.dapr/logs") + assert.Contains(t, output, "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") + assert.Contains(t, output, "tests/apps/emit-metrics/.dapr/logs") assert.Contains(t, output, "Received signal to stop Dapr and app processes. 
Shutting down Dapr and app processes.") appTestOutput := AppTestOutput{ appID: "processor", @@ -110,36 +235,38 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file", func(t *testing.T) { - cmdUninstall() - ensureDaprInstallation(t) + if isSlimMode() { + t.Skip("skipping: slim mode has no placement/scheduler so daprd cannot become healthy") + } runFilePath := "../testdata/run-template-files/dapr.yaml" + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) + waitForDaprHealth(t, 60*time.Second, 3510, 3511) + waitForLogContent(t, "../../apps/emit-metrics/.dapr/logs", "app", "Metrics with ID 1 sent", 60*time.Second) cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(time.Second * 10): - t.Fatal("timed out waiting for run command to finish") - } + output := collectOutput(t, outputCh, cancel, 60*time.Second) // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") - assert.GreaterOrEqual(t, len(lines), 6, "expected at least 6 lines in output of starting two apps") + require.GreaterOrEqual(t, len(lines), 6, "expected at least 6 lines in output of starting two apps") assert.Contains(t, lines[0], "Validating config and starting app \"processor\"") assert.Contains(t, lines[1], "Started Dapr with app id \"processor\". 
HTTP Port: 3510.") assert.Contains(t, lines[2], "Writing log files to directory") @@ -181,40 +308,39 @@ func TestRunWithTemplateFile(t *testing.T) { t.Run("invalid template file env var not set", func(t *testing.T) { runFilePath := "../testdata/run-template-files/env_var_not_set_dapr.yaml" - cmdUninstall() - ensureDaprInstallation(t) + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) + // The emit-metrics app must compile (go run) and then fail because + // the env var is not set. This can be slow on CI (downloading deps, + // compiling). Wait for the app log to confirm the app has failed + // before sending stop — otherwise stop kills the app before it can + // produce the expected error output. + waitForLogContent(t, "../../apps/emit-metrics/.dapr/logs", "app", "exit status 1", 90*time.Second) cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(25 * time.Second): - t.Fatal("timed out waiting for run command to finish") - } + output := collectOutput(t, outputCh, cancel, 60*time.Second) - // Deterministic output for template file, so we can assert line by line - lines := strings.Split(output, "\n") - assert.GreaterOrEqual(t, len(lines), 6, "expected at least 6 lines in output of starting two apps") - assert.Contains(t, lines[1], "Started Dapr with app id \"processor\". 
HTTP Port: 3510.") - assert.Contains(t, lines[2], "Writing log files to directory") - assert.Contains(t, lines[2], "tests/apps/processor/.dapr/logs") - assert.Contains(t, lines[4], "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") - assert.Contains(t, lines[5], "Writing log files to directory") - assert.Contains(t, lines[5], "tests/apps/emit-metrics/.dapr/logs") + assert.Contains(t, output, "Started Dapr with app id \"processor\". HTTP Port: 3510.") + assert.Contains(t, output, "Writing log files to directory") + assert.Contains(t, output, "tests/apps/processor/.dapr/logs") + assert.Contains(t, output, "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") + assert.Contains(t, output, "tests/apps/emit-metrics/.dapr/logs") assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.") appTestOutput := AppTestOutput{ appID: "processor", @@ -243,42 +369,47 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file no app command", func(t *testing.T) { - cmdUninstall() - ensureDaprInstallation(t) + if isSlimMode() { + t.Skip("skipping: slim mode has no placement/scheduler so daprd cannot become healthy") + } runFilePath := "../testdata/run-template-files/no_app_command.yaml" + // The CLI performs daprd health checks (IsDaprListeningOnPort) for + // apps with appPort=0. Each check can take up to 60s. With two + // ports (HTTP + gRPC) per app, the total startup can take >120s. + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. 
+ cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) + // Wait for emit-metrics to be fully healthy before stopping. + // NOTE: Do NOT use waitForAppsListed here — it detects the + // daprd process BEFORE the CLI finishes health checks, causing + // a race where stop is sent too early. waitForAppHealthy also + // checks the healthz endpoint, confirming the sidecar is ready. + waitForAppHealthy(t, 180*time.Second, "emit-metrics") cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(25 * time.Second): - t.Fatal("timed out waiting for run command to finish") - } - - // Deterministic output for template file, so we can assert line by line - lines := strings.Split(output, "\n") - assert.GreaterOrEqual(t, len(lines), 7, "expected at least 7 lines in output of starting two apps with one app not having a command") - assert.Contains(t, lines[1], "Started Dapr with app id \"processor\". HTTP Port: 3510.") - assert.Contains(t, lines[2], "Writing log files to directory") - assert.Contains(t, lines[2], "tests/apps/processor/.dapr/logs") - assert.Contains(t, lines[4], "No application command found for app \"emit-metrics\" present in") - assert.Contains(t, lines[5], "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") - assert.Contains(t, lines[6], "Writing log files to directory") - assert.Contains(t, lines[6], "tests/apps/emit-metrics/.dapr/logs") + output := collectOutput(t, outputCh, cancel, 60*time.Second) + + assert.Contains(t, output, "Started Dapr with app id \"processor\". 
HTTP Port: 3510.") + assert.Contains(t, output, "Writing log files to directory") + assert.Contains(t, output, "tests/apps/processor/.dapr/logs") + assert.Contains(t, output, "No application command found for app \"emit-metrics\" present in") + assert.Contains(t, output, "Started Dapr with app id \"emit-metrics\". HTTP Port: 3511.") + assert.Contains(t, output, "tests/apps/emit-metrics/.dapr/logs") assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.") appTestOutput := AppTestOutput{ appID: "processor", @@ -301,45 +432,49 @@ func TestRunWithTemplateFile(t *testing.T) { "termination signal received: shutting down", "Exited Dapr successfully", }, - daprdLogPollTimeout: 20 * time.Second, + daprdLogPollTimeout: 60 * time.Second, } assertLogOutputForRunTemplateExec(t, appTestOutput) }) t.Run("valid template file empty app command", func(t *testing.T) { - cmdUninstall() - ensureDaprInstallation(t) + if isSlimMode() { + t.Skip("skipping: slim mode has no placement/scheduler so daprd cannot become healthy") + } runFilePath := "../testdata/run-template-files/empty_app_command.yaml" + // The CLI starts daprd for emit-metrics, runs health checks (up + // to 60s each for HTTP and gRPC ports since appPort=0), detects + // the empty command, kills daprd, and exits with error. The whole + // process can take >120s on slow runners. + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) 
t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) - cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(25 * time.Second): - t.Fatal("timed out waiting for run command to finish") - } - - // Deterministic output for template file, so we can assert line by line - lines := strings.Split(output, "\n") - assert.GreaterOrEqual(t, len(lines), 5, "expected at least 5 lines in output of starting two apps with last app having an empty command") - assert.Contains(t, lines[1], "Started Dapr with app id \"processor\". HTTP Port: 3510.") - assert.Contains(t, lines[2], "Writing log files to directory") - assert.Contains(t, lines[2], "tests/apps/processor/.dapr/logs") - assert.Contains(t, lines[4], "Error starting Dapr and app (\"emit-metrics\"): exec: no command") + // The CLI exits on its own after detecting the empty command + // (exitWithError=true). Do NOT send cmdStopWithRunTemplate here: + // the SIGTERM would sit in sigCh unread while the CLI is blocked + // in daprd health checks. Just wait for the CLI to finish. + output := collectOutput(t, outputCh, cancel, 180*time.Second) + + assert.Contains(t, output, "Started Dapr with app id \"processor\". 
HTTP Port: 3510.") + assert.Contains(t, output, "Writing log files to directory") + assert.Contains(t, output, "tests/apps/processor/.dapr/logs") + assert.Contains(t, output, "Error starting Dapr and app (\"emit-metrics\"): exec: no command") appTestOutput := AppTestOutput{ appID: "processor", baseLogDirPath: "../../apps/processor/.dapr/logs", @@ -367,31 +502,33 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file with app/daprd log destinations", func(t *testing.T) { - cmdUninstall() - ensureDaprInstallation(t) + if isSlimMode() { + t.Skip("skipping: slim mode has no placement/scheduler so daprd cannot become healthy") + } runFilePath := "../testdata/run-template-files/app_output_to_file_and_console.yaml" + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() t.Cleanup(func() { - // assumption in the test is that there is only one set of app and daprd logs in the logs directory. + cmdStopWithRunTemplate(runFilePath) + cmdStopWithAppID("processor") + cmdStopWithAppID("emit-metrics") cleanUpLogs() }) args := []string{ "-f", runFilePath, } - outputCh := make(chan string) + waitForPortsFree(t, 3510, 3511) + outputCh := make(chan string, 1) go func() { - output, _ := cmdRun("", args...) + output, _ := cmdRunWithContext(ctx, "", args...) t.Logf("%s", output) outputCh <- output }() - time.Sleep(time.Second * 10) + waitForDaprHealth(t, 60*time.Second, 3510, 3511) + waitForLogContent(t, "../../apps/emit-metrics/.dapr/logs", "app", "Metrics with ID 1 sent", 60*time.Second) cmdStopWithRunTemplate(runFilePath) - var output string - select { - case output = <-outputCh: - case <-time.After(25 * time.Second): - t.Fatal("timed out waiting for run command to finish") - } + output := collectOutput(t, outputCh, cancel, 60*time.Second) // App logs for processor app should not be printed to console and only written to file. 
assert.NotContains(t, output, "== APP - processor") @@ -437,13 +574,18 @@ func TestRunWithTemplateFile(t *testing.T) { } func TestRunTemplateFileWithoutDaprInit(t *testing.T) { - // remove any dapr installation before this test. + // Remove dapr installation so we can test running without init. must(t, cmdUninstall, "failed to uninstall Dapr") + // Reinstall Dapr when done so subsequent tests still work. + t.Cleanup(func() { + ensureDaprInstallation(t) + }) t.Run("valid template file without dapr init", func(t *testing.T) { t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() }) + waitForPortsFree(t, 3510, 3511) args := []string{ "-f", "../testdata/run-template-files/no_app_command.yaml", } diff --git a/tests/e2e/standalone/run_test.go b/tests/e2e/standalone/run_test.go index eb1beb875..a8f88dc66 100644 --- a/tests/e2e/standalone/run_test.go +++ b/tests/e2e/standalone/run_test.go @@ -29,8 +29,6 @@ import ( ) func TestStandaloneRun(t *testing.T) { - ensureDaprInstallation(t) - ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() @@ -46,9 +44,6 @@ func TestStandaloneRun(t *testing.T) { } } t.Cleanup(func() { - // remove dapr installation after all tests in this function. - must(t, cmdUninstall, "failed to uninstall Dapr") - // Call cancelFunc to stop the processes cancelFunc() }) for _, path := range getSocketCases() { diff --git a/tests/e2e/standalone/scheduler_test.go b/tests/e2e/standalone/scheduler_test.go index d1755fdea..44b74b5b8 100644 --- a/tests/e2e/standalone/scheduler_test.go +++ b/tests/e2e/standalone/scheduler_test.go @@ -31,40 +31,52 @@ import ( "gopkg.in/yaml.v2" ) +// countSchedulerEntries parses the tabular output from `dapr scheduler list` +// and returns the number of data rows (skipping the header and empty lines). +// This avoids hard-coding total line counts that break when the output format +// changes (e.g. 
extra trailing newlines or header adjustments). +func countSchedulerEntries(output string) int { + count := 0 + for i, line := range strings.Split(output, "\n") { + if i == 0 { // skip header + continue + } + if strings.TrimSpace(line) != "" { + count++ + } + } + return count +} + func TestSchedulerList(t *testing.T) { if isSlimMode() { t.Skip("skipping scheduler tests in slim mode") } + // Reinstall Dapr to get a fresh scheduler container. Without this, + // stale workflow registrations from previous tests cause + // wf.StartWorker to hang when reconnecting with the same types/IDs. cmdUninstall() ensureDaprInstallation(t) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - - args := []string{"-f", runFilePath} - go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) + // On slow CI runners, the first dapr run attempt may fail to register + // workflows (only jobs + reminders appear). startDaprRunRetry retries + // in the background, but the retry can take 30-40s. Use 240s to + // accommodate the retry delay. 
require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(c, strings.Split(output, "\n"), 10) - }, time.Second*30, time.Millisecond*10) - - time.Sleep(time.Second * 3) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 8) + }, 240*time.Second, time.Second) t.Run("short", func(t *testing.T) { output, err := cmdSchedulerList() require.NoError(t, err) lines := strings.Split(output, "\n") - require.Len(t, lines, 10) + require.Equal(t, 8, countSchedulerEntries(output)) require.Equal(t, []string{ "NAME", @@ -148,7 +160,7 @@ func TestSchedulerList(t *testing.T) { output, err := cmdSchedulerList("-o", "wide") require.NoError(t, err) lines := strings.Split(output, "\n") - require.Len(t, lines, 10) + require.Equal(t, 8, countSchedulerEntries(output)) require.Equal(t, []string{ "NAMESPACE", @@ -187,27 +199,27 @@ func TestSchedulerList(t *testing.T) { t.Run("filter", func(t *testing.T) { output, err := cmdSchedulerList("-n", "foo") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + assert.Equal(t, 0, countSchedulerEntries(output)) output, err = cmdSchedulerList("--filter", "all") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 10) + assert.Equal(t, 8, countSchedulerEntries(output)) output, err = cmdSchedulerList("--filter", "app") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, countSchedulerEntries(output)) output, err = cmdSchedulerList("--filter", "actor") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, countSchedulerEntries(output)) output, err = cmdSchedulerList("--filter", "workflow") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, countSchedulerEntries(output)) output, err = cmdSchedulerList("--filter", "activity") require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, 
countSchedulerEntries(output)) }) } @@ -220,18 +232,7 @@ func TestSchedulerGet(t *testing.T) { ensureDaprInstallation(t) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - - args := []string{"-f", runFilePath} - - go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) expNames := []string{ "actor/myactortype/actorid1/test1", @@ -283,7 +284,7 @@ func TestSchedulerGet(t *testing.T) { } } assert.Equal(c, len(expWorkflowPrefixes), foundWorkflows, "expected %d workflow items, found %d", len(expWorkflowPrefixes), foundWorkflows) - }, time.Second*30, time.Millisecond*10) + }, 240*time.Second, time.Second) t.Run("short", func(t *testing.T) { for _, name := range expNames { @@ -389,36 +390,20 @@ func TestSchedulerDelete(t *testing.T) { t.Skip("skipping scheduler tests in slim mode") } + // Reinstall Dapr to clear any stale scheduler state (workflow entries) + // from previous tests. Without this, wf.StartWorker hangs because the + // scheduler container still holds old workflow registrations. cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - args := []string{"-f", runFilePath} - - go func() { - for range 10 { - o, err := cmdRun("", args...) 
- t.Log(o) - t.Log(err) - if err == nil { - break - } - time.Sleep(time.Second * 2) - } - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(c, strings.Split(output, "\n"), 10) - }, time.Second*30, time.Millisecond*10) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 8) + }, 240*time.Second, time.Second) output, err := cmdSchedulerList() require.NoError(t, err) @@ -428,7 +413,7 @@ func TestSchedulerDelete(t *testing.T) { output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 9) + assert.Equal(t, 7, countSchedulerEntries(output)) _, err = cmdSchedulerDelete( "actor/myactortype/actorid2/test2", @@ -439,7 +424,7 @@ func TestSchedulerDelete(t *testing.T) { output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 6) + assert.Equal(t, 4, countSchedulerEntries(output)) _, err = cmdSchedulerDelete( "activity/test-scheduler/xyz1::0::1", @@ -449,17 +434,18 @@ func TestSchedulerDelete(t *testing.T) { output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, countSchedulerEntries(output)) + lines := strings.Split(output, "\n") _, err = cmdSchedulerDelete( - strings.Fields(strings.Split(output, "\n")[1])[0], - strings.Fields(strings.Split(output, "\n")[2])[0], + strings.Fields(lines[1])[0], + strings.Fields(lines[2])[0], ) require.NoError(t, err) output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + assert.Equal(t, 0, countSchedulerEntries(output)) } func TestSchedulerDeleteAllAll(t *testing.T) { @@ -469,41 +455,24 @@ func TestSchedulerDeleteAllAll(t *testing.T) { cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) 
runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - args := []string{"-f", runFilePath} - - go func() { - for range 10 { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) - if err == nil { - break - } - time.Sleep(time.Second * 2) - } - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) + // On slow macOS CI runners, workflow/activity entries can take over 60s to + // register, so use a 240s timeout. require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(c, strings.Split(output, "\n"), 10) - }, time.Second*30, time.Millisecond*10) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 8) + }, 240*time.Second, time.Second) _, err := cmdSchedulerDeleteAll("all") require.NoError(t, err) output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + assert.Equal(t, 0, countSchedulerEntries(output)) } func TestSchedulerDeleteAll(t *testing.T) { @@ -513,69 +482,53 @@ func TestSchedulerDeleteAll(t *testing.T) { cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - - // Stop any existing instance before starting to ensure port is free - cmdStopWithRunTemplate(runFilePath) - time.Sleep(time.Millisecond * 500) - - args := []string{"-f", runFilePath} - - go func() { - for range 10 { - o, err := cmdRun("", args...) 
- t.Log(o) - t.Log(err) - if err == nil { - break - } - time.Sleep(time.Second * 2) - } - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) + // Wait for all 8 scheduler entries to appear: 2 app jobs, 2 actor + // reminders, 4 workflow/activity entries. Using countSchedulerEntries + // avoids hard-coding a line count that breaks if the output format changes. + // On slow macOS CI runners, workflow/activity entries can take over 60s to + // register, so use a 240s timeout. require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.GreaterOrEqual(c, len(strings.Split(output, "\n")), 7) - }, time.Second*30, time.Millisecond*10) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 8) + }, 240*time.Second, time.Second) _, err := cmdSchedulerDeleteAll("app/test-scheduler") require.NoError(t, err) - output, err := cmdSchedulerList() - require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 8) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.Equal(c, 6, countSchedulerEntries(output)) + }, 10*time.Second, 500*time.Millisecond) _, err = cmdSchedulerDeleteAll("workflow/test-scheduler/abc1") require.NoError(t, err) - output, err = cmdSchedulerList() + output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 7) + assert.Equal(t, 5, countSchedulerEntries(output)) _, err = cmdSchedulerDeleteAll("workflow/test-scheduler") require.NoError(t, err) output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 4) + assert.Equal(t, 2, countSchedulerEntries(output)) _, err = cmdSchedulerDeleteAll("actor/myactortype/actorid1") require.NoError(t, err) output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 3) + assert.Equal(t, 1, 
countSchedulerEntries(output)) _, err = cmdSchedulerDeleteAll("actor/myactortype") require.NoError(t, err) output, err = cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + assert.Equal(t, 0, countSchedulerEntries(output)) } func TestSchedulerExportImport(t *testing.T) { @@ -585,34 +538,17 @@ func TestSchedulerExportImport(t *testing.T) { cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" - t.Cleanup(func() { - cmdStopWithRunTemplate(runFilePath) - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - args := []string{"-f", runFilePath} - - go func() { - for range 10 { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) - if err == nil { - break - } - time.Sleep(time.Second * 2) - } - }() + startDaprRunRetry(t, []int{3510}, func() { cmdStopWithRunTemplate(runFilePath) }, "-f", runFilePath) + // On slow macOS CI runners, workflow/activity entries can take over 60s to + // register, so use a 240s timeout. 
require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(c, strings.Split(output, "\n"), 10) - }, time.Second*30, time.Millisecond*10) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 8) + }, 240*time.Second, time.Second) f := filepath.Join(t.TempDir(), "foo") _, err := cmdSchedulerExport("-o", f) @@ -622,7 +558,7 @@ func TestSchedulerExportImport(t *testing.T) { require.NoError(t, err) output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + assert.Equal(t, 0, countSchedulerEntries(output)) _, err = cmdSchedulerImport("-f", f) require.NoError(t, err) @@ -630,6 +566,6 @@ func TestSchedulerExportImport(t *testing.T) { require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.GreaterOrEqual(c, len(strings.Split(output, "\n")), 9) - }, time.Second*30, time.Millisecond*10) + assert.GreaterOrEqual(c, countSchedulerEntries(output), 7) + }, 60*time.Second, time.Second) } diff --git a/tests/e2e/standalone/stop_test.go b/tests/e2e/standalone/stop_test.go index 67e0f97e0..20d293259 100644 --- a/tests/e2e/standalone/stop_test.go +++ b/tests/e2e/standalone/stop_test.go @@ -18,21 +18,12 @@ package standalone_test import ( "runtime" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestStandaloneStop(t *testing.T) { - ensureDaprInstallation(t) - - time.Sleep(5 * time.Second) - - t.Cleanup(func() { - // remove dapr installation after all tests in this function. 
- must(t, cmdUninstall, "failed to uninstall Dapr") - }) runArgs := []string{"run", "--app-id", "dapr_e2e_stop", "--"} if runtime.GOOS == "windows" { diff --git a/tests/e2e/standalone/stop_with_run_template_test.go b/tests/e2e/standalone/stop_with_run_template_test.go index eea8b6137..93337a6a3 100644 --- a/tests/e2e/standalone/stop_with_run_template_test.go +++ b/tests/e2e/standalone/stop_with_run_template_test.go @@ -19,7 +19,7 @@ package standalone_test import ( "encoding/json" - "fmt" + "strconv" "strings" "testing" "time" @@ -29,21 +29,23 @@ import ( ) func TestStopAppsStartedWithRunTemplate(t *testing.T) { + if isSlimMode() { + t.Skip("skipping: slim mode has no placement/scheduler so template apps cannot start fully") + } + // clean up logs before starting the tests cleanUpLogs() - ensureDaprInstallation(t) t.Cleanup(func() { - // remove dapr installation after all tests in this function. - tearDownTestSetup(t) + cleanUpLogs() }) t.Run("stop apps by passing run template file", func(t *testing.T) { t.Cleanup(func() { cleanUpLogs() }) + waitForPortsFree(t, 3510, 3511) go ensureAllAppsStartedWithRunTemplate(t) - time.Sleep(10 * time.Second) cliPID := getCLIPID(t) // Assert dapr list contains template name assertTemplateListOutput(t, "test_dapr_template") @@ -57,8 +59,8 @@ func TestStopAppsStartedWithRunTemplate(t *testing.T) { t.Cleanup(func() { cleanUpLogs() }) + waitForPortsFree(t, 3510, 3511) go ensureAllAppsStartedWithRunTemplate(t) - time.Sleep(10 * time.Second) cliPID := getCLIPID(t) output, err := cmdStopWithRunTemplate("../testdata/run-template-files") assert.NoError(t, err, "failed to stop apps started with run template") @@ -70,6 +72,7 @@ func TestStopAppsStartedWithRunTemplate(t *testing.T) { t.Cleanup(func() { cleanUpLogs() }) + waitForPortsFree(t, 3510, 3511) go ensureAllAppsStartedWithRunTemplate(t) time.Sleep(10 * time.Second) output, err := cmdStopWithRunTemplate("../testdata/invalid-dir") @@ -85,8 +88,8 @@ func 
TestStopAppsStartedWithRunTemplate(t *testing.T) { t.Cleanup(func() { cleanUpLogs() }) + waitForPortsFree(t, 3510, 3511) go ensureAllAppsStartedWithRunTemplate(t) - time.Sleep(10 * time.Second) cliPID := getCLIPID(t) output, err := cmdStopWithAppID("emit-metrics", "processor") assert.NoError(t, err, "failed to stop apps started with run template") @@ -124,12 +127,29 @@ func tearDownTestSetup(t *testing.T) { } func getCLIPID(t *testing.T) string { - output, err := cmdList("json") - require.NoError(t, err, "failed to list apps") - result := []map[string]interface{}{} - err = json.Unmarshal([]byte(output), &result) - assert.Equal(t, 2, len(result)) - return fmt.Sprintf("%v", result[0]["cliPid"]) + var pid string + require.Eventually(t, func() bool { + output, err := cmdList("json") + if err != nil { + return false + } + var result []map[string]interface{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + return false + } + if len(result) != 2 { + return false + } + // cliPid is a JSON number unmarshaled as float64; wait until it + // is a positive value so verifyCLIPIDNotExist can match reliably. 
+ v, _ := result[0]["cliPid"].(float64) + if v <= 0 { + return false + } + pid = strconv.Itoa(int(v)) + return true + }, 30*time.Second, time.Second, "expected 2 apps with valid cliPid in list") + return pid } func verifyCLIPIDNotExist(t *testing.T, pid string) { @@ -144,17 +164,20 @@ func verifyCLIPIDNotExist(t *testing.T, pid string) { } func assertTemplateListOutput(t *testing.T, name string) { - output, err := cmdList("json") - t.Log(output) - require.NoError(t, err, "dapr list failed") - var result []map[string]interface{} - - err = json.Unmarshal([]byte(output), &result) - - assert.NoError(t, err, "output was not valid JSON") - - assert.Len(t, result, 2, "expected two apps to be running") - assert.Equal(t, name, result[0]["runTemplateName"], "expected run template name to be %s", name) - assert.NotEmpty(t, result[0]["appLogPath"], "expected appLogPath to be non-empty") - assert.NotEmpty(t, result[0]["daprdLogPath"], "expected daprdLogPath to be non-empty") + require.Eventually(t, func() bool { + output, err := cmdList("json") + if err != nil { + return false + } + var result []map[string]interface{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + return false + } + if len(result) != 2 { + return false + } + return result[0]["runTemplateName"] == name && + result[0]["appLogPath"] != nil && result[0]["appLogPath"] != "" && + result[0]["daprdLogPath"] != nil && result[0]["daprdLogPath"] != "" + }, 30*time.Second, time.Second, "expected 2 apps with template name %q and non-empty log paths", name) } diff --git a/tests/e2e/standalone/uninstall_test.go b/tests/e2e/standalone/uninstall_test.go index 11cd85c3c..538cc293c 100644 --- a/tests/e2e/standalone/uninstall_test.go +++ b/tests/e2e/standalone/uninstall_test.go @@ -29,6 +29,11 @@ import ( ) func TestStandaloneUninstall(t *testing.T) { + // Reinstall Dapr when done so subsequent tests still work. 
+ t.Cleanup(func() { + ensureDaprInstallation(t) + }) + t.Run("uninstall should error out if container runtime is not valid", func(t *testing.T) { output, err := cmdUninstall("--container-runtime", "invalid") require.Error(t, err, "expected error if container runtime is invalid") diff --git a/tests/e2e/standalone/utils.go b/tests/e2e/standalone/utils.go index 4eff6287b..cdbf5c3ef 100644 --- a/tests/e2e/standalone/utils.go +++ b/tests/e2e/standalone/utils.go @@ -18,12 +18,16 @@ package standalone_test import ( "bufio" "errors" + "fmt" + "net" "os" "os/exec" "path/filepath" "runtime" "strings" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -97,6 +101,9 @@ spec: // executeAgainstRunningDapr runs a function against a running Dapr instance. // If Dapr or the App throws an error, the test is marked as failed. +// A safety goroutine kills the process if it is still running 5 minutes +// after it was started (f() should have called `dapr stop` well before +// then), so the test doesn't hang until the global 40m timeout. func executeAgainstRunningDapr(t *testing.T, f func(), daprArgs ...string) { daprPath := common.GetDaprPath() @@ -106,6 +113,25 @@ func executeAgainstRunningDapr(t *testing.T, f func(), daprArgs ...string) { cmd.Start() + // scanDone is closed when the scanner.Scan loop finishes, meaning + // the process has closed its stdout pipe (i.e., is exiting). + scanDone := make(chan struct{}) + + // Safety goroutine: kill the process if it is still running after + // 5 minutes. This prevents a 40-minute hang when f() blocks + // (e.g. a subtest hangs on a channel receive) or when f() fails + // to stop daprd. Killing the process closes the stdout pipe, + // which unblocks scanner.Scan() below. 
+ go func() { + select { + case <-time.After(5 * time.Minute): + t.Log("executeAgainstRunningDapr: process did not exit within 5m, killing") + cmd.Process.Kill() + case <-scanDone: + // Process exited on its own — nothing to do. + } + }() + daprOutput := "" for scanner.Scan() { outputChunk := scanner.Text() @@ -115,25 +141,102 @@ func executeAgainstRunningDapr(t *testing.T, f func(), daprArgs ...string) { } daprOutput += outputChunk } + close(scanDone) err := cmd.Wait() hasAppCommand := !strings.Contains(daprOutput, "WARNING: no application command found") + terminatedBySignal := strings.Contains(daprOutput, "terminated signal received: shutting down") if err != nil { var exitErr *exec.ExitError if errors.As(err, &exitErr) && exitErr.ExitCode() == 1 && strings.Contains(daprOutput, "Exited Dapr successfully") && - (!hasAppCommand || strings.Contains(daprOutput, "Exited App successfully")) { + (!hasAppCommand || terminatedBySignal || strings.Contains(daprOutput, "Exited App successfully")) { err = nil } } require.NoError(t, err, "dapr didn't exit cleanly") assert.NotContains(t, daprOutput, "The App process exited with error code: exit status", "Stop command should have been called before the app had a chance to exit") assert.Contains(t, daprOutput, "Exited Dapr successfully") - if hasAppCommand { + if hasAppCommand && !terminatedBySignal { assert.Contains(t, daprOutput, "Exited App successfully") } } +// waitForPortsFree polls until all given ports are available for binding. +// This prevents port contention between sequential tests that reuse +// hardcoded ports (e.g. container ports from dapr init). 
+func waitForPortsFree(t *testing.T, ports ...int) { + t.Helper() + require.Eventually(t, func() bool { + for _, port := range ports { + ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + return false + } + ln.Close() + } + return true + }, 60*time.Second, time.Second, "ports %v not available in time", ports) +} + +// startDaprRun starts `dapr run` in a background goroutine and registers +// cleanup handlers that stop the app and wait for the goroutine to finish. +// This prevents "Log in goroutine after Test has completed" panics that +// occur when the cmdRun goroutine outlives the test. +// +// stopFn is registered as a cleanup that runs before the wait; it must stop +// the app (e.g. via cmdStopWithAppID or cmdStopWithRunTemplate). +func startDaprRun(t *testing.T, ports []int, stopFn func(), runArgs ...string) { + t.Helper() + + if len(ports) > 0 { + waitForPortsFree(t, ports...) + } + + var wg sync.WaitGroup + // Register wg.Wait first so it runs last (LIFO cleanup order). + t.Cleanup(func() { wg.Wait() }) + t.Cleanup(stopFn) + + wg.Add(1) + go func() { + defer wg.Done() + o, _ := cmdRun("", runArgs...) + // Only safe to call t.Log here because cleanup waits for us + // via wg.Wait(). + t.Log(o) + }() +} + +// startDaprRunRetry is like startDaprRun but retries cmdRun up to 10 times +// on failure. Used by scheduler tests where port contention can cause +// transient startup failures. +func startDaprRunRetry(t *testing.T, ports []int, stopFn func(), runArgs ...string) { + t.Helper() + + if len(ports) > 0 { + waitForPortsFree(t, ports...) + } + + var wg sync.WaitGroup + t.Cleanup(func() { wg.Wait() }) + t.Cleanup(stopFn) + + wg.Add(1) + go func() { + defer wg.Done() + for range 10 { + o, err := cmdRun("", runArgs...) + t.Log(o) + if err == nil { + break + } + t.Log(err) + time.Sleep(time.Second * 2) + } + }() +} + // ensureDaprInstallation ensures that Dapr is installed. 
// If Dapr is not installed, a new installation is attempted. func ensureDaprInstallation(t *testing.T) { @@ -144,6 +247,16 @@ func ensureDaprInstallation(t *testing.T) { daprPath := filepath.Join(homeDir, ".dapr") _, err = os.Stat(daprPath) if os.IsNotExist(err) { + // Wait for container ports from a previous dapr installation to + // be fully released. On macOS, container port bindings can linger + // briefly after `dapr uninstall` removes the containers. + if !isSlimMode() { + waitForPortsFree(t, + 58080, // placement health + 58081, // scheduler health + 50005, // placement gRPC + ) + } args := []string{ "--runtime-version", daprRuntimeVersion, "--dashboard-version", daprDashboardVersion, diff --git a/tests/e2e/standalone/version_test.go b/tests/e2e/standalone/version_test.go index c3372a168..60f24c4b9 100644 --- a/tests/e2e/standalone/version_test.go +++ b/tests/e2e/standalone/version_test.go @@ -26,7 +26,6 @@ import ( ) func TestStandaloneVersion(t *testing.T) { - ensureDaprInstallation(t) t.Run("version", func(t *testing.T) { output, err := cmdVersion("") t.Log(output) diff --git a/tests/e2e/standalone/windows_run_template_test.go b/tests/e2e/standalone/windows_run_template_test.go index 43ad6180c..8cad3e881 100644 --- a/tests/e2e/standalone/windows_run_template_test.go +++ b/tests/e2e/standalone/windows_run_template_test.go @@ -36,11 +36,6 @@ type AppTestOutput struct { } func TestRunWithTemplateFile(t *testing.T) { - ensureDaprInstallation(t) - t.Cleanup(func() { - // remove dapr installation after all tests in this function. - must(t, cmdUninstall, "failed to uninstall Dapr") - }) // These tests are dependent on run template files in ../testdata/run-template-files folder. 
t.Run("valid template file", func(t *testing.T) { runFilePath := "../testdata/run-template-files/dapr.yaml" diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index a4698e7d1..dc47b2fb7 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -38,25 +38,15 @@ func TestWorkflowList(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") + + // Purge any leftover workflow instances from previous test runs. + cmdWorkflowPurge(appID, redisConnString, "--all") - time.Sleep(time.Second * 5) output, err := cmdWorkflowList(appID, redisConnString) require.NoError(t, err) assert.Equal(t, `❌ No workflow found in namespace "default" for app ID "test-workflow" @@ -95,25 +85,13 @@ func TestWorkflowRaiseEvent(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) 
- t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") - time.Sleep(time.Second * 5) output, err := cmdWorkflowRun(appID, "EventWorkflow", "--instance-id=foo") require.NoError(t, err, output) @@ -176,40 +154,47 @@ func TestWorkflowReRun(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") - time.Sleep(time.Second * 5) + cmdWorkflowPurge(appID, redisConnString, "--all") output, err := cmdWorkflowRun(appID, "SimpleWorkflow", "--instance-id=foo") require.NoError(t, err, output) - time.Sleep(3 * time.Second) + // Wait for the workflow instance to reach a terminal state before + // attempting rerun operations. Rerun requires the instance to be in a + // terminal state (COMPLETED/FAILED/TERMINATED). 
+ require.Eventually(t, func() bool { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + if err != nil { + return false + } + var list []map[string]interface{} + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false + } + for _, item := range list { + if item["instanceID"] == "foo" { + status, _ := item["runtimeStatus"].(string) + return status == "COMPLETED" || status == "FAILED" || status == "TERMINATED" + } + } + return false + }, 60*time.Second, time.Second, "workflow instance 'foo' did not reach terminal state") t.Run("rerun from beginning", func(t *testing.T) { output, err := cmdWorkflowReRun(appID, "foo") - require.NoError(t, err) + require.NoError(t, err, output) assert.Contains(t, output, "Rerunning workflow instance") }) t.Run("rerun with new instance ID", func(t *testing.T) { output, err := cmdWorkflowReRun(appID, "foo", "--new-instance-id", "bar") - require.NoError(t, err) + require.NoError(t, err, output) assert.Contains(t, output, "bar") }) @@ -232,25 +217,12 @@ func TestWorkflowPurge(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} - - go func() { - o, _ := cmdRun("", args...) 
- t.Log(o) - }() + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - time.Sleep(5 * time.Second) + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") for i := 0; i < 3; i++ { output, err := cmdWorkflowRun(appID, "SimpleWorkflow", @@ -290,6 +262,8 @@ func TestWorkflowPurge(t *testing.T) { require.NoError(t, err, output) _, _ = cmdWorkflowTerminate(appID, "purge-all-"+strconv.Itoa(i)) } + // Wait for workflows to reach terminal state after terminate. + time.Sleep(2 * time.Second) output, err := cmdWorkflowPurge(appID, redisConnString, "--all") require.NoError(t, err, output) @@ -324,19 +298,28 @@ func TestWorkflowPurge(t *testing.T) { "--instance-id=also-sched") require.NoError(t, err) + // Wait for scheduler entries to appear while workflow is still running. + require.Eventually(t, func() bool { + output, err := cmdSchedulerList() + if err != nil { + return false + } + return len(strings.Split(output, "\n")) > 2 + }, 30*time.Second, time.Second, "expected scheduler entries to appear") + output, err = cmdWorkflowTerminate(appID, "also-sched") require.NoError(t, err, output) - output, err = cmdSchedulerList() - require.NoError(t, err) - assert.Greater(t, len(strings.Split(output, "\n")), 2) - output, err = cmdWorkflowPurge(appID, "also-sched") require.NoError(t, err, output) - output, err = cmdSchedulerList() - require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 2) + require.Eventually(t, func() bool { + output, err := cmdSchedulerList() + if err != nil { + return false + } + return len(strings.Split(output, "\n")) == 2 + }, 30*time.Second, time.Second, "expected scheduler entries to be purged") }) } @@ -345,25 +328,12 @@ func TestWorkflowFilters(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := 
"../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() - - time.Sleep(5 * time.Second) + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") _, _ = cmdWorkflowRun(appID, "SimpleWorkflow", "--instance-id=simple-1") _, _ = cmdWorkflowRun(appID, "LongWorkflow", "--instance-id=long-1") @@ -411,32 +381,40 @@ func TestWorkflowChildCalls(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() - - time.Sleep(5 * time.Second) + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") t.Run("parent child workflow", func(t *testing.T) { input := `{"test": "parent-child", "value": 42}` output, err := cmdWorkflowRun(appID, "ParentWorkflow", "--input", input, "--instance-id=parent-1") require.NoError(t, err, output) - time.Sleep(5 * time.Second) + // Poll until the parent workflow and child workflows appear. 
+ require.Eventually(t, func() bool { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + if err != nil { + return false + } + var list []map[string]interface{} + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false + } + var parentFound bool + var childCount int + for _, item := range list { + if item["instanceID"] == "parent-1" { + parentFound = true + } + if name, ok := item["name"].(string); ok && name == "ChildWorkflow" { + childCount++ + } + } + return parentFound && childCount >= 2 + }, 30*time.Second, time.Second, "parent workflow and children did not appear") output, err = cmdWorkflowList(appID, redisConnString, "-o", "json") require.NoError(t, err) @@ -463,7 +441,24 @@ func TestWorkflowChildCalls(t *testing.T) { output, err := cmdWorkflowRun(appID, "NestedParentWorkflow", "--instance-id=nested-parent") require.NoError(t, err) - time.Sleep(6 * time.Second) + // Poll until recursive child workflows appear. + require.Eventually(t, func() bool { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + if err != nil { + return false + } + var list []map[string]interface{} + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false + } + count := 0 + for _, item := range list { + if name, ok := item["name"].(string); ok && name == "RecursiveChildWorkflow" { + count++ + } + } + return count >= 2 + }, 30*time.Second, time.Second, "recursive child workflows did not appear") output, err = cmdWorkflowList(appID, redisConnString, "-o", "json") require.NoError(t, err) @@ -486,7 +481,24 @@ func TestWorkflowChildCalls(t *testing.T) { output, err := cmdWorkflowRun(appID, "FanOutWorkflow", "--input", input, "--instance-id=fanout-1") require.NoError(t, err) - time.Sleep(5 * time.Second) + // Poll until fan-out child workflows appear. 
+ require.Eventually(t, func() bool { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + if err != nil { + return false + } + var list []map[string]interface{} + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false + } + count := 0 + for _, item := range list { + if name, ok := item["name"].(string); ok && name == "ChildWorkflow" { + count++ + } + } + return count >= parallelCount + }, 30*time.Second, time.Second, "fan-out child workflows did not appear") output, err = cmdWorkflowList(appID, redisConnString, "-o", "json") require.NoError(t, err) @@ -507,21 +519,25 @@ func TestWorkflowChildCalls(t *testing.T) { output, err := cmdWorkflowRun(appID, "ParentWorkflow", "--input", `{"fail": true}`, "--instance-id=parent-1") require.NoError(t, err, output) - time.Sleep(5 * time.Second) - - output, err = cmdWorkflowList(appID, redisConnString, "-o", "json") - require.NoError(t, err) - - var list []map[string]interface{} - require.NoError(t, json.Unmarshal([]byte(output), &list)) - - for _, item := range list { - if item["instanceID"] == "parent-1" { - status := item["runtimeStatus"].(string) - assert.Contains(t, []string{"COMPLETED", "FAILED"}, status) - break + // Poll until the parent workflow reaches a terminal state. + // On slow CI runners the workflow may still be RUNNING after 5s. 
+ require.Eventually(t, func() bool { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + if err != nil { + return false } - } + var list []map[string]interface{} + if err := json.Unmarshal([]byte(out), &list); err != nil { + return false + } + for _, item := range list { + if item["instanceID"] == "parent-1" { + status, _ := item["runtimeStatus"].(string) + return status == "COMPLETED" || status == "FAILED" + } + } + return false + }, 30*time.Second, time.Second, "parent-1 workflow did not reach terminal state") }) } @@ -530,26 +546,13 @@ func TestWorkflowHistory(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") - // Wait and create a workflow - time.Sleep(5 * time.Second) output, err := cmdWorkflowRun(appID, "SimpleWorkflow", "--instance-id=history-test") require.NoError(t, err, output) @@ -584,26 +587,13 @@ func TestWorkflowSuspendResume(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) 
- t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") - // Wait and create a long-running workflow - time.Sleep(5 * time.Second) output, err := cmdWorkflowRun(appID, "LongWorkflow", "--instance-id=suspend-resume-test") require.NoError(t, err, output) @@ -662,26 +652,13 @@ func TestWorkflowTerminate(t *testing.T) { t.Skip("skipping workflow tests in slim mode") } - cmdUninstall() - ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) - runFilePath := "../testdata/run-template-files/test-workflow.yaml" appID := "test-workflow" - t.Cleanup(func() { - cmdStopWithAppID(appID) - }) - args := []string{"-f", runFilePath} + startDaprRun(t, []int{3510}, func() { cmdStopWithAppID(appID) }, "-f", runFilePath) - go func() { - o, _ := cmdRun("", args...) - t.Log(o) - }() + waitForAppHealthy(t, 60*time.Second, "test-workflow") + cmdWorkflowPurge(appID, redisConnString, "--all") - // Wait and create a workflow for testing - time.Sleep(5 * time.Second) output, err := cmdWorkflowRun(appID, "LongWorkflow", "--instance-id=terminate-test") require.NoError(t, err, output)