diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 70c8b3a005..dc56834e05 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -384,7 +384,6 @@ func shutdownActivityUpdate() { func createMainWshClient() { rpc := wshserver.GetMainRpcClient() - wshfs.RpcClient = rpc wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute) wps.Broker.SetClient(wshutil.DefaultRouter) localInitialEnv := envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ())) @@ -393,6 +392,8 @@ func createMainWshClient() { localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, remoteImpl, "conn:local") go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName) wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName)) + wshfs.RpcClient = localConnWsh + wshfs.RpcClientRouteId = wshutil.MakeConnectionRouteId(wshrpc.LocalConnName) } func grabAndRemoveEnvVars() error { diff --git a/cmd/wsh/cmd/wshcmd-connserver.go b/cmd/wsh/cmd/wshcmd-connserver.go index 8fde91dc3c..1f892a24ce 100644 --- a/cmd/wsh/cmd/wshcmd-connserver.go +++ b/cmd/wsh/cmd/wshcmd-connserver.go @@ -183,7 +183,7 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) { } } -func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, error) { +func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, string, error) { routeId := wshutil.MakeConnectionRouteId(connServerConnName) rpcCtx := wshrpc.RpcContext{ RouteId: routeId, @@ -196,7 +196,7 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName stri connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false, connServerInitialEnv, sockName), routeId) router.RegisterTrustedLeaf(connServerClient, routeId) - return connServerClient, nil + return connServerClient, routeId, nil } func serverRunRouter() error { @@ -236,11 +236,12 @@ func serverRunRouter() error { sockName := getRemoteDomainSocketName() // setup the connserver rpc client first - client, err := setupConnServerRpcClientWithRouter(router, sockName) + client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName) if err != nil { return fmt.Errorf("error setting up connserver rpc client: %v", err) } wshfs.RpcClient = client + wshfs.RpcClientRouteId = bareRouteId log.Printf("trying to get JWT public key") @@ -360,11 +361,12 @@ func serverRunRouterDomainSocket(jwtToken string) error { log.Printf("got JWT public key") // now setup the connserver rpc client - client, err := setupConnServerRpcClientWithRouter(router, sockName) + client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName) if err != nil { return fmt.Errorf("error setting up connserver rpc client: %v", err) } wshfs.RpcClient = client + wshfs.RpcClientRouteId = bareRouteId // set up the local domain socket listener for local wsh commands unixListener, err := MakeRemoteUnixListener() @@ -402,6 +404,7 @@ func serverRunNormal(jwtToken string) error { return err } wshfs.RpcClient = RpcClient + wshfs.RpcClientRouteId = RpcClientRouteId WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn) go func() { defer func() { diff --git a/cmd/wsh/cmd/wshcmd-file-util.go b/cmd/wsh/cmd/wshcmd-file-util.go index 13ef433ec5..77934c524e 100644 --- a/cmd/wsh/cmd/wshcmd-file-util.go +++ b/cmd/wsh/cmd/wshcmd-file-util.go @@ -1,4 +1,4 @@ -// Copyright 2025, Command Line Inc. +// Copyright 2026, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 package cmd @@ -12,10 +12,10 @@ import ( "strings" "github.com/wavetermdev/waveterm/pkg/remote/connparse" - "github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil" "github.com/wavetermdev/waveterm/pkg/util/fileutil" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" + "github.com/wavetermdev/waveterm/pkg/wshutil" ) func convertNotFoundErr(err error) error { @@ -91,8 +91,38 @@ func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error { } func streamReadFromFile(ctx context.Context, fileData wshrpc.FileData, writer io.Writer) error { - ch := wshclient.FileReadStreamCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout}) - return fsutil.ReadFileStreamToWriter(ctx, ch, writer) + broker := RpcClient.StreamBroker + if broker == nil { + return fmt.Errorf("stream broker not available") + } + if fileData.Info == nil { + return fmt.Errorf("file info is required") + } + readerRouteId := RpcClientRouteId + if readerRouteId == "" { + return fmt.Errorf("no route id available") + } + conn, err := connparse.ParseURI(fileData.Info.Path) + if err != nil { + return fmt.Errorf("parsing file path: %w", err) + } + writerRouteId := wshutil.MakeConnectionRouteId(conn.Host) + reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024) + defer reader.Close() + go func() { + <-ctx.Done() + reader.Close() + }() + data := wshrpc.CommandFileStreamData{ + Info: fileData.Info, + StreamMeta: *streamMeta, + } + _, err = wshclient.FileStreamCommand(RpcClient, data, nil) + if err != nil { + return fmt.Errorf("starting file stream: %w", err) + } + _, err = io.Copy(writer, reader) + return err } func fixRelativePaths(path string) (string, error) { diff --git a/cmd/wsh/cmd/wshcmd-file.go b/cmd/wsh/cmd/wshcmd-file.go index 3e2cc5721b..e40eb324d2 100644 --- a/cmd/wsh/cmd/wshcmd-file.go +++ b/cmd/wsh/cmd/wshcmd-file.go @@ -172,11 +172,6 @@ func fileCatRun(cmd *cobra.Command, args []string) error { return err } - _, err = checkFileSize(path, MaxFileSize) - if err != nil { - return err - } - fileData := wshrpc.FileData{ Info: &wshrpc.FileInfo{ Path: path}} diff --git a/cmd/wsh/cmd/wshcmd-root.go b/cmd/wsh/cmd/wshcmd-root.go index 534ce0c31a..9534d2e5f5 100644 --- a/cmd/wsh/cmd/wshcmd-root.go +++ b/cmd/wsh/cmd/wshcmd-root.go @@ -31,6 +31,7 @@ var WrappedStdout io.Writer = &WrappedWriter{dest: os.Stdout} var WrappedStderr io.Writer = &WrappedWriter{dest: os.Stderr} var RpcClient *wshutil.WshRpc var RpcContext wshrpc.RpcContext +var RpcClientRouteId string var UsingTermWshMode bool var blockArg string var WshExitCode int @@ -140,7 +141,12 @@ func setupRpcClientWithToken(swapTokenStr string) (wshrpc.CommandAuthenticateRtn if err != nil { return rtn, fmt.Errorf("error setting up domain socket rpc client: %w", err) } - return wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute}) + rtn, err = wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute}) + if err != nil { + return rtn, err + } + RpcClientRouteId = rtn.RouteId + return rtn, nil } // returns the wrapped stdin and a new rpc client (that wraps the stdin input and stdout output) @@ -158,10 +164,11 @@ func setupRpcClient(serverImpl wshutil.ServerImpl, jwtToken string) error { if err != nil { return fmt.Errorf("error setting up domain socket rpc client: %v", err) } - _, err = wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute}) + authRtn, err := wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute}) if err != nil { return fmt.Errorf("error authenticating: %v", err) } + RpcClientRouteId = authRtn.RouteId blockId := os.Getenv("WAVETERM_BLOCKID") if blockId != "" { peerInfo := fmt.Sprintf("domain:block:%s", blockId) diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index de6cd7efff..2f5024f0ef 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -366,12 +366,6 @@ export class RpcApiType { return client.wshRpcCall("fileread", data, opts); } - // command "filereadstream" [responsestream] - FileReadStreamCommand(client: WshClient, data: FileData, opts?: RpcOpts): AsyncGenerator { - if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "filereadstream", data, opts); - return client.wshRpcStream("filereadstream", data, opts); - } - // command "filerestorebackup" [call] FileRestoreBackupCommand(client: WshClient, data: CommandFileRestoreBackupData, opts?: RpcOpts): Promise { if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "filerestorebackup", data, opts); @@ -780,12 +774,6 @@ export class RpcApiType { return client.wshRpcStream("remotestreamcpudata", null, opts); } - // command "remotestreamfile" [responsestream] - RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator { - if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "remotestreamfile", data, opts); - return client.wshRpcStream("remotestreamfile", data, opts); - } - // command "remoteterminatejobmanager" [call] RemoteTerminateJobManagerCommand(client: WshClient, data: CommandRemoteTerminateJobManagerData, opts?: RpcOpts): Promise { if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remoteterminatejobmanager", data, opts); diff --git a/frontend/app/view/preview/preview-directory.tsx b/frontend/app/view/preview/preview-directory.tsx index cdacd810bd..0940ba43b3 100644 --- a/frontend/app/view/preview/preview-directory.tsx +++ b/frontend/app/view/preview/preview-directory.tsx @@ -2,9 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 import { ContextMenuModel } from "@/app/store/contextmenu"; -import { useWaveEnv } from "@/app/waveenv/waveenv"; import { globalStore } from "@/app/store/jotaiStore"; import { TabRpcClient } from "@/app/store/wshrpcutil"; +import { useWaveEnv } from "@/app/waveenv/waveenv"; import { checkKeyPressed, isCharacterKeyEvent } from "@/util/keyutil"; import { PLATFORM, PlatformMacOS } from "@/util/platformutil"; import { addOpenMenuItems } from "@/util/previewutil"; @@ -112,7 +112,6 @@ function DirectoryTable({ newDirectory, }: DirectoryTableProps) { const env = useWaveEnv(); - const searchActive = useAtomValue(model.directorySearchActive); const fullConfig = useAtomValue(env.atoms.fullConfigAtom); const defaultSort = useAtomValue(env.getSettingsKeyAtom("preview:defaultsort")) ?? "name"; const setErrorMsg = useSetAtom(model.errorMsgAtom); @@ -587,28 +586,26 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) { useEffect( () => fireAndForget(async () => { - let entries: FileInfo[]; + const entries: FileInfo[] = []; try { - const file = await env.rpc.FileReadCommand( - TabRpcClient, - { - info: { - path: await model.formatRemoteUri(dirPath, globalStore.get), - }, - }, - null - ); - entries = file.entries ?? []; - if (file?.info && file.info.dir && file.info?.path !== file.info?.dir) { + const remotePath = await model.formatRemoteUri(dirPath, globalStore.get); + const stream = env.rpc.FileListStreamCommand(TabRpcClient, { path: remotePath }, null); + for await (const chunk of stream) { + if (chunk?.fileinfo) { + entries.push(...chunk.fileinfo); + } + } + if (finfo?.dir && finfo?.path !== finfo?.dir) { entries.unshift({ name: "..", - path: file?.info?.dir, + path: finfo.dir, isdir: true, modtime: new Date().getTime(), mimetype: "directory", }); } } catch (e) { + console.error("Directory Read Error", e); setErrorMsg({ status: "Cannot Read Directory", text: `${e}`, diff --git a/frontend/app/view/preview/previewenv.ts b/frontend/app/view/preview/previewenv.ts index 464865a5da..30607b8459 100644 --- a/frontend/app/view/preview/previewenv.ts +++ b/frontend/app/view/preview/previewenv.ts @@ -11,6 +11,7 @@ export type PreviewEnv = WaveEnvSubset<{ ConnEnsureCommand: WaveEnv["rpc"]["ConnEnsureCommand"]; FileInfoCommand: WaveEnv["rpc"]["FileInfoCommand"]; FileReadCommand: WaveEnv["rpc"]["FileReadCommand"]; + FileListStreamCommand: WaveEnv["rpc"]["FileListStreamCommand"]; FileWriteCommand: WaveEnv["rpc"]["FileWriteCommand"]; FileMoveCommand: WaveEnv["rpc"]["FileMoveCommand"]; FileDeleteCommand: WaveEnv["rpc"]["FileDeleteCommand"]; diff --git a/frontend/preview/mock/mockfilesystem.ts b/frontend/preview/mock/mockfilesystem.ts index 6652bbb3fe..9767dddf64 100644 --- a/frontend/preview/mock/mockfilesystem.ts +++ b/frontend/preview/mock/mockfilesystem.ts @@ -8,26 +8,23 @@ const MockDirMimeType = "directory"; const MockDirMode = 0o040755; const MockFileMode = 0o100644; const MockDirectoryChunkSize = 128; -const MockFileChunkSize = 64 * 1024; const MockBaseModTime = Date.parse("2026-03-10T09:00:00.000Z"); const TinyPngBytes = Uint8Array.from([ - 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, - 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c, - 0x02, 0x00, 0x00, 0x00, 0x0b, 0x49, 0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00, - 0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, - 0xae, 0x42, 0x60, 0x82, + 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c, 0x02, 0x00, 0x00, 0x00, 0x0b, 0x49, + 0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00, 0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00, + 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82, ]); const TinyJpegBytes = Uint8Array.from([ - 0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, - 0x00, 0x01, 0x00, 0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03, - 0x03, 0x03, 0x03, 0x04, 0x03, 0x03, 0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07, - 0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a, 0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d, - 0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15, 0x15, 0x15, 0x0c, 0x0f, - 0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00, 0x01, - 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14, - 0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9, + 0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, + 0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x03, 0x03, + 0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07, 0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a, + 0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d, 0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15, + 0x15, 0x15, 0x0c, 0x0f, 0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00, + 0x01, 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, + 0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9, ]); type MockFsEntry = { @@ -61,7 +58,6 @@ export type MockFilesystem = { fileRead: (data: FileData) => Promise; fileList: (data: FileListData) => Promise; fileJoin: (paths: string[]) => Promise; - fileReadStream: (data: FileData) => AsyncGenerator; fileListStream: (data: FileListData) => AsyncGenerator; }; @@ -492,33 +488,9 @@ export function makeMockFilesystem(): MockFilesystem { } return toFileInfo(entry); }; - const fileReadStream = async function* (data: FileData): AsyncGenerator { - const info = await fileInfo(data); - yield { info }; - if (info.notfound) { - return; - } - const entry = getEntry(info.path); - if (entry.isdir) { - const dirEntries = (childrenByDir.get(entry.path) ?? []).map((child) => toFileInfo(child)); - for (let idx = 0; idx < dirEntries.length; idx += MockDirectoryChunkSize) { - yield { entries: dirEntries.slice(idx, idx + MockDirectoryChunkSize) }; - } - return; - } - if (entry.content == null || entry.content.byteLength === 0) { - return; - } - const { offset, end } = getReadRange(data, entry.content.byteLength); - for (let currentOffset = offset; currentOffset < end; currentOffset += MockFileChunkSize) { - const chunkEnd = Math.min(currentOffset + MockFileChunkSize, end); - yield { - data64: arrayToBase64(entry.content.slice(currentOffset, chunkEnd)), - at: { offset: currentOffset, size: chunkEnd - currentOffset }, - }; - } - }; - const fileListStream = async function* (data: FileListData): AsyncGenerator { + const fileListStream = async function* ( + data: FileListData + ): AsyncGenerator { const fileInfos = await fileList(data); for (let idx = 0; idx < fileInfos.length; idx += MockDirectoryChunkSize) { yield { fileinfo: fileInfos.slice(idx, idx + MockDirectoryChunkSize) }; @@ -535,7 +507,6 @@ export function makeMockFilesystem(): MockFilesystem { fileRead, fileList, fileJoin, - fileReadStream, fileListStream, }; } diff --git a/frontend/preview/mock/mockwaveenv.test.ts b/frontend/preview/mock/mockwaveenv.test.ts index 876f995338..031be34588 100644 --- a/frontend/preview/mock/mockwaveenv.test.ts +++ b/frontend/preview/mock/mockwaveenv.test.ts @@ -1,4 +1,4 @@ -import { base64ToArray, base64ToString } from "@/util/util"; +import { base64ToString } from "@/util/util"; import { describe, expect, it, vi } from "vitest"; import { DefaultMockFilesystem } from "./mockfilesystem"; @@ -82,26 +82,19 @@ describe("makeMockWaveEnv", () => { } expect(listPackets).toHaveLength(1); expect(listPackets[0].fileinfo).toHaveLength(4); - - const readPackets: FileData[] = []; - for await (const packet of env.rpc.FileReadStreamCommand(null as any, { - info: { path: "/Users/mike/Pictures/beach-sunrise.png" }, - })) { - readPackets.push(packet); - } - expect(readPackets[0].info?.path).toBe("/Users/mike/Pictures/beach-sunrise.png"); - const imageBytes = base64ToArray(readPackets[1].data64); - expect(Array.from(imageBytes.slice(0, 4))).toEqual([0x89, 0x50, 0x4e, 0x47]); }); it("implements secrets commands with in-memory storage", async () => { const { makeMockWaveEnv } = await import("./mockwaveenv"); const env = makeMockWaveEnv({ platform: "linux" }); - await env.rpc.SetSecretsCommand(null as any, { - OPENAI_API_KEY: "sk-test", - ANTHROPIC_API_KEY: "anthropic-test", - } as any); + await env.rpc.SetSecretsCommand( + null as any, + { + OPENAI_API_KEY: "sk-test", + ANTHROPIC_API_KEY: "anthropic-test", + } as any + ); expect(await env.rpc.GetSecretsLinuxStorageBackendCommand(null as any)).toBe("libsecret"); expect(await env.rpc.GetSecretsNamesCommand(null as any)).toEqual(["ANTHROPIC_API_KEY", "OPENAI_API_KEY"]); diff --git a/frontend/preview/mock/mockwaveenv.ts b/frontend/preview/mock/mockwaveenv.ts index faaf6cdde7..79522e4f6f 100644 --- a/frontend/preview/mock/mockwaveenv.ts +++ b/frontend/preview/mock/mockwaveenv.ts @@ -310,9 +310,6 @@ export function makeMockRpc( setCallHandler("fileread", async (_client, data: FileData) => DefaultMockFilesystem.fileRead(data)); setCallHandler("filelist", async (_client, data: FileListData) => DefaultMockFilesystem.fileList(data)); setCallHandler("filejoin", async (_client, data: string[]) => DefaultMockFilesystem.fileJoin(data)); - setStreamHandler("filereadstream", async function* (_client, data: FileData) { - yield* DefaultMockFilesystem.fileReadStream(data); - }); setStreamHandler("fileliststream", async function* (_client, data: FileListData) { yield* DefaultMockFilesystem.fileListStream(data); }); @@ -461,7 +458,11 @@ export function makeMockWaveEnv(mockEnv?: MockEnv): MockWaveEnv { globalStore.set(waveObjectValueAtomCache.get(oref), obj); }, }; - const { rpc, setRpcHandler, setRpcStreamHandler } = makeMockRpc(mergedOverrides.rpc, mergedOverrides.rpcStreaming, mockWosFns); + const { rpc, setRpcHandler, setRpcStreamHandler } = makeMockRpc( + mergedOverrides.rpc, + mergedOverrides.rpcStreaming, + mockWosFns + ); const env = { isMock: true, mockEnv: mergedOverrides, diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 193929ef4c..d4f4cd30b5 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -576,12 +576,6 @@ declare global { publickeybase64: string; }; - // wshrpc.CommandRemoteStreamFileData - type CommandRemoteStreamFileData = { - path: string; - byterange?: string; - }; - // wshrpc.CommandRemoteTerminateJobManagerData type CommandRemoteTerminateJobManagerData = { jobid: string; diff --git a/pkg/remote/fileshare/fsutil/fsutil.go b/pkg/remote/fileshare/fsutil/fsutil.go index a7efd6fdc5..aba9de944e 100644 --- a/pkg/remote/fileshare/fsutil/fsutil.go +++ b/pkg/remote/fileshare/fsutil/fsutil.go @@ -150,13 +150,3 @@ func ReadStreamToFileData(ctx context.Context, readCh <-chan wshrpc.RespOrErrorU } return fileData, nil } - -func ReadFileStreamToWriter(ctx context.Context, readCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], writer io.Writer) error { - return ReadFileStream(ctx, readCh, func(finfo wshrpc.FileInfo) { - }, func(entries []*wshrpc.FileInfo) error { - return nil - }, func(data io.Reader) error { - _, err := io.Copy(writer, data) - return err - }) -} diff --git a/pkg/remote/fileshare/wshfs/wshfs.go b/pkg/remote/fileshare/wshfs/wshfs.go index 80ab53e99b..352f8c1b7b 100644 --- a/pkg/remote/fileshare/wshfs/wshfs.go +++ b/pkg/remote/fileshare/wshfs/wshfs.go @@ -1,4 +1,4 @@ -// Copyright 2025, Command Line Inc. +// Copyright 2026, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 package wshfs @@ -7,12 +7,12 @@ import ( "context" "encoding/base64" "fmt" + "io" "log" "os" "time" "github.com/wavetermdev/waveterm/pkg/remote/connparse" - "github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil" "github.com/wavetermdev/waveterm/pkg/wshrpc" "github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient" "github.com/wavetermdev/waveterm/pkg/wshutil" @@ -30,6 +30,7 @@ const ( // This needs to be set by whoever initializes the client, either main-server or wshcmd-connserver var RpcClient *wshutil.WshRpc +var RpcClientRouteId string func parseConnection(ctx context.Context, path string) (*connparse.Connection, error) { conn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, path) @@ -40,31 +41,54 @@ func parseConnection(ctx context.Context, path string) (*connparse.Connection, e } func Read(ctx context.Context, data wshrpc.FileData) (*wshrpc.FileData, error) { + if data.Info == nil { + return nil, fmt.Errorf("file info is required") + } log.Printf("Read: %v", data.Info.Path) conn, err := parseConnection(ctx, data.Info.Path) if err != nil { return nil, err } - rtnCh := readStream(conn, data) - return fsutil.ReadStreamToFileData(ctx, rtnCh) -} - -func ReadStream(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { - log.Printf("ReadStream: %v", data.Info.Path) - conn, err := parseConnection(ctx, data.Info.Path) - if err != nil { - return wshutil.SendErrCh[wshrpc.FileData](err) - } - return readStream(conn, data) -} - -func readStream(conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { + broker := RpcClient.StreamBroker + if broker == nil { + return nil, fmt.Errorf("stream broker not available") + } + if RpcClientRouteId == "" { + return nil, fmt.Errorf("no route id available") + } + readerRouteId := RpcClientRouteId + writerRouteId := wshutil.MakeConnectionRouteId(conn.Host) + reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024) + defer reader.Close() + go func() { + <-ctx.Done() + reader.Close() + }() byteRange := "" if data.At != nil && data.At.Size > 0 { byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)-1) } - streamFileData := wshrpc.CommandRemoteStreamFileData{Path: conn.Path, ByteRange: byteRange} - return wshclient.RemoteStreamFileCommand(RpcClient, streamFileData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)}) + remoteData := wshrpc.CommandRemoteFileStreamData{ + Path: conn.Path, + ByteRange: byteRange, + StreamMeta: *streamMeta, + } + fileInfo, err := wshclient.RemoteFileStreamCommand(RpcClient, remoteData, &wshrpc.RpcOpts{Route: writerRouteId}) + if err != nil { + return nil, fmt.Errorf("starting remote file stream: %w", err) + } + var rawData []byte + if fileInfo != nil && !fileInfo.IsDir { + rawData, err = io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("reading file stream: %w", err) + } + } + rtnData := &wshrpc.FileData{Info: fileInfo} + if len(rawData) > 0 { + rtnData.Data64 = base64.StdEncoding.EncodeToString(rawData) + } + return rtnData, nil } func GetConnectionRouteId(ctx context.Context, path string) (string, error) { diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 67ef8669ec..2968baa8d7 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -364,11 +364,6 @@ func FileReadCommand(w *wshutil.WshRpc, data wshrpc.FileData, opts *wshrpc.RpcOp return resp, err } -// command "filereadstream", wshserver.FileReadStreamCommand -func FileReadStreamCommand(w *wshutil.WshRpc, data wshrpc.FileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { - return sendRpcRequestResponseStreamHelper[wshrpc.FileData](w, "filereadstream", data, opts) -} - // command "filerestorebackup", wshserver.FileRestoreBackupCommand func FileRestoreBackupCommand(w *wshutil.WshRpc, data wshrpc.CommandFileRestoreBackupData, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "filerestorebackup", data, opts) @@ -775,11 +770,6 @@ func RemoteStreamCpuDataCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) chan ws return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "remotestreamcpudata", nil, opts) } -// command "remotestreamfile", wshserver.RemoteStreamFileCommand -func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { - return sendRpcRequestResponseStreamHelper[wshrpc.FileData](w, "remotestreamfile", data, opts) -} - // command "remoteterminatejobmanager", wshserver.RemoteTerminateJobManagerCommand func RemoteTerminateJobManagerCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteTerminateJobManagerData, opts *wshrpc.RpcOpts) error { _, err := sendRpcRequestCallHelper[any](w, "remoteterminatejobmanager", data, opts) diff --git a/pkg/wshrpc/wshremote/wshremote_file.go b/pkg/wshrpc/wshremote/wshremote_file.go index d796154f48..845fb64ac4 100644 --- a/pkg/wshrpc/wshremote/wshremote_file.go +++ b/pkg/wshrpc/wshremote/wshremote_file.go @@ -18,7 +18,6 @@ import ( "github.com/wavetermdev/waveterm/pkg/panichandler" "github.com/wavetermdev/waveterm/pkg/remote/connparse" - "github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil" "github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs" "github.com/wavetermdev/waveterm/pkg/util/fileutil" "github.com/wavetermdev/waveterm/pkg/util/utilfn" @@ -32,153 +31,6 @@ const RemoteFileTransferSizeLimit = 32 * 1024 * 1024 var DisableRecursiveFileOpts = true - -func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { - innerFilesEntries, err := os.ReadDir(path) - if err != nil { - return fmt.Errorf("cannot open dir %q: %w", path, err) - } - if byteRange.All { - if len(innerFilesEntries) > wshrpc.MaxDirSize { - innerFilesEntries = innerFilesEntries[:wshrpc.MaxDirSize] - } - } else { - if byteRange.Start < int64(len(innerFilesEntries)) { - var realEnd int64 - if byteRange.OpenEnd { - realEnd = int64(len(innerFilesEntries)) - } else { - realEnd = byteRange.End + 1 - if realEnd > int64(len(innerFilesEntries)) { - realEnd = int64(len(innerFilesEntries)) - } - } - innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd] - } else { - innerFilesEntries = []os.DirEntry{} - } - } - var fileInfoArr []*wshrpc.FileInfo - for _, innerFileEntry := range innerFilesEntries { - if ctx.Err() != nil { - return ctx.Err() - } - innerFileInfoInt, err := innerFileEntry.Info() - if err != nil { - continue - } - innerFileInfo := statToFileInfo(filepath.Join(path, innerFileInfoInt.Name()), innerFileInfoInt, false) - fileInfoArr = append(fileInfoArr, innerFileInfo) - if len(fileInfoArr) >= wshrpc.DirChunkSize { - dataCallback(fileInfoArr, nil, byteRange) - fileInfoArr = nil - } - } - if len(fileInfoArr) > 0 { - dataCallback(fileInfoArr, nil, byteRange) - } - return nil -} - -func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { - fd, err := os.Open(path) - if err != nil { - return fmt.Errorf("cannot open file %q: %w", path, err) - } - defer utilfn.GracefulClose(fd, "remoteStreamFileRegular", path) - var filePos int64 - if !byteRange.All && byteRange.Start > 0 { - _, err := fd.Seek(byteRange.Start, io.SeekStart) - if err != nil { - return fmt.Errorf("seeking file %q: %w", path, err) - } - filePos = byteRange.Start - } - buf := make([]byte, wshrpc.FileChunkSize) - for { - if ctx.Err() != nil { - return ctx.Err() - } - n, err := fd.Read(buf) - if n > 0 { - if !byteRange.All && !byteRange.OpenEnd && filePos+int64(n) > byteRange.End+1 { - n = int(byteRange.End + 1 - filePos) - } - filePos += int64(n) - dataCallback(nil, buf[:n], byteRange) - } - if !byteRange.All && !byteRange.OpenEnd && filePos >= byteRange.End+1 { - break - } - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return fmt.Errorf("reading file %q: %w", path, err) - } - } - return nil -} - -func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error { - byteRange, err := fileutil.ParseByteRange(data.ByteRange) - if err != nil { - return err - } - path, err := wavebase.ExpandHomeDir(data.Path) - if err != nil { - return err - } - finfo, err := impl.fileInfoInternal(path, true) - if err != nil { - return fmt.Errorf("cannot stat file %q: %w", path, err) - } - dataCallback([]*wshrpc.FileInfo{finfo}, nil, byteRange) - if finfo.NotFound { - return nil - } - if finfo.IsDir { - return impl.remoteStreamFileDir(ctx, path, byteRange, dataCallback) - } else { - if finfo.Size > RemoteFileTransferSizeLimit { - return fmt.Errorf("file %q size %d exceeds transfer limit of %d bytes", path, finfo.Size, RemoteFileTransferSizeLimit) - } - return impl.remoteStreamFileRegular(ctx, path, byteRange, dataCallback) - } -} - -func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { - ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.FileData], 16) - go func() { - defer func() { - panichandler.PanicHandler("RemoteStreamFileCommand", recover()) - }() - defer close(ch) - firstPk := true - err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType) { - resp := wshrpc.FileData{} - fileInfoLen := len(fileInfo) - if fileInfoLen > 1 || !firstPk { - resp.Entries = fileInfo - } else if fileInfoLen == 1 { - resp.Info = fileInfo[0] - } - if firstPk { - firstPk = false - } - if len(data) > 0 { - resp.Data64 = base64.StdEncoding.EncodeToString(data) - resp.At = &wshrpc.FileDataAt{Offset: byteRange.Start, Size: len(data)} - } - ch <- wshrpc.RespOrErrorUnion[wshrpc.FileData]{Response: resp} - }) - if err != nil { - ch <- wshutil.RespErr[wshrpc.FileData](err) - } - }() - return ch -} - // prepareDestForCopy resolves the final destination path and handles overwrite logic. // destPath is the raw destination path (may be a directory or file path). // srcBaseName is the basename of the source file (used when dest is a directory or ends with slash). @@ -315,8 +167,25 @@ func (impl *ServerImpl) RemoteFileCopyCommand(ctx context.Context, data wshrpc.C } defer destFile.Close() - streamChan := wshclient.RemoteStreamFileCommand(wshfs.RpcClient, wshrpc.CommandRemoteStreamFileData{Path: srcConn.Path}, &wshrpc.RpcOpts{Timeout: opts.Timeout, Route: wshutil.MakeConnectionRouteId(srcConn.Host)}) - if err = fsutil.ReadFileStreamToWriter(readCtx, streamChan, destFile); err != nil { + if wshfs.RpcClientRouteId == "" { + return false, fmt.Errorf("stream broker route id not available for file copy") + } + writerRouteId := wshutil.MakeConnectionRouteId(srcConn.Host) + reader, streamMeta := wshfs.RpcClient.StreamBroker.CreateStreamReader(wshfs.RpcClientRouteId, writerRouteId, 256*1024) + log.Printf("RemoteFileCopyCommand: readroute=%s writeroute=%s", streamMeta.ReaderRouteId, streamMeta.WriterRouteId) + defer reader.Close() + go func() { + <-readCtx.Done() + reader.Close() + }() + streamData := wshrpc.CommandRemoteFileStreamData{ + Path: srcConn.Path, + StreamMeta: *streamMeta, + } + if _, err = wshclient.RemoteFileStreamCommand(wshfs.RpcClient, streamData, &wshrpc.RpcOpts{Route: writerRouteId}); err != nil { + return false, fmt.Errorf("error starting file stream for %q: %w", data.SrcUri, err) + } + if _, err = io.Copy(destFile, reader); err != nil { return false, fmt.Errorf("error copying file %q to %q: %w", data.SrcUri, data.DestUri, err) } @@ -342,6 +211,9 @@ func (impl *ServerImpl) RemoteListEntriesCommand(ctx context.Context, data wshrp ch <- wshutil.RespErr[wshrpc.CommandRemoteListEntriesRtnData](err) return } + if data.Opts == nil { + data.Opts = &wshrpc.FileListOpts{} + } innerFilesEntries := []os.DirEntry{} seen := 0 if data.Opts.Limit == 0 { diff --git a/pkg/wshrpc/wshrpctypes_file.go b/pkg/wshrpc/wshrpctypes_file.go index a43a68bc2e..3c64bb2289 100644 --- a/pkg/wshrpc/wshrpctypes_file.go +++ b/pkg/wshrpc/wshrpctypes_file.go @@ -18,24 +18,17 @@ type WshRpcFileInterface interface { FileAppendCommand(ctx context.Context, data FileData) error FileWriteCommand(ctx context.Context, data FileData) error FileReadCommand(ctx context.Context, data FileData) (*FileData, error) - FileReadStreamCommand(ctx context.Context, data FileData) <-chan RespOrErrorUnion[FileData] FileMoveCommand(ctx context.Context, data CommandFileCopyData) error FileCopyCommand(ctx context.Context, data CommandFileCopyData) error FileInfoCommand(ctx context.Context, data FileData) (*FileInfo, error) FileListCommand(ctx context.Context, data FileListData) ([]*FileInfo, error) FileJoinCommand(ctx context.Context, paths []string) (*FileInfo, error) FileListStreamCommand(ctx context.Context, data FileListData) <-chan RespOrErrorUnion[CommandRemoteListEntriesRtnData] - // modern streaming interface FileStreamCommand(ctx context.Context, data CommandFileStreamData) (*FileInfo, error) } type WshRpcRemoteFileInterface interface { - // old streaming inferface - RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[FileData] - - // modern streaming interface RemoteFileStreamCommand(ctx context.Context, data CommandRemoteFileStreamData) (*FileInfo, error) - RemoteFileCopyCommand(ctx context.Context, data CommandFileCopyData) (bool, error) RemoteListEntriesCommand(ctx context.Context, data CommandRemoteListEntriesData) chan RespOrErrorUnion[CommandRemoteListEntriesRtnData] RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error) diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index b2c533ff48..e66e52320c 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -388,10 +388,6 @@ func (ws *WshServer) FileReadCommand(ctx context.Context, data wshrpc.FileData) return wshfs.Read(ctx, data) } -func (ws *WshServer) FileReadStreamCommand(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] { - return wshfs.ReadStream(ctx, data) -} - func (ws *WshServer) FileStreamCommand(ctx context.Context, data wshrpc.CommandFileStreamData) (*wshrpc.FileInfo, error) { return wshfs.FileStream(ctx, data) }