diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 744709946..907470ada 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,6 +9,10 @@ on: - main workflow_dispatch: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build: runs-on: ubuntu-latest diff --git a/.github/workflows/harness-image.yml b/.github/workflows/harness-image.yml new file mode 100644 index 000000000..9c1befcfa --- /dev/null +++ b/.github/workflows/harness-image.yml @@ -0,0 +1,58 @@ +name: Harness Worker Image + +on: + push: + branches: [main, TOOL-33-test-worker] + paths: + - "harness/**" + - "build.gradle" + - "settings.gradle" + - ".github/workflows/harness-image.yml" + release: + types: [published] + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build-and-push: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Docker metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/conductor-oss/java-sdk/harness-worker + tags: | + type=raw,value=latest + type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }} + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + file: ./harness/Dockerfile + platforms: linux/amd64,linux/arm64 + push: true + tags: ${{ steps.meta.outputs.tags }} diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 38d0e6e04..90497256b 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -11,6 +11,10 @@ permissions: statuses: write checks: write +concurrency: + group: ${{ github.workflow }}-${{ github.event.workflow_run.head_branch }} + cancel-in-progress: true + jobs: integration-tests: runs-on: ubuntu-latest diff --git a/.github/workflows/osv-scanner.yml b/.github/workflows/osv-scanner.yml index 5d573f05a..def5bb9da 100644 --- a/.github/workflows/osv-scanner.yml +++ b/.github/workflows/osv-scanner.yml @@ -11,6 +11,10 @@ permissions: contents: read security-events: write +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: scan-pr: uses: "google/osv-scanner-action/.github/workflows/osv-scanner-reusable-pr.yml@v2.3.3" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index a59520664..3ae1e2f12 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/harness/Dockerfile b/harness/Dockerfile new file mode 100644 index 000000000..36740c980 --- /dev/null +++ b/harness/Dockerfile @@ -0,0 +1,10 @@ +FROM eclipse-temurin:21-jdk AS build +WORKDIR /app +COPY . . +RUN ./gradlew :harness:installDist -x test --no-daemon + +FROM eclipse-temurin:21-jre +WORKDIR /app +COPY --from=build /app/harness/build/install/harness/lib/ ./lib/ + +ENTRYPOINT ["java", "-cp", "lib/*", "io.orkes.conductor.harness.HarnessMain"] diff --git a/harness/README.md b/harness/README.md new file mode 100644 index 000000000..9a5ecdd80 --- /dev/null +++ b/harness/README.md @@ -0,0 +1,73 @@ +# Java SDK Worker Harness + +A self-feeding worker harness that runs indefinitely. On startup it registers five simulated tasks (`java_worker_0` through `java_worker_4`) and the `java_simulated_tasks_workflow`, then runs two background services: + +- **WorkflowGovernor** -- starts a configurable number of `java_simulated_tasks_workflow` instances per second (default 2), indefinitely. +- **SimulatedTaskWorkers** -- five task handlers, each with a codename and a default sleep duration. Each worker supports configurable delay types, failure simulation, and output generation via task input parameters. The workflow chains them in sequence: quickpulse (1s) → whisperlink (2s) → shadowfetch (3s) → ironforge (4s) → deepcrawl (5s). + +All resource names use a `java_` prefix so multiple SDK harnesses (C#, Python, Go, etc.) can coexist on the same cluster. + +## Local Run + +```bash +# From the repository root +CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api \ +CONDUCTOR_AUTH_KEY=your-key \ +CONDUCTOR_AUTH_SECRET=your-secret \ +./gradlew :harness:run +``` + +## Docker + +### Build + +From the repository root: + +```bash +docker build -f harness/Dockerfile -t java-sdk-harness . +``` + +### Run + +```bash +docker run -d \ + -e CONDUCTOR_SERVER_URL=https://your-cluster.example.com/api \ + -e CONDUCTOR_AUTH_KEY=$CONDUCTOR_AUTH_KEY \ + -e CONDUCTOR_AUTH_SECRET=$CONDUCTOR_AUTH_SECRET \ + -e HARNESS_WORKFLOWS_PER_SEC=4 \ + java-sdk-harness +``` + +### Multiplatform Build and Push + +Build for both `linux/amd64` and `linux/arm64` and push to GHCR: + +```bash +docker buildx build \ + --platform linux/amd64,linux/arm64 \ + -f harness/Dockerfile \ + -t ghcr.io/conductor-oss/java-sdk/harness-worker:latest \ + --push . +``` + +After pushing a new image with the same tag, restart the K8s deployment to pull it: + +```bash +kubectl rollout restart deployment/java-sdk-harness-worker -n $NS +kubectl rollout status deployment/java-sdk-harness-worker -n $NS +``` + +## Environment Variables + +| Variable | Required | Default | Description | +|---|---|---|---| +| `CONDUCTOR_SERVER_URL` | yes | -- | Conductor API base URL | +| `CONDUCTOR_AUTH_KEY` | no | -- | Orkes auth key | +| `CONDUCTOR_AUTH_SECRET` | no | -- | Orkes auth secret | +| `HARNESS_WORKFLOWS_PER_SEC` | no | 2 | Workflows to start per second | +| `HARNESS_BATCH_SIZE` | no | 20 | Thread count per worker (controls polling concurrency) | +| `HARNESS_POLL_INTERVAL_MS` | no | 100 | Milliseconds between poll cycles | + +## Kubernetes + +See [manifests/README.md](manifests/README.md) for deployment instructions. diff --git a/harness/build.gradle b/harness/build.gradle new file mode 100644 index 000000000..f47fdd503 --- /dev/null +++ b/harness/build.gradle @@ -0,0 +1,18 @@ +plugins { + id 'java' + id 'application' +} + +application { + mainClass = 'io.orkes.conductor.harness.HarnessMain' +} + +repositories { + mavenCentral() +} + +dependencies { + implementation project(':conductor-client') + implementation project(':orkes-client') + implementation "ch.qos.logback:logback-classic:1.5.6" +} diff --git a/harness/manifests/README.md b/harness/manifests/README.md new file mode 100644 index 000000000..4a0d6fc76 --- /dev/null +++ b/harness/manifests/README.md @@ -0,0 +1,130 @@ +# Kubernetes Manifests + +This directory contains Kubernetes manifests for deploying the Java SDK harness worker to the certification clusters. + +## Prerequisites + +**Set your namespace environment variable:** +```bash +export NS=your-namespace-here +``` + +All kubectl commands below use `-n $NS` to specify the namespace. The manifests intentionally do not include hardcoded namespaces. + +## Files + +| File | Description | +|---|---| +| `deployment.yaml` | Deployment (single file, works on all clusters) | +| `configmap-aws.yaml` | Conductor URL + auth key for certification-aws | +| `configmap-azure.yaml` | Conductor URL + auth key for certification-az | +| `configmap-gcp.yaml` | Conductor URL + auth key for certification-gcp | +| `secret-conductor.yaml` | Conductor auth secret (placeholder template) | + +## Quick Start + +### 1. Create the Conductor Auth Secret + +The `CONDUCTOR_AUTH_SECRET` must be created as a Kubernetes secret before deploying. + +```bash +kubectl create secret generic conductor-credentials \ + --from-literal=auth-secret=YOUR_AUTH_SECRET \ + -n $NS +``` + +If the `conductor-credentials` secret already exists in the namespace (e.g. from the e2e-testrunner-worker or the C# harness), it can be reused as-is. + +See `secret-conductor.yaml` for more details. + +### 2. Apply the ConfigMap for Your Cluster + +```bash +# AWS +kubectl apply -f manifests/configmap-aws.yaml -n $NS + +# Azure +kubectl apply -f manifests/configmap-azure.yaml -n $NS + +# GCP +kubectl apply -f manifests/configmap-gcp.yaml -n $NS +``` + +### 3. Deploy + +```bash +kubectl apply -f manifests/deployment.yaml -n $NS +``` + +### 4. Verify + +```bash +# Check pod status +kubectl get pods -n $NS -l app=java-sdk-harness-worker + +# Watch logs +kubectl logs -n $NS -l app=java-sdk-harness-worker -f +``` + +## Building and Pushing the Image + +From the repository root: + +```bash +# Build for both amd64 and arm64 and push to GHCR +docker buildx build \ + --platform linux/amd64,linux/arm64 \ + -f harness/Dockerfile \ + -t ghcr.io/conductor-oss/java-sdk/harness-worker:latest \ + --push . +``` + +After pushing a new image with the same tag, restart the deployment to pull it: + +```bash +kubectl rollout restart deployment/java-sdk-harness-worker -n $NS +kubectl rollout status deployment/java-sdk-harness-worker -n $NS +``` + +## Tuning + +The harness worker accepts these optional environment variables (set in `deployment.yaml`): + +| Variable | Default | Description | +|---|---|---| +| `HARNESS_WORKFLOWS_PER_SEC` | 2 | Workflows to start per second | +| `HARNESS_BATCH_SIZE` | 20 | Thread count per worker (controls polling concurrency) | +| `HARNESS_POLL_INTERVAL_MS` | 100 | Milliseconds between poll cycles | + +Edit `deployment.yaml` to change these, then re-apply: + +```bash +kubectl apply -f manifests/deployment.yaml -n $NS +``` + +## Troubleshooting + +### Pod not starting + +```bash +kubectl describe pod -n $NS -l app=java-sdk-harness-worker +kubectl logs -n $NS -l app=java-sdk-harness-worker --tail=100 +``` + +### Secret not found + +```bash +kubectl get secret conductor-credentials -n $NS +``` + +## Resource Limits + +Default resource allocation: +- **Memory**: 256Mi (request) / 512Mi (limit) +- **CPU**: 100m (request) / 500m (limit) + +Adjust in `deployment.yaml` based on workload. Higher `HARNESS_WORKFLOWS_PER_SEC` values may need more CPU/memory. + +## Service + +The harness worker does **not** need a Service or Ingress. It connects to Conductor via outbound HTTP polling. All communication is outbound. diff --git a/harness/manifests/configmap-aws.yaml b/harness/manifests/configmap-aws.yaml new file mode 100644 index 000000000..1939f6ed2 --- /dev/null +++ b/harness/manifests/configmap-aws.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-aws cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: java-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: java-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-aws.orkesconductor.io/api" + CONDUCTOR_AUTH_KEY: "7ba9d0ec-247b-11f1-8d42-ea3efeda41b2" diff --git a/harness/manifests/configmap-azure.yaml b/harness/manifests/configmap-azure.yaml new file mode 100644 index 000000000..ac8066750 --- /dev/null +++ b/harness/manifests/configmap-azure.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-az cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: java-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: java-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-az.orkesconductor.io/api" + CONDUCTOR_AUTH_KEY: "bf170d61-2797-11f1-833e-4ae04d100a03" diff --git a/harness/manifests/configmap-gcp.yaml b/harness/manifests/configmap-gcp.yaml new file mode 100644 index 000000000..5d020f949 --- /dev/null +++ b/harness/manifests/configmap-gcp.yaml @@ -0,0 +1,13 @@ +--- +# ConfigMap for certification-gcp cluster +# Contains Conductor connection details specific to this cluster +apiVersion: v1 +kind: ConfigMap +metadata: + name: java-sdk-harness-config + # namespace: xxxxx # supply this in kubectl command + labels: + app: java-sdk-harness-worker +data: + CONDUCTOR_SERVER_URL: "https://certification-gcp.orkesconductor.com/api" + CONDUCTOR_AUTH_KEY: "e6c1ac61-286b-11f1-be01-c682b5750c3a" diff --git a/harness/manifests/deployment.yaml b/harness/manifests/deployment.yaml new file mode 100644 index 000000000..34ed6bc08 --- /dev/null +++ b/harness/manifests/deployment.yaml @@ -0,0 +1,85 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: java-sdk-harness-worker + # namespace: xxxxx # supply this in kubectl command + labels: + app: java-sdk-harness-worker +spec: + replicas: 2 + selector: + matchLabels: + app: java-sdk-harness-worker + template: + metadata: + labels: + app: java-sdk-harness-worker + spec: + containers: + - name: harness + image: ghcr.io/conductor-oss/java-sdk/harness-worker:latest + imagePullPolicy: Always + env: + # === CONDUCTOR CONNECTION (from per-cloud ConfigMap) === + + - name: CONDUCTOR_SERVER_URL + valueFrom: + configMapKeyRef: + name: java-sdk-harness-config + key: CONDUCTOR_SERVER_URL + + - name: CONDUCTOR_AUTH_KEY + valueFrom: + configMapKeyRef: + name: java-sdk-harness-config + key: CONDUCTOR_AUTH_KEY + + - name: CONDUCTOR_AUTH_SECRET + valueFrom: + secretKeyRef: + name: conductor-credentials + key: auth-secret + + # === HARNESS TUNING (defaults match HarnessMain.java) === + # Adjust these per-cluster if needed, or move to ConfigMap. + + - name: HARNESS_WORKFLOWS_PER_SEC + value: "1" + + - name: HARNESS_BATCH_SIZE + value: "20" + + - name: HARNESS_POLL_INTERVAL_MS + value: "100" + + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + + livenessProbe: + exec: + command: + - /bin/sh + - -c + - test -e /proc/1/cmdline + initialDelaySeconds: 30 + periodSeconds: 30 + timeoutSeconds: 5 + failureThreshold: 3 + + readinessProbe: + exec: + command: + - /bin/sh + - -c + - test -e /proc/1/cmdline + initialDelaySeconds: 10 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + + restartPolicy: Always diff --git a/harness/manifests/secret-conductor.yaml b/harness/manifests/secret-conductor.yaml new file mode 100644 index 000000000..469ee769f --- /dev/null +++ b/harness/manifests/secret-conductor.yaml @@ -0,0 +1,37 @@ +# Conductor API Secret +# This secret contains the Conductor AUTH SECRET for authentication. +# The auth key (key ID) is NOT secret and lives in the per-cloud ConfigMap. +# Create this secret before deploying the worker. + +apiVersion: v1 +kind: Secret +metadata: + name: conductor-credentials + # namespace: xxxxx # supply this in kubectl command + labels: + app: java-sdk-harness-worker +type: Opaque +stringData: + # TODO: Replace with your actual Conductor AUTH SECRET (not the key ID) + auth-secret: "YOUR_CONDUCTOR_AUTH_SECRET_HERE" + +--- +# Instructions for creating this secret: +# +# Option 1 - kubectl imperative command (recommended): +# +# kubectl create secret generic conductor-credentials \ +# --from-literal=auth-secret=YOUR_AUTH_SECRET \ +# -n $NS +# +# Option 2 - edit this file and apply: +# +# 1. Replace YOUR_CONDUCTOR_AUTH_SECRET_HERE with the real secret value +# 2. kubectl apply -f manifests/secret-conductor.yaml -n $NS +# +# Note: The auth key (key ID) is not secret and is stored in the per-cloud +# ConfigMap (configmap-aws.yaml, configmap-azure.yaml, configmap-gcp.yaml). +# +# Note: If the e2e-testrunner-worker already runs in the same namespace, the +# conductor-credentials secret may already exist and can be reused as-is +# (same credential, same secret name). diff --git a/harness/src/main/java/io/orkes/conductor/harness/HarnessMain.java b/harness/src/main/java/io/orkes/conductor/harness/HarnessMain.java new file mode 100644 index 000000000..afdab121c --- /dev/null +++ b/harness/src/main/java/io/orkes/conductor/harness/HarnessMain.java @@ -0,0 +1,138 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.harness; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.client.automator.TaskRunnerConfigurer; +import com.netflix.conductor.client.http.ConductorClient; +import com.netflix.conductor.client.http.MetadataClient; +import com.netflix.conductor.client.http.TaskClient; +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskType; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; + +import io.orkes.conductor.client.ApiClient; + +public class HarnessMain { + + private static final Logger log = LoggerFactory.getLogger(HarnessMain.class); + + private static final String WORKFLOW_NAME = "java_simulated_tasks_workflow"; + + private static final String[][] SIMULATED_WORKERS = { + {"java_worker_0", "quickpulse", "1"}, + {"java_worker_1", "whisperlink", "2"}, + {"java_worker_2", "shadowfetch", "3"}, + {"java_worker_3", "ironforge", "4"}, + {"java_worker_4", "deepcrawl", "5"}, + }; + + public static void main(String[] args) throws InterruptedException { + ConductorClient client = ApiClient.builder().useEnvVariables(true).readTimeout(10_000).connectTimeout(10_000) + .writeTimeout(10_000).build(); + + int workflowsPerSec = envInt("HARNESS_WORKFLOWS_PER_SEC", 2); + int batchSize = envInt("HARNESS_BATCH_SIZE", 20); + int pollIntervalMs = envInt("HARNESS_POLL_INTERVAL_MS", 100); + + registerMetadata(client); + + List workers = new ArrayList<>(); + for (String[] entry : SIMULATED_WORKERS) { + workers.add(new SimulatedTaskWorker(entry[0], entry[1], Integer.parseInt(entry[2]), batchSize, + pollIntervalMs)); + } + + TaskClient taskClient = new TaskClient(client); + Map threadCounts = + workers.stream().collect(Collectors.toMap(Worker::getTaskDefName, w -> batchSize)); + + TaskRunnerConfigurer configurer = + new TaskRunnerConfigurer.Builder(taskClient, workers).withTaskThreadCount(threadCounts).build(); + configurer.init(); + + WorkflowClient workflowClient = new WorkflowClient(client); + WorkflowGovernor governor = new WorkflowGovernor(workflowClient, WORKFLOW_NAME, workflowsPerSec); + governor.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutting down harness..."); + governor.shutdown(); + configurer.shutdown(); + })); + + Thread.currentThread().join(); + } + + private static void registerMetadata(ConductorClient client) { + MetadataClient metadataClient = new MetadataClient(client); + + List taskDefs = new ArrayList<>(); + for (String[] entry : SIMULATED_WORKERS) { + String taskName = entry[0]; + String codename = entry[1]; + int sleepSeconds = Integer.parseInt(entry[2]); + + TaskDef td = new TaskDef(taskName); + td.setDescription( + "Java SDK harness simulated task (" + codename + ", default delay " + sleepSeconds + "s)"); + td.setRetryCount(1); + td.setTimeoutSeconds(300); + td.setResponseTimeoutSeconds(300); + taskDefs.add(td); + } + metadataClient.registerTaskDefs(taskDefs); + log.info("Registered {} task definitions", taskDefs.size()); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName(WORKFLOW_NAME); + workflowDef.setVersion(1); + workflowDef.setDescription("Java SDK harness simulated task workflow"); + workflowDef.setOwnerEmail("java-sdk-harness@conductor.io"); + + List wfTasks = new ArrayList<>(); + for (String[] entry : SIMULATED_WORKERS) { + WorkflowTask wt = new WorkflowTask(); + wt.setName(entry[0]); + wt.setTaskReferenceName(entry[1]); + wt.setType(TaskType.SIMPLE.name()); + wfTasks.add(wt); + } + workflowDef.setTasks(wfTasks); + + metadataClient.updateWorkflowDefs(List.of(workflowDef)); + log.info("Registered workflow definition: {}", WORKFLOW_NAME); + } + + private static int envInt(String name, int defaultValue) { + String value = System.getenv(name); + if (value == null || value.isBlank()) { + return defaultValue; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + return defaultValue; + } + } +} diff --git a/harness/src/main/java/io/orkes/conductor/harness/SimulatedTaskWorker.java b/harness/src/main/java/io/orkes/conductor/harness/SimulatedTaskWorker.java new file mode 100644 index 000000000..02757884a --- /dev/null +++ b/harness/src/main/java/io/orkes/conductor/harness/SimulatedTaskWorker.java @@ -0,0 +1,279 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.harness; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.client.worker.Worker; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskResult; + +public class SimulatedTaskWorker implements Worker { + + private static final Logger log = LoggerFactory.getLogger(SimulatedTaskWorker.class); + + private static final String ALPHANUMERIC_CHARS = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + + private static final String INSTANCE_ID; + + static { + String hostname = System.getenv("HOSTNAME"); + INSTANCE_ID = (hostname != null && !hostname.isBlank()) ? hostname + : UUID.randomUUID().toString().replace("-", "").substring(0, 8); + } + + private final String taskName; + private final String codename; + private final int defaultDelayMs; + private final int batchSize; + private final int pollIntervalMs; + private final String workerId; + private final Random random = new Random(); + + public SimulatedTaskWorker(String taskName, String codename, int sleepSeconds, int batchSize, int pollIntervalMs) { + this.taskName = taskName; + this.codename = codename; + this.defaultDelayMs = sleepSeconds * 1000; + this.batchSize = batchSize; + this.pollIntervalMs = pollIntervalMs; + this.workerId = taskName + "-" + INSTANCE_ID; + + log.info("[{}] Initialized worker [workerId={}, codename={}, batchSize={}, pollInterval={}ms]", taskName, + workerId, codename, batchSize, pollIntervalMs); + } + + @Override + public String getTaskDefName() { + return taskName; + } + + @Override + public String getIdentity() { + return workerId; + } + + @Override + public int getPollingInterval() { + return pollIntervalMs; + } + + @Override + public TaskResult execute(Task task) { + Map input = task.getInputData() != null ? task.getInputData() : new HashMap<>(); + String taskId = task.getTaskId(); + int taskIndex = getOrDefault(input, "taskIndex", -1); + + log.info("[{}] Starting simulated task [id={}, index={}, codename={}]", taskName, taskId, taskIndex, codename); + + long startTime = System.currentTimeMillis(); + + String delayType = getOrDefault(input, "delayType", "fixed"); + int minDelay = getOrDefault(input, "minDelay", defaultDelayMs); + int maxDelay = getOrDefault(input, "maxDelay", minDelay + 100); + int meanDelay = getOrDefault(input, "meanDelay", (minDelay + maxDelay) / 2); + int stdDeviation = getOrDefault(input, "stdDeviation", 30); + double successRate = getOrDefault(input, "successRate", 1.0); + String failureMode = getOrDefault(input, "failureMode", "random"); + int outputSize = getOrDefault(input, "outputSize", 1024); + + long delayMs = 0L; + if (!"wait".equalsIgnoreCase(delayType)) { + delayMs = calculateDelay(delayType, minDelay, maxDelay, meanDelay, stdDeviation); + + log.info("[{}] Simulated task [id={}, index={}] sleeping for {} ms", taskName, taskId, taskIndex, delayMs); + try { + TimeUnit.MILLISECONDS.sleep(delayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Task interrupted", e); + } + } + + if (!shouldTaskSucceed(successRate, failureMode, input)) { + log.info("[{}] Simulated task [id={}, index={}] failed as configured", taskName, taskId, taskIndex); + throw new SimulatedTaskException("Simulated task failure based on configuration"); + } + + long elapsedTime = System.currentTimeMillis() - startTime; + Map output = generateOutput(input, taskId, taskIndex, delayMs, elapsedTime, outputSize); + + TaskResult result = new TaskResult(task); + result.setStatus(TaskResult.Status.COMPLETED); + result.setOutputData(output); + return result; + } + + private long calculateDelay(String delayType, int minDelay, int maxDelay, int meanDelay, int stdDeviation) { + switch (delayType.toLowerCase()) { + case "fixed": + return minDelay; + + case "random": + return minDelay + random.nextInt(Math.max(1, maxDelay - minDelay + 1)); + + case "normal": + double gaussian = random.nextGaussian(); + long delay = Math.round(meanDelay + gaussian * stdDeviation); + return Math.max(1, delay); + + case "exponential": + double exp = -meanDelay * Math.log(1 - random.nextDouble()); + return Math.max(minDelay, Math.min(maxDelay, (long) exp)); + + default: + return minDelay; + } + } + + private boolean shouldTaskSucceed(double successRate, String failureMode, Map input) { + Boolean forceSuccess = getOrDefault(input, "forceSuccess", null); + if (forceSuccess != null) { + return forceSuccess; + } + + Boolean forceFail = getOrDefault(input, "forceFail", null); + if (forceFail != null) { + return !forceFail; + } + + switch (failureMode.toLowerCase()) { + case "random": + return random.nextDouble() < successRate; + + case "conditional": + int taskIndex = getOrDefault(input, "taskIndex", -1); + if (taskIndex >= 0) { + Object failIndexesObj = input.get("failIndexes"); + if (failIndexesObj instanceof Iterable) { + for (Object index : (Iterable) failIndexesObj) { + if (index != null && index.toString().equals(String.valueOf(taskIndex))) { + return false; + } + } + } + + int failEvery = getOrDefault(input, "failEvery", 0); + if (failEvery > 0 && taskIndex % failEvery == 0) { + return false; + } + } + return random.nextDouble() < successRate; + + case "sequential": + int attempt = getOrDefault(input, "attempt", 1); + int failUntilAttempt = getOrDefault(input, "failUntilAttempt", 2); + return attempt >= failUntilAttempt; + + default: + return random.nextDouble() < successRate; + } + } + + private Map generateOutput(Map input, String taskId, int taskIndex, long delayMs, + long elapsedTimeMs, int outputSize) { + Map output = new HashMap<>(); + + output.put("taskId", taskId); + output.put("taskIndex", taskIndex); + output.put("codename", codename); + output.put("status", "completed"); + output.put("configuredDelayMs", delayMs); + output.put("actualExecutionTimeMs", elapsedTimeMs); + output.put("a_or_b", random.nextInt(100) > 20 ? "a" : "b"); + output.put("c_or_d", random.nextInt(100) > 33 ? "c" : "d"); + + if (getOrDefault(input, "includeInput", false)) { + output.put("input", input); + } + + Object previousTaskOutput = input.get("previousTaskOutput"); + if (previousTaskOutput != null) { + output.put("previousTaskData", previousTaskOutput); + } + + if (outputSize > 0) { + output.put("data", generateRandomData(outputSize)); + } + + Map outputTemplate = getMapOrDefault(input, "outputTemplate"); + if (outputTemplate != null) { + output.putAll(outputTemplate); + } + + return output; + } + + private String generateRandomData(int size) { + if (size <= 0) { + return ""; + } + + StringBuilder sb = new StringBuilder(size); + for (int i = 0; i < size; i++) { + sb.append(ALPHANUMERIC_CHARS.charAt(random.nextInt(ALPHANUMERIC_CHARS.length()))); + } + return sb.toString(); + } + + @SuppressWarnings("unchecked") + private T getOrDefault(Map map, String key, T defaultValue) { + Object value = map.get(key); + if (value == null) { + return defaultValue; + } + + try { + if (defaultValue instanceof Integer && value instanceof Number) { + return (T) Integer.valueOf(((Number) value).intValue()); + } else if (defaultValue instanceof Double && value instanceof Number) { + return (T) Double.valueOf(((Number) value).doubleValue()); + } else if (defaultValue instanceof Boolean) { + if (value instanceof Boolean) { + return (T) value; + } + if (value instanceof String) { + return (T) Boolean.valueOf(Boolean.parseBoolean((String) value)); + } + } else if (defaultValue instanceof String) { + return (T) value.toString(); + } + return (T) value; + } catch (ClassCastException e) { + return defaultValue; + } + } + + @SuppressWarnings("unchecked") + private Map getMapOrDefault(Map map, String key) { + Object value = map.get(key); + if (value instanceof Map) { + return (Map) value; + } + return null; + } + + public static class SimulatedTaskException extends RuntimeException { + + public SimulatedTaskException(String message) { + super(message); + } + } +} diff --git a/harness/src/main/java/io/orkes/conductor/harness/WorkflowGovernor.java b/harness/src/main/java/io/orkes/conductor/harness/WorkflowGovernor.java new file mode 100644 index 000000000..0a152e248 --- /dev/null +++ b/harness/src/main/java/io/orkes/conductor/harness/WorkflowGovernor.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.orkes.conductor.harness; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.client.http.WorkflowClient; +import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; + +public class WorkflowGovernor { + + private static final Logger log = LoggerFactory.getLogger(WorkflowGovernor.class); + + private final WorkflowClient workflowClient; + private final String workflowName; + private final int workflowsPerSecond; + private final ScheduledExecutorService scheduler; + + public WorkflowGovernor(WorkflowClient workflowClient, String workflowName, int workflowsPerSecond) { + this.workflowClient = workflowClient; + this.workflowName = workflowName; + this.workflowsPerSecond = workflowsPerSecond; + this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "workflow-governor"); + t.setDaemon(true); + return t; + }); + } + + public void start() { + log.info("WorkflowGovernor started: workflow={}, rate={}/sec", workflowName, workflowsPerSecond); + scheduler.scheduleAtFixedRate(this::tick, 0, 1, TimeUnit.SECONDS); + } + + public void shutdown() { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void tick() { + try { + for (int i = 0; i < workflowsPerSecond; i++) { + StartWorkflowRequest request = new StartWorkflowRequest(); + request.setName(workflowName); + workflowClient.startWorkflow(request); + } + log.info("Governor: started {} workflow(s)", workflowsPerSecond); + } catch (Exception e) { + log.error("Governor: error starting workflows", e); + } + } +} diff --git a/harness/src/main/resources/logback.xml b/harness/src/main/resources/logback.xml new file mode 100644 index 000000000..0aab35aab --- /dev/null +++ b/harness/src/main/resources/logback.xml @@ -0,0 +1,11 @@ + + + + %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n + + + + + + + diff --git a/settings.gradle b/settings.gradle index 85acea3e5..3d55ec611 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,3 +6,4 @@ include 'java-sdk' include 'tests' include 'orkes-client' include 'orkes-spring' +include 'harness'