Changes from 14 commits
136 changes: 136 additions & 0 deletions adr/20260310-seqera-dataset-filesystem.md
@@ -0,0 +1,136 @@
# NIO Filesystem for Seqera Platform Datasets

- Authors: Jorge Ejarque
- Status: draft
- Date: 2026-03-10
- Tags: nio, filesystem, seqera, datasets, nf-tower

Technical Story: Enable Nextflow pipelines to read Seqera Platform datasets as ordinary file paths using `seqera://` URIs.

## Summary

Add a Java NIO `FileSystemProvider` to the `nf-tower` plugin that registers the `seqera://` scheme, allowing pipelines to reference Seqera Platform datasets (CSV/TSV) as standard file paths without manual download steps. The implementation reuses the existing `TowerClient` for all HTTP communication, inheriting authentication and retry behaviour.

## Problem Statement

Nextflow users managing datasets on the Seqera Platform must currently download dataset files manually or through custom scripts before referencing them in pipelines. There is no native integration between Nextflow's file abstraction and the Seqera Platform dataset API. This creates friction in workflows where datasets are the primary input and forces users to handle authentication, versioning, and file staging outside the pipeline definition.

## Goals or Decision Drivers

- Transparent access to Seqera Platform datasets using standard Nextflow file path syntax
- Reuse of existing nf-tower plugin infrastructure (authentication, HTTP client, retry/backoff)
- Hierarchical path browsing matching the platform's org/workspace/dataset structure
- Extensible architecture that can support future Seqera-managed resource types (e.g. data-links)
- No new plugin or module — feature lives within nf-tower

## Non-goals

- Streaming large datasets — the Platform API does not support streaming; content is fully buffered on download
- Implementing resource types beyond `datasets` — only the extensible architecture is required
- Local caching across pipeline runs — Nextflow's standard task staging handles caching
- Dataset management operations (delete, rename) — the filesystem is read-only in the initial implementation

## Considered Options

### Option 1: Standalone plugin with dedicated HTTP client

A new `nf-seqera-fs` plugin with its own HTTP client configuration and authentication setup.

- Good, because it isolates the filesystem code from the nf-tower plugin
- Bad, because it duplicates authentication configuration and HTTP client setup
- Bad, because two separate HTTP clients sharing a refresh token would corrupt each other's auth state

### Option 2: NIO filesystem within nf-tower using TowerClient delegation

Add the filesystem to nf-tower, delegating all HTTP through the existing `TowerClient` singleton via a typed `SeqeraDatasetClient` wrapper.

- Good, because it shares authentication and token refresh with TowerClient
- Good, because it reuses existing retry/backoff configuration
- Good, because no new dependencies are needed

### Option 3: Direct HxClient usage within nf-tower

Add the filesystem to nf-tower but use `HxClient` directly rather than going through TowerClient.

- Good, because it gives full control over request construction
- Bad, because exposing HxClient internals couples the filesystem to implementation details
- Bad, because token refresh coordination with TowerClient becomes manual

## Solution or decision outcome

Option 2 — NIO filesystem within nf-tower using TowerClient delegation. All HTTP calls go through `TowerClient.sendApiRequest()`, ensuring a single point of authentication and retry logic.
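
The delegation pattern chosen here can be sketched as follows. This is a minimal illustration, not the real API: `TowerClient.sendApiRequest()`'s actual signature is not shown in this ADR, so the transport interface and the endpoint path used below are assumptions.

```java
// Hypothetical sketch of the Option 2 delegation pattern: a typed dataset
// client that funnels every call through one shared, authenticated transport,
// so auth state, token refresh and retry live in exactly one place.
class DatasetClientSketch {

    // Stand-in for the TowerClient role (real method: sendApiRequest())
    interface ApiTransport {
        String get(String path);  // performs an authenticated GET, returns the JSON body
    }

    private final ApiTransport transport;

    DatasetClientSketch(ApiTransport transport) {
        this.transport = transport;
    }

    // A typed operation built on the shared transport; the endpoint path
    // is illustrative, not taken from the Platform API reference
    String listDatasets(long workspaceId) {
        return transport.get("/workspaces/" + workspaceId + "/datasets");
    }

    public static void main(String[] args) {
        // A fake transport makes the wrapper trivially testable without HTTP
        DatasetClientSketch client = new DatasetClientSketch(path -> "{\"datasets\":[]}");
        System.out.println(client.listDatasets(10));
    }
}
```

Because the transport is injected, the same wrapper works against the live `TowerClient` in production and a stub in unit tests, which is a side benefit of Option 2 over Option 3.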

## Rationale & discussion

### Path Hierarchy

The `seqera://` path encodes the Platform's organizational structure directly:

```
seqera:// → ROOT (directory, depth 0)
└── <org>/ → ORGANIZATION (directory, depth 1)
└── <workspace>/ → WORKSPACE (directory, depth 2)
└── datasets/ → RESOURCE TYPE (directory, depth 3)
└── <name>[@<version>] → DATASET (file, depth 4)
```

Each level is a directory except the leaf dataset, which is a file. Version pinning uses an `@version` suffix on the dataset name segment (e.g. `seqera://acme/research/datasets/samples@2`). Without it, the latest non-disabled version is resolved.
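
A minimal sketch of this path grammar follows. The class, record, and enum names are illustrative only, not the actual `SeqeraPath` implementation:

```java
import java.util.*;

// Illustrative parser for the seqera:// hierarchy: segment count maps
// directly onto the depth levels described above, and the leaf segment
// may carry an optional @version suffix.
class SeqeraPathSketch {

    enum Kind { ROOT, ORGANIZATION, WORKSPACE, RESOURCE_TYPE, DATASET }

    record Parsed(Kind kind, List<String> segments, String datasetName, String version) {}

    static Parsed parse(String uri) {
        String rest = uri.replaceFirst("^seqera://", "");
        List<String> segs = rest.isEmpty() ? List.of() : Arrays.asList(rest.split("/"));
        Kind kind = Kind.values()[segs.size()];  // depth 0..4 maps onto the enum; deeper paths are invalid
        String name = null, version = null;
        if (kind == Kind.DATASET) {
            String leaf = segs.get(3);
            int at = leaf.indexOf('@');          // optional version pin, e.g. samples@2
            name = at < 0 ? leaf : leaf.substring(0, at);
            version = at < 0 ? null : leaf.substring(at + 1);
        }
        return new Parsed(kind, segs, name, version);
    }

    public static void main(String[] args) {
        System.out.println(parse("seqera://acme/research/datasets/samples@2"));
        System.out.println(parse("seqera://acme/research"));
    }
}
```

A `null` version in the parsed result corresponds to the "latest non-disabled version" resolution described above.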

### Name-to-ID Resolution

The path uses human-readable names but the Platform API requires numeric IDs. Resolution is built from two API calls at filesystem initialization:

1. `GET /user-info` → obtain `userId`
2. `GET /user/{userId}/workspaces` → returns all accessible org/workspace pairs

This single source provides both directory listing content and name→ID mapping. Results are cached in `SeqeraFileSystem` with invalidation on write operations. `GET /orgs` is intentionally not used as it returns all platform orgs, not scoped to user membership.
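
The dual role of the workspaces response can be sketched like this. The class is hypothetical; the entry fields and the null-`workspaceId` filtering mirror the `orgsAndWorkspaces` handling visible elsewhere in this PR, where org-level rows carry no workspace ID:

```java
import java.util.*;

// Illustrative cache built once from GET /user/{userId}/workspaces.
// It serves both directory listings (org and workspace names) and the
// name-to-ID mapping needed for subsequent API calls.
class WorkspaceCache {

    record Entry(String orgName, long orgId, String workspaceName, Long workspaceId) {}

    private final List<Entry> entries;

    WorkspaceCache(List<Entry> orgsAndWorkspaces) {
        this.entries = orgsAndWorkspaces;
    }

    // Directory listing at depth 0: distinct org names
    Set<String> listOrgs() {
        Set<String> result = new LinkedHashSet<>();
        for (Entry e : entries)
            result.add(e.orgName());
        return result;
    }

    // Directory listing at depth 1: workspaces of an org; rows with a
    // null workspaceId are org-level entries and are skipped
    Set<String> listWorkspaces(String org) {
        Set<String> result = new LinkedHashSet<>();
        for (Entry e : entries)
            if (e.orgName().equals(org) && e.workspaceId() != null)
                result.add(e.workspaceName());
        return result;
    }

    // Name-to-ID resolution used when a path turns into an API call
    OptionalLong workspaceId(String org, String workspace) {
        for (Entry e : entries)
            if (e.orgName().equals(org) && Objects.equals(e.workspaceName(), workspace) && e.workspaceId() != null)
                return OptionalLong.of(e.workspaceId());
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        WorkspaceCache cache = new WorkspaceCache(List.of(
                new Entry("acme", 1L, null, null),        // org-level row
                new Entry("acme", 1L, "research", 10L))); // workspace row
        System.out.println(cache.listWorkspaces("acme"));
    }
}
```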

### Component Structure

```
plugins/nf-tower/src/main/io/seqera/tower/plugin/
├── fs/ ← NIO layer
│ ├── SeqeraFileSystemProvider ← FileSystemProvider (scheme: "seqera")
│ ├── SeqeraFileSystem ← FileSystem with org/workspace/dataset caches
│ ├── SeqeraPath ← Path implementation (depth 0–4)
│ ├── SeqeraFileAttributes ← BasicFileAttributes
│ ├── SeqeraPathFactory ← PF4J FileSystemPathFactory extension
│ └── DatasetInputStream ← SeekableByteChannel over InputStream
├── dataset/ ← API client layer
│ ├── SeqeraDatasetClient ← Typed HTTP client wrapping TowerClient
│ ├── DatasetDto ← Dataset API response model
│ ├── DatasetVersionDto ← Version API response model
│ ├── OrgAndWorkspaceDto ← Org/workspace list model
│ └── WorkspaceOrgDto ← Workspace/org mapping model
└── resources/META-INF/services/
└── java.nio.file.spi.FileSystemProvider
```
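
Following the standard Java SPI convention, the `java.nio.file.spi.FileSystemProvider` registration file contains the provider's fully qualified class name from the tree above:

```
io.seqera.tower.plugin.fs.SeqeraFileSystemProvider
```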

### Key Design Decisions

1. **TowerClient delegation**: `SeqeraDatasetClient` delegates all HTTP through `TowerFactory.client()` → `TowerClient.sendApiRequest()`. This ensures shared authentication state and avoids the token refresh corruption that would occur with separate HTTP client instances.

2. **One filesystem per JVM**: `SeqeraFileSystemProvider` maintains a single `SeqeraFileSystem` keyed by scheme. This matches the `TowerClient` singleton-per-session pattern.

3. **Read-only initial scope**: The filesystem reports `isReadOnly()=true`. Write support (dataset upload via multipart POST) is deferred to a future iteration.

4. **Download filename constraint**: The Platform API's download endpoint (`GET /datasets/{id}/v/{version}/n/{fileName}`) requires the exact filename from upload time. The implementation always resolves `DatasetVersionDto.fileName` from `GET /datasets/{id}/versions` before constructing the download URL.

5. **Extensible resource types**: The path hierarchy reserves depth 3 for a resource type segment (currently only `datasets`). Adding support for data-links or other resource types requires only a new handler at the directory listing and I/O layers, with no changes to path resolution or authentication.

6. **Thread safety**: `SeqeraFileSystem` cache methods and `SeqeraFileSystemProvider` lifecycle methods are `synchronized`. The filesystem map uses `LinkedHashMap` with external synchronization rather than `ConcurrentHashMap`, matching the low-contention access pattern.
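
Decision 4 can be sketched as follows. The endpoint shape is taken from the ADR text; the helper names are illustrative, and the disabled-version filtering mentioned earlier is omitted for brevity:

```java
import java.util.List;

// Illustrative sketch of download URL construction: the filename embedded
// in the URL must be the exact upload-time name, so the version record is
// always resolved from GET /datasets/{id}/versions first.
class DownloadUrl {

    record DatasetVersion(long version, String fileName) {}

    // GET /datasets/{id}/v/{version}/n/{fileName}
    static String downloadPath(long datasetId, DatasetVersion v) {
        return "/datasets/" + datasetId + "/v/" + v.version() + "/n/" + v.fileName();
    }

    // Pick the version pinned via @version, or the highest one when no
    // suffix was given on the path
    static DatasetVersion resolve(List<DatasetVersion> versions, Long requested) {
        DatasetVersion best = null;
        for (DatasetVersion v : versions) {
            if (requested != null && v.version() == requested)
                return v;
            if (requested == null && (best == null || v.version() > best.version()))
                best = v;
        }
        if (best == null)
            throw new IllegalArgumentException("dataset version not found: " + requested);
        return best;
    }

    public static void main(String[] args) {
        List<DatasetVersion> versions = List.of(
                new DatasetVersion(1, "samples.csv"),
                new DatasetVersion(2, "samples-v2.csv"));
        System.out.println(downloadPath(99, resolve(versions, null)));
    }
}
```

Note that the filename can change between versions of the same dataset, which is why caching the name from one version and reusing it for another would break downloads.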

### Limitations

- **No size metadata**: `SeqeraFileAttributes.size()` returns 0 for all paths because the Platform API does not expose content length in dataset metadata.
- **Single endpoint per JVM**: The filesystem key is scheme-only; concurrent access to different Platform endpoints in the same JVM is not supported.

### Streaming Downloads

Dataset downloads use `TowerClient.sendStreamingRequest()` which calls `HxClient.sendAsStream()` — the response body is returned as an `InputStream` streamed directly from the HTTP connection. This avoids the triple-buffering problem (`String` → `getBytes()` → `ByteArrayInputStream`) that would otherwise consume ~40 MB heap per 10 MB dataset. The `HxClient.sendAsStream()` method goes through the same `sendWithRetry()` path as `sendAsString()`, so retry logic and token refresh are preserved.
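
In the spirit of the `DatasetInputStream` component listed in the structure above, a read-only `SeekableByteChannel` over a streaming `InputStream` might look like this simplified sketch. It supports forward reads only and is not the actual implementation; note that `size()` returns 0, matching the size-metadata limitation above:

```java
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;

// Simplified read-only channel adapter over a streaming HTTP body.
// Backward seeks are impossible without buffering, so they are rejected.
class StreamChannel implements SeekableByteChannel {

    private final InputStream in;
    private long position;
    private boolean open = true;

    StreamChannel(InputStream in) { this.in = in; }

    @Override public int read(ByteBuffer dst) throws IOException {
        byte[] buf = new byte[dst.remaining()];
        int n = in.read(buf);
        if (n > 0) { dst.put(buf, 0, n); position += n; }
        return n;  // -1 at end of stream, per the channel contract
    }

    @Override public long position() { return position; }

    @Override public SeekableByteChannel position(long newPosition) throws IOException {
        if (newPosition < position)
            throw new IOException("backward seek not supported over a streaming download");
        in.skip(newPosition - position);  // simplified: skip() may skip fewer bytes
        position = newPosition;
        return this;
    }

    @Override public long size() { return 0; }  // content length unknown (see Limitations)
    @Override public int write(ByteBuffer src) { throw new UnsupportedOperationException("read-only"); }
    @Override public SeekableByteChannel truncate(long size) { throw new UnsupportedOperationException("read-only"); }
    @Override public boolean isOpen() { return open; }
    @Override public void close() throws IOException { open = false; in.close(); }

    public static void main(String[] args) throws IOException {
        StreamChannel ch = new StreamChannel(new ByteArrayInputStream("id,name\n1,a\n".getBytes()));
        ByteBuffer buf = ByteBuffer.allocate(7);
        System.out.println(ch.read(buf) + " bytes read, position=" + ch.position());
        ch.close();
    }
}
```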

## Links

- [Spec](../specs/260310-seqera-dataset-fs/spec.md)
- [Implementation plan](../specs/260310-seqera-dataset-fs/plan.md)
- [Data model](../specs/260310-seqera-dataset-fs/data-model.md)
3 changes: 2 additions & 1 deletion modules/nf-commons/src/main/nextflow/file/FileHelper.groovy
@@ -362,7 +362,7 @@ class FileHelper {
return asPath(toPathURI(str))
}

static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure']
static final private Map<String,String> PLUGINS_MAP = [s3:'nf-amazon', gs:'nf-google', az:'nf-azure', seqera:'nf-tower']

static final private Map<String,Boolean> SCHEME_CHECKED = new HashMap<>()

@@ -373,6 +373,7 @@
// find out the default plugin for the given scheme and try to load it
final pluginId = PLUGINS_MAP.get(scheme)
if( pluginId ) try {
log.debug "Detected required plugin '$pluginId'"
if( Plugins.startIfMissing(pluginId) ) {
log.debug "Started plugin '$pluginId' required to handle file: $str"
// return true to signal a new plugin was loaded
6 changes: 5 additions & 1 deletion plugins/nf-tower/build.gradle
@@ -32,7 +32,8 @@ nextflowPlugin {
'io.seqera.tower.plugin.TowerFactory',
'io.seqera.tower.plugin.TowerFusionToken',
'io.seqera.tower.plugin.auth.AuthCommandImpl',
'io.seqera.tower.plugin.launch.LaunchCommandImpl'
'io.seqera.tower.plugin.launch.LaunchCommandImpl',
'io.seqera.tower.plugin.fs.SeqeraPathFactory'
]
}

@@ -57,6 +58,9 @@ dependencies {
compileOnly 'io.seqera:lib-httpx:2.1.0'
api 'io.seqera:lib-platform-oidc:0.1.0'

api('io.seqera:tower-api:1.121.0') {
exclude group: 'io.micronaut.servlet'
}
api "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.21.1"
api "com.fasterxml.jackson.core:jackson-databind:2.21.1"

@@ -19,44 +19,27 @@ package io.seqera.tower.plugin
import groovy.json.JsonSlurper
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import io.seqera.http.HxClient
import nextflow.Const
import nextflow.SysEnv
import nextflow.config.ConfigBuilder

import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.time.Duration
import nextflow.util.Duration

@Slf4j
class BaseCommandImpl {

private static final int API_TIMEOUT_MS = 10_000
protected static final int API_TIMEOUT_MS = 10_000

/**
* Provides common API operations for Seqera Platform
*/
protected TowerCommonApi commonApi

BaseCommandImpl(){
this.commonApi = new TowerCommonApi()
}

BaseCommandImpl( TowerCommonApi commonApi ) {
this.commonApi = commonApi
}

/**
* Creates an HxClient instance with optional authentication token.
* Creates a TowerClient instance with optional authentication token.
*
* @param apiUrl Seqera Platform API url
* @param accessToken Optional personal access token for authentication (PAT)
* @return Configured HxClient instance with timeout settings
* @return Configured TowerClient instance with timeout settings
*/
@Memoized
protected HxClient createHttpClient(String accessToken = null) {
return HxClient.newBuilder()
.connectTimeout(Duration.ofMillis(API_TIMEOUT_MS))
.bearerToken(accessToken)
.build()
protected TowerClient createTowerClient(String apiUrl, String accessToken) {
final env = SysEnv.get()
return new TowerClient( new TowerConfig( [accessToken: accessToken, endpoint: apiUrl, httpConnectTimeout: Duration.of(API_TIMEOUT_MS)], env), env)

Member
Feel this could be improved by adding a TowerClient constructor or a factory method.

Contributor Author
Making this change would imply a significant refactor of the auth and launch commands. This method is called in several places and the arguments can change depending on the call site. For instance, in the auth command it is first called with the login token and later with the configured token. I prefer to leave it out of the scope of this PR.

}

/**
@@ -73,69 +56,26 @@
return builder.buildConfigObject().flatten()
}

protected List listUserWorkspaces(HxClient client, String endpoint, String userId) {
final url = "${endpoint}/user/${userId}/workspaces"
log.debug "Platform list workspaces - GET ${url}"
final request = HttpRequest.newBuilder()
.uri(URI.create(url))
.GET()
.build()

final response = client.send(request, HttpResponse.BodyHandlers.ofString())

if( response.statusCode() != 200 ) {
final error = response.body() ?: "HTTP ${response.statusCode()}"
throw new RuntimeException("Failed to get workspaces: ${error}")
}

final json = new JsonSlurper().parseText(response.body()) as Map
final orgsAndWorkspaces = json.orgsAndWorkspaces as List

return orgsAndWorkspaces.findAll { ((Map) it).workspaceId != null }
protected List<Map> listUserWorkspaces(TowerClient client, String userId) {
return client.listUserWorkspacesAndOrgs(userId).findAll { ((Map) it).workspaceId != null }
}

protected List listComputeEnvironments(HxClient client, String endpoint, String workspaceId) {
final uri = workspaceId
? "${endpoint}/compute-envs?workspaceId=${workspaceId}"
: "${endpoint}/compute-envs"
log.debug "Platform list compute env - GET ${uri}"

final request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.GET()
.build()

final response = client.send(request, HttpResponse.BodyHandlers.ofString())

if( response.statusCode() != 200 ) {
final error = response.body() ?: "HTTP ${response.statusCode()}"
throw new RuntimeException("Failed to get compute environments: ${error}")
protected List listComputeEnvironments(TowerClient client, String workspaceId) {
try {
final json = client.apiGet("/compute-envs", workspaceId ? [workspaceId: workspaceId] : [:])
return json.computeEnvs as List ?: []
} catch ( Exception e ) {
throw new RuntimeException("Failed to get compute environments: ${e.message}", e)
}

final json = new JsonSlurper().parseText(response.body()) as Map
return json.computeEnvs as List ?: []
}

protected Map getComputeEnvironment(HxClient client, String endpoint, String computeEnvId, String workspaceId) {
final uri = workspaceId ?
"${endpoint}/compute-envs/${computeEnvId}?workspaceId=${workspaceId}" :
"${endpoint}/compute-envs"
log.debug "Platform get compute env - GET ${uri}"

final request = HttpRequest.newBuilder()
.uri(URI.create(uri))
.GET()
.build()

final response = client.send(request, HttpResponse.BodyHandlers.ofString())

if( response.statusCode() != 200 ) {
final error = response.body() ?: "HTTP ${response.statusCode()}"
throw new RuntimeException("Failed to get compute environment: ${error}")
protected Map getComputeEnvironment(TowerClient client, String computeEnvId, String workspaceId) {
try {
final json = client.apiGet(workspaceId ? "/compute-envs/${computeEnvId}" : "/compute-envs", workspaceId ? [workspaceId: workspaceId] : [:])
return unifyComputeEnvDescription(json.computeEnv as Map ?: [:])
} catch ( Exception e ) {
throw new RuntimeException("Failed to get compute environment: ${e.message}", e)
}

final json = new JsonSlurper().parseText(response.body()) as Map
return unifyComputeEnvDescription(json.computeEnv as Map ?: [:])
}

private Map unifyComputeEnvDescription(Map computeEnv) {