Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,8 @@ public DynamicUpdateHandler getHandler() {

void sleep(Duration duration);

void sleep(Duration duration, TimerOptions options);

boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition);

void await(String reason, Supplier<Boolean> unblockCondition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void sleep(Duration duration) {
next.sleep(duration);
}

@Override
public void sleep(Duration duration, TimerOptions options) {
next.sleep(duration, options);
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
return next.await(timeout, reason, unblockCondition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,11 @@ public void sleep(Duration duration) {
newTimer(duration).get();
}

@Override
public void sleep(Duration duration, TimerOptions options) {
newTimer(duration, options).get();
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
Promise<Void> timer = newTimer(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,11 @@ public static void sleep(Duration duration) {
getWorkflowOutboundInterceptor().sleep(duration);
}

public static void sleep(Duration duration, TimerOptions options) {
assertNotReadOnly("sleep");
getWorkflowOutboundInterceptor().sleep(duration, options);
}

public static boolean isWorkflowThread() {
return WorkflowThreadMarker.isWorkflowThread();
}
Expand Down
10 changes: 10 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,21 @@ public static void sleep(Duration duration) {
WorkflowInternal.sleep(duration);
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(Duration duration, TimerOptions options) {
WorkflowInternal.sleep(duration, options);
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(long millis) {
WorkflowInternal.sleep(Duration.ofMillis(millis));
}

/** Must be called instead of {@link Thread#sleep(long)} to guarantee determinism. */
public static void sleep(long millis, TimerOptions options) {
WorkflowInternal.sleep(Duration.ofMillis(millis), options);
}

/**
* Block current thread until unblockCondition is evaluated to true.
*
Expand Down
87 changes: 87 additions & 0 deletions temporal-sdk/src/test/java/io/temporal/workflow/SleepTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package io.temporal.workflow;

import static org.junit.Assert.assertTrue;

import io.temporal.client.WorkflowOptions;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testing.internal.TracingWorkerInterceptor;
import io.temporal.workflow.shared.TestWorkflows.TestTraceWorkflow;
import java.time.Duration;
import java.util.List;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class SleepTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestTimerWorkflowImpl.class).build();

@Test
public void testSleep() {
WorkflowOptions options;
if (testWorkflowRule.isUseExternalService()) {
options = SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue());
} else {
options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
.setWorkflowRunTimeout(Duration.ofDays(1))
.build();
}
TestTraceWorkflow client =
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestTraceWorkflow.class, options);
String result = client.execute();
Assert.assertEquals("testSleep", result);
if (testWorkflowRule.isUseExternalService()) {
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
"registerQuery getTrace",
"newThread workflow-method",
"newTimer PT0.7S",
"newTimer PT1.3S",
"currentTimeMillis",
"newTimer PT10S",
"currentTimeMillis",
"currentTimeMillis",
"currentTimeMillis");
} else {
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
"interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP,
"registerQuery getTrace",
"newThread workflow-method",
"newTimer PT11M40S",
"newTimer PT21M40S",
"currentTimeMillis",
"newTimer PT10H",
"currentTimeMillis",
"currentTimeMillis",
"currentTimeMillis");
}
}

public static class TestTimerWorkflowImpl implements TestTraceWorkflow {

@Override
public String execute() {
boolean useExternalService = SDKTestWorkflowRule.useExternalService;
Duration timeout1 = useExternalService ? Duration.ofMillis(700) : Duration.ofSeconds(700);
long time = Workflow.currentTimeMillis();
Workflow.sleep(timeout1, TimerOptions.newBuilder().setSummary("timer1").build());
long slept = Workflow.currentTimeMillis() - time;
// Also checks that rounding up to a second works.
assertTrue(slept + "<" + timeout1.toMillis(), slept >= timeout1.toMillis());
return "testSleep";
}

@Override
public List<String> getTrace() {
throw new UnsupportedOperationException("not implemented");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ public void sleep(Duration duration) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void sleep(Duration duration, TimerOptions options) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ public void sleep(Duration duration) {
next.sleep(duration);
}

@Override
public void sleep(Duration duration, TimerOptions options) {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("sleep " + duration);
}
next.sleep(duration, options);
}

@Override
public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down
Loading