diff --git a/Cargo.lock b/Cargo.lock index 8017ee4..a52764d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,12 +125,13 @@ dependencies = [ ] [[package]] -name = "agentos-browser" +name = "agentos-channel-email" version = "0.0.1" dependencies = [ "anyhow", "chrono", "iii-sdk", + "lettre", "serde", "serde_json", "tokio", @@ -140,6 +141,20 @@ dependencies = [ "uuid", ] +[[package]] +name = "agentos-channel-signal" +version = "0.0.1" +dependencies = [ + "anyhow", + "iii-sdk", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "agentos-channel-slack" version = "0.0.1" @@ -154,6 +169,48 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "agentos-channel-teams" +version = "0.0.1" +dependencies = [ + "anyhow", + "iii-sdk", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "agentos-channel-webex" +version = "0.0.1" +dependencies = [ + "anyhow", + "iii-sdk", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "agentos-channel-whatsapp" +version = "0.0.1" +dependencies = [ + "anyhow", + "iii-sdk", + "reqwest", + "serde", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "agentos-cli" version = "0.0.1" @@ -1506,6 +1563,22 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "email-encoding" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9298e6504d9b9e780ed3f7dfd43a61be8cd0e09eb07f7706a945b0072b6670b6" +dependencies = [ + "base64", + "memchr", +] + +[[package]] +name = "email_address" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" + [[package]] name = "embedded-io" version = "0.4.0" @@ -1913,6 +1986,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.8.1" @@ -2320,6 +2399,33 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "lettre" +version = "0.11.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dabda5859ee7c06b995b9d1165aa52c39110e079ef609db97178d86aeb051fa7" +dependencies = [ + "async-trait", + "base64", + "email-encoding", + "email_address", + "fastrand", + "futures-io", + "futures-util", + "httpdate", + "idna", + "mime", + "nom", + "percent-encoding", + "quoted_printable", + "rustls", + "socket2", + "tokio", + "tokio-rustls", + "url", + "webpki-roots", +] + [[package]] name = "libc" version = "0.2.182" @@ -2414,6 +2520,12 @@ dependencies = [ "rustix 1.1.4", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "mio" version = "1.1.1" @@ -2426,6 +2538,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "ntapi" version = "0.4.3" @@ -2902,6 +3023,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "quoted_printable" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478e0585659a122aa407eb7e3c0e1fa51b1d8a870038bd29f0cf4a8551eea972" + [[package]] name = "r-efi" version = "5.3.0" @@ -3250,6 +3377,7 @@ version = "0.23.37" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "758025cb5fccfd3bc2fd74708fd4682be41d99e5dff73c377c0646c6012c73a4" dependencies = [ + "log", "once_cell", "ring", "rustls-pki-types", diff --git a/Cargo.toml b/Cargo.toml index a1968d3..0bc3db5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,12 @@ members = [ "workers/approval", "workers/approval-tiers", "workers/bridge", - "workers/browser", - "workers/code-agent", + "workers/channel-email", + "workers/channel-signal", "workers/channel-slack", + "workers/channel-teams", + "workers/channel-webex", + "workers/channel-whatsapp", "workers/coordination", "workers/council", "workers/cron", diff --git a/src/__tests__/channels-bluesky.test.ts b/src/__tests__/channels-bluesky.test.ts new file mode 100644 index 0000000..27038b0 --- /dev/null +++ b/src/__tests__/channels-bluesky.test.ts @@ -0,0 +1,164 @@ +// @ts-nocheck +import { describe, it, expect, vi, beforeEach, beforeAll } from "vitest"; + +const mockTrigger = vi.fn(async (fnId: string, data?: any): Promise => { + if (fnId === "agent::chat") return { content: "Reply" }; + return null; +}); +const mockTriggerVoid = vi.fn(); + +const handlers: Record = {}; +vi.mock("iii-sdk", () => ({ + registerWorker: () => ({ + registerFunction: (config: any, handler: Function) => { + handlers[config.id] = handler; + }, + registerTrigger: vi.fn(), + trigger: (req: any) => + req.action + ? mockTriggerVoid(req.function_id, req.payload) + : mockTrigger(req.function_id, req.payload), + shutdown: vi.fn(), + }), + TriggerAction: { Void: () => ({}) }, +})); + +vi.mock("@agentos/shared/utils", () => ({ + httpOk: (req: any, data: any) => data, + splitMessage: vi.fn((text: string, limit: number) => { + const chunks: string[] = []; + for (let i = 0; i < text.length; i += limit) + chunks.push(text.slice(i, i + limit)); + return chunks.length ? chunks : [text]; + }), + resolveAgent: vi.fn(async () => "default-agent"), +})); + +const mockFetch = vi.fn(async () => ({ + ok: true, + json: async () => ({ + text: "Fetched message", + id: "msg-1", + accessJwt: "jwt", + did: "did:plc:test", + }), +})); +vi.stubGlobal("fetch", mockFetch); + +beforeEach(() => { + mockTrigger.mockReset(); + mockTrigger.mockImplementation( + async (fnId: string, data?: any): Promise => { + if (fnId === "agent::chat") return { content: "Reply" }; + return null; + }, + ); + mockTriggerVoid.mockClear(); + mockFetch.mockClear(); + mockFetch.mockImplementation(async () => ({ + ok: true, + json: async () => ({ + text: "Fetched message", + id: "msg-1", + accessJwt: "jwt", + did: "did:plc:test", + }), + })); +}); + +beforeAll(async () => { + process.env.BLUESKY_HANDLE = "test.bsky.social"; + process.env.BLUESKY_PASSWORD = "test-password"; + await import("../channels/bluesky.js"); +}); + +async function call(id: string, input: any) { + const handler = handlers[id]; + if (!handler) throw new Error(`Handler ${id} not registered`); + return handler(input); +} + +describe("channel::bluesky::webhook", () => { + it("registers the handler", () => { + expect(handlers["channel::bluesky::webhook"]).toBeDefined(); + }); + + it("ignores messages without text", async () => { + const result = await call("channel::bluesky::webhook", { + body: { did: "did:plc:user1" }, + }); + expect(result.status_code).toBe(200); + }); + + it("processes valid mention", async () => { + const result = await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user2", + text: "Hello Bluesky", + uri: "at://did:plc:user2/post/1", + cid: "bafyrei1", + }, + }); + expect(result.status_code).toBe(200); + }); + + it("routes to agent::chat with bluesky session", async () => { + await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user3", + text: "Bluesky msg", + uri: "at://x/post/2", + cid: "bafyrei2", + }, + }); + const chatCalls = mockTrigger.mock.calls.filter( + (c) => c[0] === "agent::chat", + ); + expect(chatCalls.length).toBe(1); + expect(chatCalls[0][1].sessionId).toBe("bluesky:did:plc:user3"); + }); + + it("calls Bluesky API to send reply", async () => { + await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user4", + text: "Auth test", + uri: "at://x/post/3", + cid: "bafyrei3", + }, + }); + expect(mockFetch).toHaveBeenCalled(); + }); + + it("sends reply as post via AT Protocol", async () => { + await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user5", + text: "Post test", + uri: "at://x/post/4", + cid: "bafyrei4", + }, + }); + const createCalls = (mockFetch.mock.calls as any[][]).filter((c) => + (c[0] as string).includes("createRecord"), + ); + expect(createCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("emits audit event", async () => { + await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user6", + text: "Audit bsky", + uri: "at://x/post/5", + cid: "bafyrei5", + }, + }); + expect(mockTriggerVoid).toHaveBeenCalledWith( + "security::audit", + expect.objectContaining({ + detail: expect.objectContaining({ channel: "bluesky" }), + }), + ); + }); +}); diff --git a/src/__tests__/channels-matrix.test.ts b/src/__tests__/channels-matrix.test.ts new file mode 100644 index 0000000..38bbfd4 --- /dev/null +++ b/src/__tests__/channels-matrix.test.ts @@ -0,0 +1,182 @@ +// @ts-nocheck +import { describe, it, expect, vi, beforeEach, beforeAll } from "vitest"; + +const mockTrigger = vi.fn(async (fnId: string, data?: any): Promise => { + if (fnId === "agent::chat") return { content: "Reply" }; + return null; +}); +const mockTriggerVoid = vi.fn(); + +const handlers: Record = {}; +vi.mock("iii-sdk", () => ({ + registerWorker: () => ({ + registerFunction: (config: any, handler: Function) => { + handlers[config.id] = handler; + }, + registerTrigger: vi.fn(), + trigger: (req: any) => + req.action + ? mockTriggerVoid(req.function_id, req.payload) + : mockTrigger(req.function_id, req.payload), + shutdown: vi.fn(), + }), + TriggerAction: { Void: () => ({}) }, +})); + +vi.mock("@agentos/shared/utils", () => ({ + httpOk: (req: any, data: any) => data, + splitMessage: vi.fn((text: string) => [text]), + resolveAgent: vi.fn(async () => "default-agent"), +})); + +const mockFetch = vi.fn(async () => ({ + ok: true, + json: async () => ({ access_token: "test-token-123" }), +})); +vi.stubGlobal("fetch", mockFetch); + +beforeEach(() => { + mockTrigger.mockReset(); + mockTrigger.mockImplementation(async (fnId: string): Promise => { + if (fnId === "agent::chat") return { content: "Reply" }; + return null; + }); + mockTriggerVoid.mockClear(); + mockFetch.mockClear(); + mockFetch.mockImplementation(async () => ({ + ok: true, + json: async () => ({ access_token: "test-token-123" }), + })); +}); + +describe("Matrix channel", () => { + beforeAll(async () => { + process.env.MATRIX_HOMESERVER = "https://matrix.example.org"; + process.env.MATRIX_TOKEN = "test-matrix-token"; + await import("../channels/matrix.js"); + }); + + it("registers channel::matrix::webhook", () => { + expect(handlers["channel::matrix::webhook"]).toBeDefined(); + }); + + it("processes m.room.message event", async () => { + const result = await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!room1:matrix.org", + content: { body: "Hello Matrix" }, + sender: "@user:matrix.org", + }, + }); + expect(result.status_code).toBe(200); + }); + + it("routes to agent::chat", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!room2:matrix.org", + content: { body: "Matrix msg" }, + sender: "@user2:matrix.org", + }, + }); + const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); + expect(chatCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("uses room_id for session", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!sess-room:matrix.org", + content: { body: "Session" }, + sender: "@user3:matrix.org", + }, + }); + const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); + const lastChat = chatCalls[chatCalls.length - 1]; + expect(lastChat[1].sessionId).toBe("matrix:!sess-room:matrix.org"); + }); + + it("ignores non-message events", async () => { + const result = await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.member", + room_id: "!room3:matrix.org", + }, + }); + expect(result.status_code).toBe(200); + }); + + it("ignores empty body content", async () => { + const result = await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!room4:matrix.org", + content: {}, + sender: "@user4:matrix.org", + }, + }); + expect(result.status_code).toBe(200); + }); + + it("sends reply via Matrix PUT API", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!reply-room:matrix.org", + content: { body: "Reply test" }, + sender: "@user5:matrix.org", + }, + }); + expect(mockFetch).toHaveBeenCalledWith( + expect.stringContaining("/_matrix/client/v3/rooms/"), + expect.objectContaining({ method: "PUT" }), + ); + }); + + it("sends m.text msgtype", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!type-room:matrix.org", + content: { body: "Type" }, + sender: "@user6:matrix.org", + }, + }); + const putCall = mockFetch.mock.calls.find(c => c[1]?.method === "PUT"); + if (putCall) { + const body = JSON.parse(putCall[1].body); + expect(body.msgtype).toBe("m.text"); + } + }); + + it("audits channel message", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!audit-room:matrix.org", + content: { body: "Audit" }, + sender: "@audit-user:matrix.org", + }, + }); + const auditCalls = mockTriggerVoid.mock.calls.filter(c => c[0] === "security::audit"); + expect(auditCalls.some(c => c[1].detail?.channel === "matrix")).toBe(true); + }); + + it("includes bearer token in authorization", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!auth-room:matrix.org", + content: { body: "Auth check" }, + sender: "@auth-user:matrix.org", + }, + }); + const putCall = mockFetch.mock.calls.find(c => c[1]?.method === "PUT"); + if (putCall) { + expect(putCall[1].headers.Authorization).toBe("Bearer test-matrix-token"); + } + }); +}); diff --git a/src/__tests__/channels-email-mattermost.test.ts b/src/__tests__/channels-mattermost.test.ts similarity index 53% rename from src/__tests__/channels-email-mattermost.test.ts rename to src/__tests__/channels-mattermost.test.ts index 437467d..87588fd 100644 --- a/src/__tests__/channels-email-mattermost.test.ts +++ b/src/__tests__/channels-mattermost.test.ts @@ -29,11 +29,6 @@ vi.mock("@agentos/shared/utils", () => ({ resolveAgent: vi.fn(async () => "default-agent"), })); -const mockSendMail = vi.fn(async () => ({ messageId: "msg-1" })); -vi.mock("nodemailer", () => ({ - createTransport: () => ({ sendMail: mockSendMail }), -})); - const mockFetch = vi.fn(async () => ({ ok: true, json: async () => ({}) })); vi.stubGlobal("fetch", mockFetch); @@ -44,146 +39,9 @@ beforeEach(() => { return null; }); mockTriggerVoid.mockClear(); - mockSendMail.mockClear(); mockFetch.mockClear(); }); -describe("Email channel", () => { - beforeAll(async () => { - process.env.SMTP_HOST = "smtp.test.com"; - process.env.SMTP_PORT = "587"; - process.env.SMTP_USER = "bot@test.com"; - process.env.SMTP_PASS = "test-pass"; - await import("../channels/email.js"); - }); - - it("registers channel::email::webhook", () => { - expect(handlers["channel::email::webhook"]).toBeDefined(); - }); - - it("processes inbound email", async () => { - const result = await handlers["channel::email::webhook"]({ - body: { - from: "user@example.com", - to: "bot@test.com", - subject: "Test", - text: "Hello via email", - }, - }); - expect(result.status_code).toBe(200); - }); - - it("routes email to agent::chat", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "sender@test.com", - to: "bot@test.com", - subject: "Question", - text: "What is 2+2?", - }, - }); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls.length).toBe(1); - expect(chatCalls[0][1].message).toContain("What is 2+2?"); - expect(chatCalls[0][1].message).toContain("Subject: Question"); - }); - - it("uses sender address for session ID", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "session@test.com", - to: "bot@test.com", - subject: "Session test", - text: "Session content", - }, - }); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls[0][1].sessionId).toBe("email:session@test.com"); - }); - - it("sends reply via SMTP", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "reply@test.com", - to: "bot@test.com", - subject: "Reply test", - text: "Reply me", - }, - }); - expect(mockSendMail).toHaveBeenCalledWith( - expect.objectContaining({ - to: "reply@test.com", - subject: "Re: Reply test", - }), - ); - }); - - it("handles missing subject", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "no-subject@test.com", - to: "bot@test.com", - text: "No subject email", - }, - }); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls[0][1].message).toContain("Subject: (none)"); - }); - - it("ignores email without from", async () => { - const result = await handlers["channel::email::webhook"]({ - body: { - to: "bot@test.com", - text: "No from", - }, - }); - expect(result.status_code).toBe(200); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls.length).toBe(0); - }); - - it("ignores email without text", async () => { - const result = await handlers["channel::email::webhook"]({ - body: { - from: "empty@test.com", - to: "bot@test.com", - subject: "Empty", - }, - }); - expect(result.status_code).toBe(200); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls.length).toBe(0); - }); - - it("audits email message", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "audit@test.com", - to: "bot@test.com", - subject: "Audit", - text: "Audit email", - }, - }); - const auditCalls = mockTriggerVoid.mock.calls.filter(c => c[0] === "security::audit"); - expect(auditCalls.some(c => c[1].type === "channel_message")).toBe(true); - expect(auditCalls.some(c => c[1].detail.channel === "email")).toBe(true); - }); - - it("reply subject includes Re: prefix", async () => { - await handlers["channel::email::webhook"]({ - body: { - from: "re@test.com", - to: "bot@test.com", - subject: "Original", - text: "Content", - }, - }); - expect(mockSendMail).toHaveBeenCalledWith( - expect.objectContaining({ subject: "Re: Original" }), - ); - }); -}); - describe("Mattermost channel", () => { beforeAll(async () => { process.env.MATTERMOST_URL = "https://mattermost.example.com"; diff --git a/src/__tests__/channels-teams.test.ts b/src/__tests__/channels-teams.test.ts index ff64cfb..38bbfd4 100644 --- a/src/__tests__/channels-teams.test.ts +++ b/src/__tests__/channels-teams.test.ts @@ -49,136 +49,134 @@ beforeEach(() => { })); }); -describe("Microsoft Teams channel", () => { +describe("Matrix channel", () => { beforeAll(async () => { - process.env.TEAMS_APP_ID = "test-app-id"; - process.env.TEAMS_APP_PASSWORD = "test-password"; - await import("../channels/teams.js"); + process.env.MATRIX_HOMESERVER = "https://matrix.example.org"; + process.env.MATRIX_TOKEN = "test-matrix-token"; + await import("../channels/matrix.js"); }); - it("registers channel::teams::webhook", () => { - expect(handlers["channel::teams::webhook"]).toBeDefined(); + it("registers channel::matrix::webhook", () => { + expect(handlers["channel::matrix::webhook"]).toBeDefined(); }); - it("processes message activity", async () => { - const result = await handlers["channel::teams::webhook"]({ + it("processes m.room.message event", async () => { + const result = await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Hello Teams", - conversation: { id: "conv-1" }, - from: { id: "user-1" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-1", + type: "m.room.message", + room_id: "!room1:matrix.org", + content: { body: "Hello Matrix" }, + sender: "@user:matrix.org", }, }); expect(result.status_code).toBe(200); }); it("routes to agent::chat", async () => { - await handlers["channel::teams::webhook"]({ + await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Teams msg", - conversation: { id: "conv-2" }, - from: { id: "user-2" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-2", + type: "m.room.message", + room_id: "!room2:matrix.org", + content: { body: "Matrix msg" }, + sender: "@user2:matrix.org", }, }); const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls.length).toBe(1); - expect(chatCalls[0][1].message).toBe("Teams msg"); + expect(chatCalls.length).toBeGreaterThanOrEqual(1); }); - it("uses conversation ID for session", async () => { - await handlers["channel::teams::webhook"]({ + it("uses room_id for session", async () => { + await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Sess test", - conversation: { id: "conv-sess" }, - from: { id: "user-3" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-3", + type: "m.room.message", + room_id: "!sess-room:matrix.org", + content: { body: "Session" }, + sender: "@user3:matrix.org", }, }); const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls[0][1].sessionId).toBe("teams:conv-sess"); + const lastChat = chatCalls[chatCalls.length - 1]; + expect(lastChat[1].sessionId).toBe("matrix:!sess-room:matrix.org"); }); - it("ignores non-message activities", async () => { - const result = await handlers["channel::teams::webhook"]({ + it("ignores non-message events", async () => { + const result = await handlers["channel::matrix::webhook"]({ body: { - type: "conversationUpdate", - conversation: { id: "conv-upd" }, + type: "m.room.member", + room_id: "!room3:matrix.org", }, }); expect(result.status_code).toBe(200); - const chatCalls = mockTrigger.mock.calls.filter(c => c[0] === "agent::chat"); - expect(chatCalls.length).toBe(0); }); - it("sends reply via Bot Framework API", async () => { - await handlers["channel::teams::webhook"]({ + it("ignores empty body content", async () => { + const result = await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Reply test", - conversation: { id: "conv-reply" }, - from: { id: "user-4" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-reply", + type: "m.room.message", + room_id: "!room4:matrix.org", + content: {}, + sender: "@user4:matrix.org", }, }); - expect(mockFetch).toHaveBeenCalledWith( - expect.stringContaining("/v3/conversations/conv-reply/activities"), - expect.objectContaining({ method: "POST" }), - ); + expect(result.status_code).toBe(200); }); - it("gets OAuth token for reply", async () => { - await handlers["channel::teams::webhook"]({ + it("sends reply via Matrix PUT API", async () => { + await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Auth", - conversation: { id: "conv-auth" }, - from: { id: "user-5" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-auth", + type: "m.room.message", + room_id: "!reply-room:matrix.org", + content: { body: "Reply test" }, + sender: "@user5:matrix.org", }, }); expect(mockFetch).toHaveBeenCalledWith( - expect.stringContaining("login.microsoftonline.com"), - expect.objectContaining({ method: "POST" }), + expect.stringContaining("/_matrix/client/v3/rooms/"), + expect.objectContaining({ method: "PUT" }), ); }); + it("sends m.text msgtype", async () => { + await handlers["channel::matrix::webhook"]({ + body: { + type: "m.room.message", + room_id: "!type-room:matrix.org", + content: { body: "Type" }, + sender: "@user6:matrix.org", + }, + }); + const putCall = mockFetch.mock.calls.find(c => c[1]?.method === "PUT"); + if (putCall) { + const body = JSON.parse(putCall[1].body); + expect(body.msgtype).toBe("m.text"); + } + }); + it("audits channel message", async () => { - await handlers["channel::teams::webhook"]({ + await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "Audit", - conversation: { id: "conv-audit" }, - from: { id: "user-audit" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-audit", + type: "m.room.message", + room_id: "!audit-room:matrix.org", + content: { body: "Audit" }, + sender: "@audit-user:matrix.org", }, }); const auditCalls = mockTriggerVoid.mock.calls.filter(c => c[0] === "security::audit"); - expect(auditCalls.some(c => c[1].type === "channel_message")).toBe(true); - expect(auditCalls.some(c => c[1].detail.channel === "teams")).toBe(true); + expect(auditCalls.some(c => c[1].detail?.channel === "matrix")).toBe(true); }); - it("handles empty text", async () => { - const result = await handlers["channel::teams::webhook"]({ + it("includes bearer token in authorization", async () => { + await handlers["channel::matrix::webhook"]({ body: { - type: "message", - text: "", - conversation: { id: "conv-empty" }, - from: { id: "user-6" }, - serviceUrl: "https://smba.trafficmanager.net/teams/", - id: "act-empty", + type: "m.room.message", + room_id: "!auth-room:matrix.org", + content: { body: "Auth check" }, + sender: "@auth-user:matrix.org", }, }); - expect(result.status_code).toBe(200); + const putCall = mockFetch.mock.calls.find(c => c[1]?.method === "PUT"); + if (putCall) { + expect(putCall[1].headers.Authorization).toBe("Bearer test-matrix-token"); + } }); }); - diff --git a/src/__tests__/channels-webex.test.ts b/src/__tests__/channels-webex.test.ts index 9123f64..27038b0 100644 --- a/src/__tests__/channels-webex.test.ts +++ b/src/__tests__/channels-webex.test.ts @@ -67,8 +67,9 @@ beforeEach(() => { }); beforeAll(async () => { - process.env.WEBEX_TOKEN = "test-webex-token"; - await import("../channels/webex.js"); + process.env.BLUESKY_HANDLE = "test.bsky.social"; + process.env.BLUESKY_PASSWORD = "test-password"; + await import("../channels/bluesky.js"); }); async function call(id: string, input: any) { @@ -77,102 +78,87 @@ async function call(id: string, input: any) { return handler(input); } -describe("channel::webex::webhook", () => { +describe("channel::bluesky::webhook", () => { it("registers the handler", () => { - expect(handlers["channel::webex::webhook"]).toBeDefined(); + expect(handlers["channel::bluesky::webhook"]).toBeDefined(); }); - it("ignores non-messages resources", async () => { - const result = await call("channel::webex::webhook", { - body: { resource: "rooms", event: "created" }, + it("ignores messages without text", async () => { + const result = await call("channel::bluesky::webhook", { + body: { did: "did:plc:user1" }, }); expect(result.status_code).toBe(200); - expect(result.body.ok).toBe(true); }); - it("ignores non-created events", async () => { - const result = await call("channel::webex::webhook", { - body: { resource: "messages", event: "deleted" }, + it("processes valid mention", async () => { + const result = await call("channel::bluesky::webhook", { + body: { + did: "did:plc:user2", + text: "Hello Bluesky", + uri: "at://did:plc:user2/post/1", + cid: "bafyrei1", + }, }); expect(result.status_code).toBe(200); }); - it("fetches message content from Webex API", async () => { - await call("channel::webex::webhook", { + it("routes to agent::chat with bluesky session", async () => { + await call("channel::bluesky::webhook", { body: { - resource: "messages", - event: "created", - data: { id: "msg-123", roomId: "room-1", personId: "person-1" }, + did: "did:plc:user3", + text: "Bluesky msg", + uri: "at://x/post/2", + cid: "bafyrei2", }, }); - expect(mockFetch).toHaveBeenCalledWith( - expect.stringContaining("webexapis.com/v1/messages/msg-123"), - expect.objectContaining({ - headers: expect.objectContaining({ - Authorization: "Bearer test-webex-token", - }), - }), + const chatCalls = mockTrigger.mock.calls.filter( + (c) => c[0] === "agent::chat", ); + expect(chatCalls.length).toBe(1); + expect(chatCalls[0][1].sessionId).toBe("bluesky:did:plc:user3"); }); - it("routes to agent::chat with webex session", async () => { - await call("channel::webex::webhook", { + it("calls Bluesky API to send reply", async () => { + await call("channel::bluesky::webhook", { body: { - resource: "messages", - event: "created", - data: { id: "msg-456", roomId: "room-2", personId: "p-1" }, + did: "did:plc:user4", + text: "Auth test", + uri: "at://x/post/3", + cid: "bafyrei3", }, }); - const chatCalls = mockTrigger.mock.calls.filter( - (c) => c[0] === "agent::chat", - ); - expect(chatCalls.length).toBe(1); - expect(chatCalls[0][1].sessionId).toBe("webex:room-2"); + expect(mockFetch).toHaveBeenCalled(); }); - it("sends reply to Webex room", async () => { - await call("channel::webex::webhook", { + it("sends reply as post via AT Protocol", async () => { + await call("channel::bluesky::webhook", { body: { - resource: "messages", - event: "created", - data: { id: "msg-789", roomId: "room-3", personId: "p-2" }, + did: "did:plc:user5", + text: "Post test", + uri: "at://x/post/4", + cid: "bafyrei4", }, }); - const sendCalls = mockFetch.mock.calls.filter( - (c) => (c[1] as any)?.method === "POST", + const createCalls = (mockFetch.mock.calls as any[][]).filter((c) => + (c[0] as string).includes("createRecord"), ); - expect(sendCalls.length).toBeGreaterThan(0); + expect(createCalls.length).toBeGreaterThanOrEqual(1); }); it("emits audit event", async () => { - await call("channel::webex::webhook", { + await call("channel::bluesky::webhook", { body: { - resource: "messages", - event: "created", - data: { id: "msg-a", roomId: "r-a", personId: "p-a" }, + did: "did:plc:user6", + text: "Audit bsky", + uri: "at://x/post/5", + cid: "bafyrei5", }, }); expect(mockTriggerVoid).toHaveBeenCalledWith( "security::audit", expect.objectContaining({ - detail: expect.objectContaining({ channel: "webex" }), + detail: expect.objectContaining({ channel: "bluesky" }), }), ); }); - - it("handles empty text response", async () => { - mockFetch.mockImplementationOnce(async () => ({ - ok: true, - json: async () => ({ text: "" }), - })); - const result = await call("channel::webex::webhook", { - body: { - resource: "messages", - event: "created", - data: { id: "msg-empty", roomId: "r-e", personId: "p-e" }, - }, - }); - expect(result.status_code).toBe(200); - }); }); - diff --git a/src/__tests__/channels-whatsapp-signal.test.ts b/src/__tests__/channels-whatsapp-signal.test.ts deleted file mode 100644 index b7d929b..0000000 --- a/src/__tests__/channels-whatsapp-signal.test.ts +++ /dev/null @@ -1,311 +0,0 @@ -import { describe, it, expect, vi, beforeEach, beforeAll } from "vitest"; - -const mockTrigger = vi.fn(async (fnId: string, data?: any): Promise => { - if (fnId === "agent::chat") return { content: "Reply" }; - return null; -}); -const mockTriggerVoid = vi.fn(); - -const handlers: Record = {}; -vi.mock("iii-sdk", () => ({ - registerWorker: () => ({ - registerFunction: (config: any, handler: Function) => { - handlers[config.id] = handler; - }, - registerTrigger: vi.fn(), - trigger: (req: any) => - req.action - ? mockTriggerVoid(req.function_id, req.payload) - : mockTrigger(req.function_id, req.payload), - shutdown: vi.fn(), - }), - TriggerAction: { Void: () => ({}) }, -})); - -vi.mock("@agentos/shared/utils", () => ({ - httpOk: (req: any, data: any) => data, - splitMessage: vi.fn((text: string, limit: number) => { - const chunks: string[] = []; - for (let i = 0; i < text.length; i += limit) - chunks.push(text.slice(i, i + limit)); - return chunks.length ? chunks : [text]; - }), - resolveAgent: vi.fn(async () => "default-agent"), -})); - -const mockFetch = vi.fn(async () => ({ ok: true, json: async () => ({}) })); -vi.stubGlobal("fetch", mockFetch); - -beforeEach(() => { - mockTrigger.mockReset(); - mockTrigger.mockImplementation( - async (fnId: string, data?: any): Promise => { - if (fnId === "agent::chat") return { content: "Reply" }; - return null; - }, - ); - mockTriggerVoid.mockClear(); - mockFetch.mockClear(); -}); - -beforeAll(async () => { - process.env.WHATSAPP_TOKEN = "test-wa-token"; - process.env.WHATSAPP_PHONE_ID = "123456"; - process.env.SIGNAL_API_URL = "http://signal-api:8080"; - process.env.SIGNAL_PHONE = "+1234567890"; - await import("../channels/whatsapp.js"); - await import("../channels/signal.js"); -}); - -async function call(id: string, input: any) { - const handler = handlers[id]; - if (!handler) throw new Error(`Handler ${id} not registered`); - return handler(input); -} - -describe("channel::whatsapp::webhook", () => { - it("registers the handler", () => { - expect(handlers["channel::whatsapp::webhook"]).toBeDefined(); - }); - - it("ignores non-whatsapp_business_account objects", async () => { - const result = await call("channel::whatsapp::webhook", { - body: { object: "page" }, - }); - expect(result.status_code).toBe(200); - expect(result.body.ok).toBe(true); - }); - - it("ignores messages without text body", async () => { - const result = await call("channel::whatsapp::webhook", { - body: { - object: "whatsapp_business_account", - entry: [{ changes: [{ value: { messages: [{ from: "+1" }] } }] }], - }, - }); - expect(result.status_code).toBe(200); - }); - - it("processes valid text message", async () => { - const result = await call("channel::whatsapp::webhook", { - body: { - object: "whatsapp_business_account", - entry: [ - { - changes: [ - { - value: { - messages: [ - { from: "+15551234567", text: { body: "Hello WA" } }, - ], - }, - }, - ], - }, - ], - }, - }); - expect(result.status_code).toBe(200); - }); - - it("routes message to agent::chat", async () => { - await call("channel::whatsapp::webhook", { - body: { - object: "whatsapp_business_account", - entry: [ - { - changes: [ - { - value: { - messages: [{ from: "+100", text: { body: "WA msg" } }], - }, - }, - ], - }, - ], - }, - }); - const chatCalls = mockTrigger.mock.calls.filter( - (c) => c[0] === "agent::chat", - ); - expect(chatCalls.length).toBe(1); - expect(chatCalls[0][1].message).toBe("WA msg"); - expect(chatCalls[0][1].sessionId).toBe("whatsapp:+100"); - }); - - it("sends reply via WhatsApp API", async () => { - await call("channel::whatsapp::webhook", { - body: { - object: "whatsapp_business_account", - entry: [ - { - changes: [ - { - value: { - messages: [{ from: "+200", text: { body: "Test" } }], - }, - }, - ], - }, - ], - }, - }); - expect(mockFetch).toHaveBeenCalledWith( - expect.stringContaining("graph.facebook.com"), - expect.objectContaining({ method: "POST" }), - ); - }); - - it("emits audit event", async () => { - await call("channel::whatsapp::webhook", { - body: { - object: "whatsapp_business_account", - entry: [ - { - changes: [ - { - value: { - messages: [{ from: "+300", text: { body: "Audit" } }], - }, - }, - ], - }, - ], - }, - }); - expect(mockTriggerVoid).toHaveBeenCalledWith( - "security::audit", - expect.objectContaining({ type: "channel_message" }), - ); - }); - - it("ignores missing entry array", async () => { - const result = await call("channel::whatsapp::webhook", { - body: { object: "whatsapp_business_account" }, - }); - expect(result.status_code).toBe(200); - }); - - it("ignores empty changes array", async () => { - const result = await call("channel::whatsapp::webhook", { - body: { object: "whatsapp_business_account", entry: [{ changes: [] }] }, - }); - expect(result.status_code).toBe(200); - }); -}); - -describe("channel::signal::webhook", () => { - it("registers the handler", () => { - expect(handlers["channel::signal::webhook"]).toBeDefined(); - }); - - it("ignores messages without dataMessage", async () => { - const result = await call("channel::signal::webhook", { - body: { envelope: {} }, - }); - expect(result.status_code).toBe(200); - }); - - it("ignores messages without text content", async () => { - const result = await call("channel::signal::webhook", { - body: { envelope: { dataMessage: {} } }, - }); - expect(result.status_code).toBe(200); - }); - - it("processes direct message", async () => { - const result = await call("channel::signal::webhook", { - body: { - envelope: { - source: "+15550001111", - dataMessage: { message: "Hello Signal" }, - }, - }, - }); - expect(result.status_code).toBe(200); - }); - - it("routes to agent::chat with signal session", async () => { - await call("channel::signal::webhook", { - body: { - envelope: { - source: "+15559998888", - dataMessage: { message: "Signal test" }, - }, - }, - }); - const chatCalls = mockTrigger.mock.calls.filter( - (c) => c[0] === "agent::chat", - ); - expect(chatCalls.length).toBe(1); - expect(chatCalls[0][1].sessionId).toBe("signal:+15559998888"); - }); - - it("uses groupId for session when available", async () => { - await call("channel::signal::webhook", { - body: { - envelope: { - source: "+1555", - dataMessage: { - message: "Group msg", - groupInfo: { groupId: "grp-123" }, - }, - }, - }, - }); - const chatCalls = mockTrigger.mock.calls.filter( - (c) => c[0] === "agent::chat", - ); - expect(chatCalls[0][1].sessionId).toBe("signal:grp-123"); - }); - - it("sends reply via Signal API", async () => { - await call("channel::signal::webhook", { - body: { - envelope: { - source: "+1000", - dataMessage: { message: "Reply test" }, - }, - }, - }); - expect(mockFetch).toHaveBeenCalledWith( - expect.stringContaining("/v2/send"), - expect.objectContaining({ method: "POST" }), - ); - }); - - it("sends group reply with group_id", async () => { - await call("channel::signal::webhook", { - body: { - envelope: { - source: "+2000", - dataMessage: { - message: "Group reply", - groupInfo: { groupId: "g-abc" }, - }, - }, - }, - }); - const fetchBody = JSON.parse( - (mockFetch.mock.calls[0] as any[])[1].body as string, - ); - expect(fetchBody.group_id).toBe("g-abc"); - }); - - it("emits audit event for signal messages", async () => { - await call("channel::signal::webhook", { - body: { - envelope: { - source: "+3000", - dataMessage: { message: "Audit signal" }, - }, - }, - }); - expect(mockTriggerVoid).toHaveBeenCalledWith( - "security::audit", - expect.objectContaining({ - detail: expect.objectContaining({ channel: "signal" }), - }), - ); - }); -}); diff --git a/src/channels/email.ts b/src/channels/email.ts deleted file mode 100644 index 8b7d152..0000000 --- a/src/channels/email.ts +++ /dev/null @@ -1,74 +0,0 @@ -import { registerWorker, TriggerAction } from "iii-sdk"; -import { ENGINE_URL, OTEL_CONFIG, registerShutdown } from "@agentos/shared/config"; -import { createTransport } from "nodemailer"; -import { splitMessage, resolveAgent } from "@agentos/shared/utils"; - -const sdk = registerWorker(ENGINE_URL, { - workerName: "channel-email", - otel: OTEL_CONFIG, -}); -registerShutdown(sdk); -const { registerFunction, registerTrigger, trigger } = sdk; - -const transporter = createTransport({ - host: process.env.SMTP_HOST || "localhost", - port: Number(process.env.SMTP_PORT || 587), - secure: process.env.SMTP_SECURE === "true", - auth: { - user: process.env.SMTP_USER || "", - pass: process.env.SMTP_PASS || "", - }, -}); - -registerFunction( - { - id: "channel::email::webhook", - description: "Handle inbound email webhook", - }, - async (req) => { - const body = req.body || req; - const { from, to, subject, text } = body; - - if (!from || !text) return { status_code: 200, body: { ok: true } }; - - const agentId = await resolveAgent(sdk, "email", to); - - const response: any = await trigger({ - function_id: "agent::chat", - payload: { - agentId, - message: `Subject: ${subject || "(none)"}\n\n${text}`, - sessionId: `email:${from}`, - }, - }); - - await sendMessage(from, `Re: ${subject || ""}`, response.content); - - trigger({ - function_id: "security::audit", - payload: { - type: "channel_message", - agentId, - detail: { channel: "email", from, to }, - }, - action: TriggerAction.Void(), - }); - - return { status_code: 200, body: { ok: true } }; - }, -); - -registerTrigger({ - type: "http", - function_id: "channel::email::webhook", - config: { api_path: "webhook/email", http_method: "POST" }, -}); - -async function sendMessage(to: string, subject: string, text: string) { - await transporter.sendMail({ - from: process.env.SMTP_USER, - to, - subject, - text, - }); -} diff --git a/src/channels/signal.ts b/src/channels/signal.ts deleted file mode 100644 index 8ac34f1..0000000 --- a/src/channels/signal.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { registerWorker, TriggerAction } from "iii-sdk"; -import { ENGINE_URL, OTEL_CONFIG, registerShutdown } from "@agentos/shared/config"; -import { createSecretGetter } from "@agentos/shared/secrets"; -import { splitMessage, resolveAgent } from "@agentos/shared/utils"; - -const sdk = registerWorker(ENGINE_URL, { - workerName: "channel-signal", - otel: OTEL_CONFIG, -}); -registerShutdown(sdk); -const { registerFunction, registerTrigger, trigger } = sdk; - -const getSecret = createSecretGetter(trigger); - -registerFunction( - { - id: "channel::signal::webhook", - description: "Handle Signal REST API bridge webhook", - }, - async (req) => { - const body = req.body || req; - const { envelope } = body; - - if (!envelope?.dataMessage?.message) - return { status_code: 200, body: { ok: true } }; - - const source = envelope.source; - const text = envelope.dataMessage.message; - const groupId = envelope.dataMessage.groupInfo?.groupId; - - const channelKey = groupId || source; - const agentId = await resolveAgent(sdk, "signal", channelKey); - - const response: any = await trigger({ - function_id: "agent::chat", - payload: { - agentId, - message: text, - sessionId: `signal:${channelKey}`, - }, - }); - - if (!response?.content) { - console.warn("signal: agent returned empty response", { channelKey }); - return { status_code: 500, body: { error: "Empty agent response" } }; - } - - await sendMessage(source, response.content, groupId); - - trigger({ - function_id: "security::audit", - payload: { - type: "channel_message", - agentId, - detail: { channel: "signal", source, groupId }, - }, - action: TriggerAction.Void(), - }); - - return { status_code: 200, body: { ok: true } }; - }, -); - -registerTrigger({ - type: "http", - function_id: "channel::signal::webhook", - config: { api_path: "webhook/signal", http_method: "POST" }, -}); - -async function sendMessage(recipient: string, text: string, groupId?: string) { - const apiUrl = await getSecret("SIGNAL_API_URL"); - if (!apiUrl) { - throw new Error("SIGNAL_API_URL not configured"); - } - const phone = await getSecret("SIGNAL_PHONE"); - if (!phone) { - throw new Error("SIGNAL_PHONE not configured"); - } - const chunks = splitMessage(text, 4096); - for (const chunk of chunks) { - const controller = new AbortController(); - const timer = setTimeout(() => controller.abort(), 10_000); - try { - const res = await fetch(`${apiUrl}/v2/send`, { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - message: chunk, - number: phone, - ...(groupId - ? { recipients: [], group_id: groupId } - : { recipients: [recipient] }), - }), - signal: controller.signal, - }); - if (!res.ok) { - const body = await res.text().catch(() => ""); - throw new Error( - `Signal send failed (${res.status}): ${body.slice(0, 300)}`, - ); - } - } finally { - clearTimeout(timer); - } - } -} diff --git a/src/channels/teams.ts b/src/channels/teams.ts deleted file mode 100644 index ef5e48d..0000000 --- a/src/channels/teams.ts +++ /dev/null @@ -1,120 +0,0 @@ -import { registerWorker, TriggerAction } from "iii-sdk"; -import { ENGINE_URL, OTEL_CONFIG, registerShutdown } from "@agentos/shared/config"; -import { createSecretGetter } from "@agentos/shared/secrets"; -import { splitMessage, resolveAgent } from "@agentos/shared/utils"; - -const sdk = registerWorker(ENGINE_URL, { - workerName: "channel-teams", - otel: OTEL_CONFIG, -}); -registerShutdown(sdk); -const { registerFunction, registerTrigger, trigger } = sdk; - -const getSecret = createSecretGetter(trigger); - -const AUTH_URL = - "https://login.microsoftonline.com/botframework.com/oauth2/v2.0/token"; - -registerFunction( - { - id: "channel::teams::webhook", - description: "Handle Microsoft Teams Bot Framework webhook", - }, - async (req) => { - const activity = req.body || req; - - if (activity.type !== "message") - return { status_code: 200, body: { ok: true } }; - - const conversationId = activity.conversation?.id; - const text = activity.text || ""; - const userId = activity.from?.id; - const serviceUrl = activity.serviceUrl; - - const agentId = await resolveAgent(sdk, "teams", conversationId); - - const response: any = await trigger({ - function_id: "agent::chat", - payload: { - agentId, - message: text, - sessionId: `teams:${conversationId}`, - }, - }); - - await sendMessage( - serviceUrl, - conversationId, - activity.id, - response.content, - ); - - trigger({ - function_id: "security::audit", - payload: { - type: "channel_message", - agentId, - detail: { channel: "teams", conversationId, userId }, - }, - action: TriggerAction.Void(), - }); - - return { status_code: 200, body: { ok: true } }; - }, -); - -registerTrigger({ - type: "http", - function_id: "channel::teams::webhook", - config: { api_path: "webhook/teams", http_method: "POST" }, -}); - -async function getToken(): Promise { - const appId = await getSecret("TEAMS_APP_ID"); - const appPassword = await getSecret("TEAMS_APP_PASSWORD"); - if (!appId || !appPassword) { - throw new Error("Missing Teams credentials"); - } - const res = await fetch(AUTH_URL, { - method: "POST", - headers: { "Content-Type": "application/x-www-form-urlencoded" }, - body: new URLSearchParams({ - grant_type: "client_credentials", - client_id: appId, - client_secret: appPassword, - scope: "https://api.botframework.com/.default", - }).toString(), - }); - if (!res.ok) { - throw new Error(`Token request failed: ${res.status}`); - } - const data = (await res.json()) as { access_token: string }; - if (!data.access_token) { - throw new Error("Token response missing access_token"); - } - return data.access_token; -} - -async function sendMessage( - serviceUrl: string, - conversationId: string, - replyToId: string, - text: string, -) { - const token = await getToken(); - const chunks = splitMessage(text, 4096); - for (const chunk of chunks) { - await fetch(`${serviceUrl}/v3/conversations/${conversationId}/activities`, { - method: "POST", - headers: { - Authorization: `Bearer ${token}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ - type: "message", - text: chunk, - replyToId, - }), - }); - } -} diff --git a/src/channels/webex.ts b/src/channels/webex.ts deleted file mode 100644 index 0100a6f..0000000 --- a/src/channels/webex.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { registerWorker, TriggerAction } from "iii-sdk"; -import { ENGINE_URL, OTEL_CONFIG, registerShutdown } from "@agentos/shared/config"; -import { createSecretGetter } from "@agentos/shared/secrets"; -import { splitMessage, resolveAgent } from "@agentos/shared/utils"; - -const sdk = registerWorker(ENGINE_URL, { - workerName: "channel-webex", - otel: OTEL_CONFIG, -}); -registerShutdown(sdk); -const { registerFunction, registerTrigger, trigger } = sdk; - -const getSecret = createSecretGetter(trigger); -const API_URL = "https://webexapis.com/v1"; - -registerFunction( - { id: "channel::webex::webhook", description: "Handle Cisco Webex webhook" }, - async (req) => { - const body = req.body || req; - - if (body.resource !== "messages" || body.event !== "created") { - return { status_code: 200, body: { ok: true } }; - } - - const messageId = body.data?.id; - const roomId = body.data?.roomId; - const personId = body.data?.personId; - - const webexToken = await getSecret("WEBEX_TOKEN"); - if (!webexToken) { - return { - status_code: 500, - body: { error: "WEBEX_TOKEN not configured" }, - }; - } - const msgRes = await fetch(`${API_URL}/messages/${messageId}`, { - headers: { Authorization: `Bearer ${webexToken}` }, - }); - if (!msgRes.ok) { - return { - status_code: 502, - body: { error: "Failed to fetch Webex message" }, - }; - } - const msg = (await msgRes.json()) as { text: string }; - const text = msg.text; - - if (!text) return { status_code: 200, body: { ok: true } }; - - const agentId = await resolveAgent(sdk, "webex", roomId); - - const response: any = await trigger({ - function_id: "agent::chat", - payload: { - agentId, - message: text, - sessionId: `webex:${roomId}`, - }, - }); - - await sendMessage(roomId, response.content, webexToken); - - trigger({ - function_id: "security::audit", - payload: { - type: "channel_message", - agentId, - detail: { channel: "webex", roomId, personId }, - }, - action: TriggerAction.Void(), - }); - - return { status_code: 200, body: { ok: true } }; - }, -); - -registerTrigger({ - type: "http", - function_id: "channel::webex::webhook", - config: { api_path: "webhook/webex", http_method: "POST" }, -}); - -async function sendMessage(roomId: string, text: string, token: string) { - const chunks = splitMessage(text, 7439); - for (const chunk of chunks) { - await fetch(`${API_URL}/messages`, { - method: "POST", - headers: { - Authorization: `Bearer ${token}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ roomId, text: chunk }), - }); - } -} diff --git a/src/channels/whatsapp.ts b/src/channels/whatsapp.ts deleted file mode 100644 index dfd1ebe..0000000 --- a/src/channels/whatsapp.ts +++ /dev/null @@ -1,92 +0,0 @@ -import { registerWorker, TriggerAction } from "iii-sdk"; -import { ENGINE_URL, OTEL_CONFIG, registerShutdown } from "@agentos/shared/config"; -import { createSecretGetter } from "@agentos/shared/secrets"; -import { splitMessage, resolveAgent } from "@agentos/shared/utils"; - -const sdk = registerWorker(ENGINE_URL, { - workerName: "channel-whatsapp", - otel: OTEL_CONFIG, -}); -registerShutdown(sdk); -const { registerFunction, registerTrigger, trigger } = sdk; - -const getSecret = createSecretGetter(trigger); - -registerFunction( - { - id: "channel::whatsapp::webhook", - description: "Handle WhatsApp Business API webhook", - }, - async (req) => { - const body = req.body || req; - - if (body.object !== "whatsapp_business_account") - return { status_code: 200, body: { ok: true } }; - - const entry = body.entry?.[0]; - const change = entry?.changes?.[0]; - const message = change?.value?.messages?.[0]; - - if (!message?.text?.body) return { status_code: 200, body: { ok: true } }; - - const from = message.from; - const text = message.text.body; - const agentId = await resolveAgent(sdk, "whatsapp", from); - - const response: any = await trigger({ - function_id: "agent::chat", - payload: { - agentId, - message: text, - sessionId: `whatsapp:${from}`, - }, - }); - - await sendMessage(from, response.content); - - trigger({ - function_id: "security::audit", - payload: { - type: "channel_message", - agentId, - detail: { channel: "whatsapp", from }, - }, - action: TriggerAction.Void(), - }); - - return { status_code: 200, body: { ok: true } }; - }, -); - -registerTrigger({ - type: "http", - function_id: "channel::whatsapp::webhook", - config: { api_path: "webhook/whatsapp", http_method: "POST" }, -}); - -async function sendMessage(to: string, text: string) { - const token = await getSecret("WHATSAPP_TOKEN"); - if (!token) { - throw new Error("WHATSAPP_TOKEN not configured"); - } - const phoneId = await getSecret("WHATSAPP_PHONE_ID"); - if (!phoneId) { - throw new Error("WHATSAPP_PHONE_ID not configured"); - } - const chunks = splitMessage(text, 4096); - for (const chunk of chunks) { - await fetch(`https://graph.facebook.com/v18.0/${phoneId}/messages`, { - method: "POST", - headers: { - Authorization: `Bearer ${token}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ - messaging_product: "whatsapp", - to, - type: "text", - text: { body: chunk }, - }), - }); - } -} diff --git a/workers/channel-email/Cargo.toml b/workers/channel-email/Cargo.toml new file mode 100644 index 0000000..d259c14 --- /dev/null +++ b/workers/channel-email/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agentos-channel-email" +version.workspace = true +edition.workspace = true +license.workspace = true + +[[bin]] +name = "agentos-channel-email" +path = "src/main.rs" + +[dependencies] +iii-sdk.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +lettre = { version = "0.11", default-features = false, features = ["builder", "smtp-transport", "tokio1-rustls-tls"] } diff --git a/workers/channel-email/iii.worker.yaml b/workers/channel-email/iii.worker.yaml new file mode 100644 index 0000000..9ad9aa7 --- /dev/null +++ b/workers/channel-email/iii.worker.yaml @@ -0,0 +1,7 @@ +iii: v1 +name: channel-email +language: rust +deploy: binary +manifest: Cargo.toml +bin: agentos-channel-email +description: "Email channel adapter — inbound webhook + outbound SMTP via lettre" diff --git a/workers/channel-email/src/main.rs b/workers/channel-email/src/main.rs new file mode 100644 index 0000000..68c4340 --- /dev/null +++ b/workers/channel-email/src/main.rs @@ -0,0 +1,216 @@ +use iii_sdk::error::IIIError; +use iii_sdk::{III, InitOptions, RegisterFunction, RegisterTriggerInput, TriggerRequest, register_worker}; +use lettre::message::Message; +use lettre::transport::smtp::authentication::Credentials; +use lettre::{AsyncSmtpTransport, AsyncTransport, Tokio1Executor}; +use serde_json::{Value, json}; + +/// Resolve which agent should handle a given email recipient. +/// Mirrors `resolveAgent(sdk, "email", to)` from src/channels/email.ts. +async fn resolve_agent(iii: &III, channel: &str, channel_id: &str) -> String { + let key = format!("{channel}:{channel_id}"); + let result = iii + .trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": "channel_agents", "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(agent) = value.get("agentId").and_then(|v| v.as_str()) + { + return agent.to_string(); + } + "default".to_string() +} + +/// Build an SMTP transport from env (SMTP_HOST/SMTP_PORT/SMTP_SECURE/SMTP_USER/SMTP_PASS). +fn build_transport() -> Result, IIIError> { + let host = std::env::var("SMTP_HOST").unwrap_or_else(|_| "localhost".to_string()); + let port: u16 = std::env::var("SMTP_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(587); + let secure = std::env::var("SMTP_SECURE").map(|s| s == "true").unwrap_or(false); + let user = std::env::var("SMTP_USER").unwrap_or_default(); + let pass = std::env::var("SMTP_PASS").unwrap_or_default(); + + let mut builder = if secure { + AsyncSmtpTransport::::relay(&host) + .map_err(|e| IIIError::Handler(format!("SMTP relay error: {e}")))? + } else { + AsyncSmtpTransport::::builder_dangerous(&host) + }; + builder = builder.port(port); + if !user.is_empty() { + builder = builder.credentials(Credentials::new(user, pass)); + } + Ok(builder.build()) +} + +async fn send_mail(to: &str, subject: &str, text: &str) -> Result<(), IIIError> { + let from = std::env::var("SMTP_USER").unwrap_or_default(); + if from.is_empty() { + return Err(IIIError::Handler("SMTP_USER not configured".into())); + } + let email = Message::builder() + .from(from.parse().map_err(|e| IIIError::Handler(format!("invalid from: {e}")))?) + .to(to.parse().map_err(|e| IIIError::Handler(format!("invalid to: {e}")))?) + .subject(subject) + .body(text.to_string()) + .map_err(|e| IIIError::Handler(format!("message build: {e}")))?; + let transport = build_transport()?; + transport + .send(email) + .await + .map_err(|e| IIIError::Handler(format!("smtp send: {e}")))?; + Ok(()) +} + +/// Handle inbound email webhook (e.g. SendGrid Inbound Parse / Mailgun routes). +/// Mirrors `channel::email::webhook` in src/channels/email.ts. +async fn handle_webhook(iii: &III, req: Value) -> Result { + let body = req.get("body").cloned().unwrap_or_else(|| req.clone()); + + let from = body.get("from").and_then(|v| v.as_str()).unwrap_or(""); + let to = body.get("to").and_then(|v| v.as_str()).unwrap_or(""); + let subject = body.get("subject").and_then(|v| v.as_str()).unwrap_or(""); + let text = body.get("text").and_then(|v| v.as_str()).unwrap_or(""); + + if from.is_empty() || text.is_empty() { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let agent_id = resolve_agent(iii, "email", to).await; + let subject_display = if subject.is_empty() { "(none)" } else { subject }; + + let chat = iii + .trigger(TriggerRequest { + function_id: "agent::chat".to_string(), + payload: json!({ + "agentId": agent_id, + "message": format!("Subject: {subject_display}\n\n{text}"), + "sessionId": format!("email:{from}"), + }), + action: None, + timeout_ms: None, + }) + .await + .map_err(|e| IIIError::Handler(format!("agent::chat failed: {e}")))?; + + let reply = chat.get("content").and_then(|v| v.as_str()).unwrap_or(""); + let reply_subject = format!("Re: {subject}"); + + if let Err(e) = send_mail(from, &reply_subject, reply).await { + tracing::error!(to = %from, error = %e, "failed to send email reply"); + } + + // Fire-and-forget audit (mirrors TriggerAction.Void()). + let audit_iii = iii.clone(); + let from_owned = from.to_string(); + let to_owned = to.to_string(); + let agent_for_audit = agent_id.clone(); + tokio::spawn(async move { + let _ = audit_iii + .trigger(TriggerRequest { + function_id: "security::audit".to_string(), + payload: json!({ + "type": "channel_message", + "agentId": agent_for_audit, + "detail": { "channel": "email", "from": from_owned, "to": to_owned }, + }), + action: None, + timeout_ms: None, + }) + .await; + }); + + Ok(json!({ "status_code": 200, "body": { "ok": true } })) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let ws_url = + std::env::var("III_WS_URL").unwrap_or_else(|_| "ws://localhost:49134".to_string()); + let iii = register_worker(&ws_url, InitOptions::default()); + + let iii_clone = iii.clone(); + iii.register_function( + RegisterFunction::new_async("channel::email::webhook", move |input: Value| { + let iii = iii_clone.clone(); + async move { handle_webhook(&iii, input).await } + }) + .description("Handle inbound email webhook"), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "channel::email::webhook".to_string(), + config: json!({ "http_method": "POST", "api_path": "webhook/email" }), + metadata: None, + })?; + + tracing::info!("channel-email worker started"); + tokio::signal::ctrl_c().await?; + iii.shutdown_async().await; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn missing_from_returns_ok_without_dispatch() { + let body = json!({ "to": "bot@x.com", "text": "hi" }); + let from = body.get("from").and_then(|v| v.as_str()).unwrap_or(""); + let text = body.get("text").and_then(|v| v.as_str()).unwrap_or(""); + assert!(from.is_empty()); + assert!(!text.is_empty()); + } + + #[test] + fn missing_text_returns_ok_without_dispatch() { + let body = json!({ "from": "u@x.com", "to": "bot@x.com", "subject": "S" }); + let text = body.get("text").and_then(|v| v.as_str()).unwrap_or(""); + assert!(text.is_empty()); + } + + #[test] + fn subject_fallback_when_missing() { + let subject = ""; + let display = if subject.is_empty() { "(none)" } else { subject }; + assert_eq!(display, "(none)"); + } + + #[test] + fn subject_used_verbatim_when_present() { + let subject = "Hello"; + let display = if subject.is_empty() { "(none)" } else { subject }; + assert_eq!(display, "Hello"); + } + + #[test] + fn reply_subject_has_re_prefix() { + let subject = "Original"; + let reply = format!("Re: {subject}"); + assert_eq!(reply, "Re: Original"); + } + + #[test] + fn reply_subject_with_empty_subject_keeps_re_prefix() { + let subject = ""; + let reply = format!("Re: {subject}"); + assert_eq!(reply, "Re: "); + } + + #[test] + fn session_id_format_uses_from_address() { + let from = "user@x.com"; + let session = format!("email:{from}"); + assert_eq!(session, "email:user@x.com"); + } +} diff --git a/workers/channel-signal/Cargo.toml b/workers/channel-signal/Cargo.toml new file mode 100644 index 0000000..7c63574 --- /dev/null +++ b/workers/channel-signal/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agentos-channel-signal" +version.workspace = true +edition.workspace = true +license.workspace = true + +[[bin]] +name = "agentos-channel-signal" +path = "src/main.rs" + +[dependencies] +iii-sdk.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } diff --git a/workers/channel-signal/iii.worker.yaml b/workers/channel-signal/iii.worker.yaml new file mode 100644 index 0000000..f57e6c4 --- /dev/null +++ b/workers/channel-signal/iii.worker.yaml @@ -0,0 +1,7 @@ +iii: v1 +name: channel-signal +language: rust +deploy: binary +manifest: Cargo.toml +bin: agentos-channel-signal +description: "Signal channel adapter — REST bridge to signal-cli-rest-api" diff --git a/workers/channel-signal/src/main.rs b/workers/channel-signal/src/main.rs new file mode 100644 index 0000000..83dc6ca --- /dev/null +++ b/workers/channel-signal/src/main.rs @@ -0,0 +1,326 @@ +use iii_sdk::error::IIIError; +use iii_sdk::{III, InitOptions, RegisterFunction, RegisterTriggerInput, TriggerRequest, register_worker}; +use serde_json::{Value, json}; +use std::time::Duration; + +const MAX_MESSAGE_LEN: usize = 4096; +const SEND_TIMEOUT: Duration = Duration::from_secs(10); + +async fn get_secret(iii: &III, key: &str) -> String { + let result = iii + .trigger(TriggerRequest { + function_id: "vault::get".to_string(), + payload: json!({ "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(v) = value.get("value").and_then(|v| v.as_str()) + && !v.is_empty() + { + return v.to_string(); + } + std::env::var(key).unwrap_or_default() +} + +async fn resolve_agent(iii: &III, channel: &str, channel_id: &str) -> String { + let key = format!("{channel}:{channel_id}"); + let result = iii + .trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": "channel_agents", "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(agent) = value.get("agentId").and_then(|v| v.as_str()) + { + return agent.to_string(); + } + "default".to_string() +} + +fn split_message(text: &str, max_len: usize) -> Vec { + if text.chars().count() <= max_len { + return vec![text.to_string()]; + } + let mut chunks: Vec = Vec::new(); + let mut remaining = text.to_string(); + while !remaining.is_empty() { + if remaining.chars().count() <= max_len { + chunks.push(remaining); + break; + } + let cutoff = remaining + .char_indices() + .nth(max_len) + .map(|(idx, _)| idx) + .unwrap_or(remaining.len()); + let window = &remaining[..cutoff]; + let split_at = match window.rfind('\n') { + Some(idx) if window[..idx].chars().count() > max_len / 2 => idx, + _ => cutoff, + }; + chunks.push(remaining[..split_at].to_string()); + remaining = remaining[split_at..].to_string(); + } + chunks +} + +async fn send_message( + client: &reqwest::Client, + api_url: &str, + phone: &str, + recipient: &str, + text: &str, + group_id: Option<&str>, +) -> Result<(), IIIError> { + if api_url.is_empty() { + return Err(IIIError::Handler("SIGNAL_API_URL not configured".into())); + } + if phone.is_empty() { + return Err(IIIError::Handler("SIGNAL_PHONE not configured".into())); + } + let url = format!("{}/v2/send", api_url.trim_end_matches('/')); + for chunk in split_message(text, MAX_MESSAGE_LEN) { + let payload = match group_id { + Some(gid) => json!({ + "message": chunk, + "number": phone, + "recipients": [], + "group_id": gid, + }), + None => json!({ + "message": chunk, + "number": phone, + "recipients": [recipient], + }), + }; + let resp = client + .post(&url) + .timeout(SEND_TIMEOUT) + .json(&payload) + .send() + .await + .map_err(|e| IIIError::Handler(format!("Signal send error: {e}")))?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(IIIError::Handler(format!( + "Signal send failed ({status}): {}", + body.chars().take(300).collect::() + ))); + } + } + Ok(()) +} + +async fn handle_webhook( + iii: &III, + client: &reqwest::Client, + req: Value, +) -> Result { + let body = req.get("body").cloned().unwrap_or_else(|| req.clone()); + let envelope = body.get("envelope"); + let text = envelope + .and_then(|e| e.get("dataMessage")) + .and_then(|d| d.get("message")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + if text.is_empty() { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let source = envelope + .and_then(|e| e.get("source")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let group_id = envelope + .and_then(|e| e.get("dataMessage")) + .and_then(|d| d.get("groupInfo")) + .and_then(|g| g.get("groupId")) + .and_then(|v| v.as_str()) + .map(String::from); + + let channel_key = group_id.clone().unwrap_or_else(|| source.clone()); + let agent_id = resolve_agent(iii, "signal", &channel_key).await; + + let chat = iii + .trigger(TriggerRequest { + function_id: "agent::chat".to_string(), + payload: json!({ + "agentId": agent_id, + "message": text, + "sessionId": format!("signal:{channel_key}"), + }), + action: None, + timeout_ms: None, + }) + .await + .map_err(|e| IIIError::Handler(format!("agent::chat failed: {e}")))?; + + let reply = chat.get("content").and_then(|v| v.as_str()).unwrap_or(""); + if reply.is_empty() { + tracing::warn!(channel_key = %channel_key, "signal: agent returned empty response"); + return Ok(json!({ + "status_code": 500, + "body": { "error": "Empty agent response" } + })); + } + + let api_url = get_secret(iii, "SIGNAL_API_URL").await; + let phone = get_secret(iii, "SIGNAL_PHONE").await; + if let Err(e) = send_message(client, &api_url, &phone, &source, reply, group_id.as_deref()).await + { + tracing::error!(channel_key = %channel_key, error = %e, "failed to send Signal reply"); + } + + let audit_iii = iii.clone(); + let source_for_audit = source.clone(); + let group_for_audit = group_id.clone(); + let agent_for_audit = agent_id.clone(); + tokio::spawn(async move { + let _ = audit_iii + .trigger(TriggerRequest { + function_id: "security::audit".to_string(), + payload: json!({ + "type": "channel_message", + "agentId": agent_for_audit, + "detail": { + "channel": "signal", + "source": source_for_audit, + "groupId": group_for_audit + }, + }), + action: None, + timeout_ms: None, + }) + .await; + }); + + Ok(json!({ "status_code": 200, "body": { "ok": true } })) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let ws_url = + std::env::var("III_WS_URL").unwrap_or_else(|_| "ws://localhost:49134".to_string()); + let iii = register_worker(&ws_url, InitOptions::default()); + let client = reqwest::Client::new(); + + let iii_clone = iii.clone(); + let client_clone = client.clone(); + iii.register_function( + RegisterFunction::new_async("channel::signal::webhook", move |input: Value| { + let iii = iii_clone.clone(); + let client = client_clone.clone(); + async move { handle_webhook(&iii, &client, input).await } + }) + .description("Handle Signal REST API bridge webhook"), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "channel::signal::webhook".to_string(), + config: json!({ "http_method": "POST", "api_path": "webhook/signal" }), + metadata: None, + })?; + + tracing::info!("channel-signal worker started"); + tokio::signal::ctrl_c().await?; + iii.shutdown_async().await; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ignores_empty_envelope() { + let body = json!({ "envelope": {} }); + let text = body + .pointer("/envelope/dataMessage/message") + .and_then(|v| v.as_str()) + .unwrap_or(""); + assert!(text.is_empty()); + } + + #[test] + fn extracts_direct_message() { + let body = json!({ + "envelope": { + "source": "+15551234567", + "dataMessage": { "message": "hi" } + } + }); + let text = body + .pointer("/envelope/dataMessage/message") + .and_then(|v| v.as_str()) + .unwrap_or(""); + assert_eq!(text, "hi"); + } + + #[test] + fn extracts_group_id() { + let body = json!({ + "envelope": { + "source": "+1", + "dataMessage": { + "message": "g", + "groupInfo": { "groupId": "GRP1" } + } + } + }); + let gid = body + .pointer("/envelope/dataMessage/groupInfo/groupId") + .and_then(|v| v.as_str()); + assert_eq!(gid, Some("GRP1")); + } + + #[test] + fn group_id_used_as_channel_key_when_present() { + let group_id: Option = Some("GRP".into()); + let source = "+1".to_string(); + let key = group_id.clone().unwrap_or_else(|| source.clone()); + assert_eq!(key, "GRP"); + } + + #[test] + fn source_used_as_channel_key_when_no_group() { + let group_id: Option = None; + let source = "+5551".to_string(); + let key = group_id.clone().unwrap_or_else(|| source.clone()); + assert_eq!(key, "+5551"); + } + + #[test] + fn session_id_format() { + let key = "+1"; + assert_eq!(format!("signal:{key}"), "signal:+1"); + } + + #[test] + fn split_short_text_returns_single_chunk() { + assert_eq!(split_message("hi", 4096), vec!["hi".to_string()]); + } + + #[test] + fn split_preserves_total_length() { + let text = "x".repeat(10_000); + let chunks = split_message(&text, 4096); + assert_eq!(chunks.concat(), text); + } + + #[test] + fn api_url_trailing_slash_normalized() { + let url = "http://signal/"; + let full = format!("{}/v2/send", url.trim_end_matches('/')); + assert_eq!(full, "http://signal/v2/send"); + } +} diff --git a/workers/channel-teams/Cargo.toml b/workers/channel-teams/Cargo.toml new file mode 100644 index 0000000..09fafe2 --- /dev/null +++ b/workers/channel-teams/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agentos-channel-teams" +version.workspace = true +edition.workspace = true +license.workspace = true + +[[bin]] +name = "agentos-channel-teams" +path = "src/main.rs" + +[dependencies] +iii-sdk.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } diff --git a/workers/channel-teams/iii.worker.yaml b/workers/channel-teams/iii.worker.yaml new file mode 100644 index 0000000..4980624 --- /dev/null +++ b/workers/channel-teams/iii.worker.yaml @@ -0,0 +1,7 @@ +iii: v1 +name: channel-teams +language: rust +deploy: binary +manifest: Cargo.toml +bin: agentos-channel-teams +description: "Microsoft Teams Bot Framework channel adapter" diff --git a/workers/channel-teams/src/main.rs b/workers/channel-teams/src/main.rs new file mode 100644 index 0000000..6a3d476 --- /dev/null +++ b/workers/channel-teams/src/main.rs @@ -0,0 +1,329 @@ +use iii_sdk::error::IIIError; +use iii_sdk::{III, InitOptions, RegisterFunction, RegisterTriggerInput, TriggerRequest, register_worker}; +use serde_json::{Value, json}; + +const AUTH_URL: &str = "https://login.microsoftonline.com/botframework.com/oauth2/v2.0/token"; +const MAX_MESSAGE_LEN: usize = 4096; + +async fn get_secret(iii: &III, key: &str) -> String { + let result = iii + .trigger(TriggerRequest { + function_id: "vault::get".to_string(), + payload: json!({ "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(v) = value.get("value").and_then(|v| v.as_str()) + && !v.is_empty() + { + return v.to_string(); + } + std::env::var(key).unwrap_or_default() +} + +async fn resolve_agent(iii: &III, channel: &str, channel_id: &str) -> String { + let key = format!("{channel}:{channel_id}"); + let result = iii + .trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": "channel_agents", "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(agent) = value.get("agentId").and_then(|v| v.as_str()) + { + return agent.to_string(); + } + "default".to_string() +} + +fn split_message(text: &str, max_len: usize) -> Vec { + if text.chars().count() <= max_len { + return vec![text.to_string()]; + } + let mut chunks: Vec = Vec::new(); + let mut remaining = text.to_string(); + while !remaining.is_empty() { + if remaining.chars().count() <= max_len { + chunks.push(remaining); + break; + } + let cutoff = remaining + .char_indices() + .nth(max_len) + .map(|(idx, _)| idx) + .unwrap_or(remaining.len()); + let window = &remaining[..cutoff]; + let split_at = match window.rfind('\n') { + Some(idx) if window[..idx].chars().count() > max_len / 2 => idx, + _ => cutoff, + }; + chunks.push(remaining[..split_at].to_string()); + remaining = remaining[split_at..].to_string(); + } + chunks +} + +/// Exchange app credentials for a Bot Framework access token. +async fn get_token(client: &reqwest::Client, app_id: &str, app_password: &str) -> Result { + if app_id.is_empty() || app_password.is_empty() { + return Err(IIIError::Handler("Missing Teams credentials".into())); + } + let params = [ + ("grant_type", "client_credentials"), + ("client_id", app_id), + ("client_secret", app_password), + ("scope", "https://api.botframework.com/.default"), + ]; + let resp = client + .post(AUTH_URL) + .form(¶ms) + .send() + .await + .map_err(|e| IIIError::Handler(format!("Teams token request failed: {e}")))?; + let status = resp.status(); + if !status.is_success() { + return Err(IIIError::Handler(format!("Token request failed: {status}"))); + } + let body: Value = resp + .json() + .await + .map_err(|e| IIIError::Handler(format!("Token decode: {e}")))?; + body.get("access_token") + .and_then(|v| v.as_str()) + .map(String::from) + .ok_or_else(|| IIIError::Handler("Token response missing access_token".into())) +} + +async fn send_message( + client: &reqwest::Client, + token: &str, + service_url: &str, + conversation_id: &str, + reply_to_id: &str, + text: &str, +) -> Result<(), IIIError> { + let url = format!( + "{}/v3/conversations/{}/activities", + service_url.trim_end_matches('/'), + conversation_id + ); + for chunk in split_message(text, MAX_MESSAGE_LEN) { + let resp = client + .post(&url) + .bearer_auth(token) + .json(&json!({ + "type": "message", + "text": chunk, + "replyToId": reply_to_id, + })) + .send() + .await + .map_err(|e| IIIError::Handler(format!("Teams send error: {e}")))?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(IIIError::Handler(format!( + "Teams send failed ({status}): {}", + body.chars().take(300).collect::() + ))); + } + } + Ok(()) +} + +async fn handle_webhook( + iii: &III, + client: &reqwest::Client, + req: Value, +) -> Result { + let activity = req.get("body").cloned().unwrap_or_else(|| req.clone()); + + if activity.get("type").and_then(|v| v.as_str()) != Some("message") { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let conversation_id = activity + .get("conversation") + .and_then(|c| c.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let text = activity.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let user_id = activity + .get("from") + .and_then(|f| f.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let service_url = activity + .get("serviceUrl") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let activity_id = activity + .get("id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + let agent_id = resolve_agent(iii, "teams", &conversation_id).await; + + let chat = iii + .trigger(TriggerRequest { + function_id: "agent::chat".to_string(), + payload: json!({ + "agentId": agent_id, + "message": text, + "sessionId": format!("teams:{conversation_id}"), + }), + action: None, + timeout_ms: None, + }) + .await + .map_err(|e| IIIError::Handler(format!("agent::chat failed: {e}")))?; + + let reply = chat.get("content").and_then(|v| v.as_str()).unwrap_or(""); + if !reply.is_empty() { + let app_id = get_secret(iii, "TEAMS_APP_ID").await; + let app_password = get_secret(iii, "TEAMS_APP_PASSWORD").await; + match get_token(client, &app_id, &app_password).await { + Ok(token) => { + if let Err(e) = send_message( + client, + &token, + &service_url, + &conversation_id, + &activity_id, + reply, + ) + .await + { + tracing::error!(conversation = %conversation_id, error = %e, "failed to send Teams reply"); + } + } + Err(e) => { + tracing::error!(error = %e, "failed to acquire Teams token"); + } + } + } + + let audit_iii = iii.clone(); + let conv_for_audit = conversation_id.clone(); + let user_for_audit = user_id.clone(); + let agent_for_audit = agent_id.clone(); + tokio::spawn(async move { + let _ = audit_iii + .trigger(TriggerRequest { + function_id: "security::audit".to_string(), + payload: json!({ + "type": "channel_message", + "agentId": agent_for_audit, + "detail": { + "channel": "teams", + "conversationId": conv_for_audit, + "userId": user_for_audit + }, + }), + action: None, + timeout_ms: None, + }) + .await; + }); + + Ok(json!({ "status_code": 200, "body": { "ok": true } })) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let ws_url = + std::env::var("III_WS_URL").unwrap_or_else(|_| "ws://localhost:49134".to_string()); + let iii = register_worker(&ws_url, InitOptions::default()); + let client = reqwest::Client::new(); + + let iii_clone = iii.clone(); + let client_clone = client.clone(); + iii.register_function( + RegisterFunction::new_async("channel::teams::webhook", move |input: Value| { + let iii = iii_clone.clone(); + let client = client_clone.clone(); + async move { handle_webhook(&iii, &client, input).await } + }) + .description("Handle Microsoft Teams Bot Framework webhook"), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "channel::teams::webhook".to_string(), + config: json!({ "http_method": "POST", "api_path": "webhook/teams" }), + metadata: None, + })?; + + tracing::info!("channel-teams worker started"); + tokio::signal::ctrl_c().await?; + iii.shutdown_async().await; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ignores_non_message_activity() { + let activity = json!({ "type": "conversationUpdate" }); + assert_ne!(activity.get("type").and_then(|v| v.as_str()), Some("message")); + } + + #[test] + fn detects_message_activity() { + let activity = json!({ "type": "message", "text": "hello" }); + assert_eq!(activity.get("type").and_then(|v| v.as_str()), Some("message")); + } + + #[test] + fn extracts_conversation_id() { + let activity = json!({ "type": "message", "conversation": { "id": "conv-1" } }); + let id = activity + .pointer("/conversation/id") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + assert_eq!(id, "conv-1"); + } + + #[test] + fn session_id_format() { + let conv = "conv-2"; + assert_eq!(format!("teams:{conv}"), "teams:conv-2"); + } + + #[test] + fn split_short_text_returns_single_chunk() { + assert_eq!(split_message("hi", 4096), vec!["hi".to_string()]); + } + + #[test] + fn split_preserves_total_length() { + let text = "x".repeat(10_000); + let chunks = split_message(&text, 4096); + assert_eq!(chunks.concat(), text); + } + + #[test] + fn url_with_trailing_slash_is_normalized() { + let svc = "https://service.test.com/"; + let conv = "C1"; + let url = format!( + "{}/v3/conversations/{}/activities", + svc.trim_end_matches('/'), + conv + ); + assert_eq!(url, "https://service.test.com/v3/conversations/C1/activities"); + } +} diff --git a/workers/channel-webex/Cargo.toml b/workers/channel-webex/Cargo.toml new file mode 100644 index 0000000..ce2d395 --- /dev/null +++ b/workers/channel-webex/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agentos-channel-webex" +version.workspace = true +edition.workspace = true +license.workspace = true + +[[bin]] +name = "agentos-channel-webex" +path = "src/main.rs" + +[dependencies] +iii-sdk.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } diff --git a/workers/channel-webex/iii.worker.yaml b/workers/channel-webex/iii.worker.yaml new file mode 100644 index 0000000..e2628f4 --- /dev/null +++ b/workers/channel-webex/iii.worker.yaml @@ -0,0 +1,7 @@ +iii: v1 +name: channel-webex +language: rust +deploy: binary +manifest: Cargo.toml +bin: agentos-channel-webex +description: "Cisco Webex channel adapter — webhook + Webex REST API" diff --git a/workers/channel-webex/src/main.rs b/workers/channel-webex/src/main.rs new file mode 100644 index 0000000..19ef1b4 --- /dev/null +++ b/workers/channel-webex/src/main.rs @@ -0,0 +1,292 @@ +use iii_sdk::error::IIIError; +use iii_sdk::{III, InitOptions, RegisterFunction, RegisterTriggerInput, TriggerRequest, register_worker}; +use serde_json::{Value, json}; + +const API_URL: &str = "https://webexapis.com/v1"; +const MAX_MESSAGE_LEN: usize = 7439; + +async fn get_secret(iii: &III, key: &str) -> String { + let result = iii + .trigger(TriggerRequest { + function_id: "vault::get".to_string(), + payload: json!({ "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(v) = value.get("value").and_then(|v| v.as_str()) + && !v.is_empty() + { + return v.to_string(); + } + std::env::var(key).unwrap_or_default() +} + +async fn resolve_agent(iii: &III, channel: &str, channel_id: &str) -> String { + let key = format!("{channel}:{channel_id}"); + let result = iii + .trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": "channel_agents", "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(agent) = value.get("agentId").and_then(|v| v.as_str()) + { + return agent.to_string(); + } + "default".to_string() +} + +fn split_message(text: &str, max_len: usize) -> Vec { + if text.chars().count() <= max_len { + return vec![text.to_string()]; + } + let mut chunks: Vec = Vec::new(); + let mut remaining = text.to_string(); + while !remaining.is_empty() { + if remaining.chars().count() <= max_len { + chunks.push(remaining); + break; + } + let cutoff = remaining + .char_indices() + .nth(max_len) + .map(|(idx, _)| idx) + .unwrap_or(remaining.len()); + let window = &remaining[..cutoff]; + let split_at = match window.rfind('\n') { + Some(idx) if window[..idx].chars().count() > max_len / 2 => idx, + _ => cutoff, + }; + chunks.push(remaining[..split_at].to_string()); + remaining = remaining[split_at..].to_string(); + } + chunks +} + +async fn fetch_message( + client: &reqwest::Client, + token: &str, + message_id: &str, +) -> Result, IIIError> { + let resp = client + .get(format!("{API_URL}/messages/{message_id}")) + .bearer_auth(token) + .send() + .await + .map_err(|e| IIIError::Handler(format!("Webex fetch error: {e}")))?; + if !resp.status().is_success() { + return Ok(None); + } + let body: Value = resp + .json() + .await + .map_err(|e| IIIError::Handler(format!("Webex decode: {e}")))?; + Ok(body + .get("text") + .and_then(|v| v.as_str()) + .map(String::from)) +} + +async fn send_message( + client: &reqwest::Client, + token: &str, + room_id: &str, + text: &str, +) -> Result<(), IIIError> { + for chunk in split_message(text, MAX_MESSAGE_LEN) { + let resp = client + .post(format!("{API_URL}/messages")) + .bearer_auth(token) + .json(&json!({ "roomId": room_id, "text": chunk })) + .send() + .await + .map_err(|e| IIIError::Handler(format!("Webex send error: {e}")))?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(IIIError::Handler(format!( + "Webex send failed ({status}): {}", + body.chars().take(300).collect::() + ))); + } + } + Ok(()) +} + +async fn handle_webhook( + iii: &III, + client: &reqwest::Client, + req: Value, +) -> Result { + let body = req.get("body").cloned().unwrap_or_else(|| req.clone()); + + let resource = body.get("resource").and_then(|v| v.as_str()).unwrap_or(""); + let event = body.get("event").and_then(|v| v.as_str()).unwrap_or(""); + if resource != "messages" || event != "created" { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let message_id = body + .pointer("/data/id") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let room_id = body + .pointer("/data/roomId") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let person_id = body + .pointer("/data/personId") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + + let webex_token = get_secret(iii, "WEBEX_TOKEN").await; + if webex_token.is_empty() { + return Ok(json!({ + "status_code": 500, + "body": { "error": "WEBEX_TOKEN not configured" } + })); + } + + let text = match fetch_message(client, &webex_token, &message_id).await? { + Some(t) if !t.is_empty() => t, + _ => return Ok(json!({ "status_code": 200, "body": { "ok": true } })), + }; + + let agent_id = resolve_agent(iii, "webex", &room_id).await; + + let chat = iii + .trigger(TriggerRequest { + function_id: "agent::chat".to_string(), + payload: json!({ + "agentId": agent_id, + "message": text, + "sessionId": format!("webex:{room_id}"), + }), + action: None, + timeout_ms: None, + }) + .await + .map_err(|e| IIIError::Handler(format!("agent::chat failed: {e}")))?; + + let reply = chat.get("content").and_then(|v| v.as_str()).unwrap_or(""); + if !reply.is_empty() + && let Err(e) = send_message(client, &webex_token, &room_id, reply).await + { + tracing::error!(room = %room_id, error = %e, "failed to send Webex reply"); + } + + let audit_iii = iii.clone(); + let room_for_audit = room_id.clone(); + let person_for_audit = person_id.clone(); + let agent_for_audit = agent_id.clone(); + tokio::spawn(async move { + let _ = audit_iii + .trigger(TriggerRequest { + function_id: "security::audit".to_string(), + payload: json!({ + "type": "channel_message", + "agentId": agent_for_audit, + "detail": { + "channel": "webex", + "roomId": room_for_audit, + "personId": person_for_audit + }, + }), + action: None, + timeout_ms: None, + }) + .await; + }); + + Ok(json!({ "status_code": 200, "body": { "ok": true } })) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let ws_url = + std::env::var("III_WS_URL").unwrap_or_else(|_| "ws://localhost:49134".to_string()); + let iii = register_worker(&ws_url, InitOptions::default()); + let client = reqwest::Client::new(); + + let iii_clone = iii.clone(); + let client_clone = client.clone(); + iii.register_function( + RegisterFunction::new_async("channel::webex::webhook", move |input: Value| { + let iii = iii_clone.clone(); + let client = client_clone.clone(); + async move { handle_webhook(&iii, &client, input).await } + }) + .description("Handle Cisco Webex webhook"), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "channel::webex::webhook".to_string(), + config: json!({ "http_method": "POST", "api_path": "webhook/webex" }), + metadata: None, + })?; + + tracing::info!("channel-webex worker started"); + tokio::signal::ctrl_c().await?; + iii.shutdown_async().await; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ignores_non_message_resource() { + let body = json!({ "resource": "memberships", "event": "created" }); + let resource = body.get("resource").and_then(|v| v.as_str()).unwrap_or(""); + assert_ne!(resource, "messages"); + } + + #[test] + fn ignores_non_created_event() { + let body = json!({ "resource": "messages", "event": "deleted" }); + let event = body.get("event").and_then(|v| v.as_str()).unwrap_or(""); + assert_ne!(event, "created"); + } + + #[test] + fn extracts_data_fields() { + let body = json!({ + "resource": "messages", + "event": "created", + "data": { "id": "M1", "roomId": "R1", "personId": "P1" } + }); + assert_eq!(body.pointer("/data/id").and_then(|v| v.as_str()), Some("M1")); + assert_eq!(body.pointer("/data/roomId").and_then(|v| v.as_str()), Some("R1")); + assert_eq!(body.pointer("/data/personId").and_then(|v| v.as_str()), Some("P1")); + } + + #[test] + fn split_short_text_returns_single_chunk() { + assert_eq!(split_message("hi", 7439), vec!["hi".to_string()]); + } + + #[test] + fn split_preserves_total_length() { + let text = "x".repeat(20_000); + let chunks = split_message(&text, 7439); + assert_eq!(chunks.concat(), text); + } + + #[test] + fn session_id_format() { + let room = "R1"; + assert_eq!(format!("webex:{room}"), "webex:R1"); + } +} diff --git a/workers/channel-whatsapp/Cargo.toml b/workers/channel-whatsapp/Cargo.toml new file mode 100644 index 0000000..984425f --- /dev/null +++ b/workers/channel-whatsapp/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "agentos-channel-whatsapp" +version.workspace = true +edition.workspace = true +license.workspace = true + +[[bin]] +name = "agentos-channel-whatsapp" +path = "src/main.rs" + +[dependencies] +iii-sdk.workspace = true +tokio.workspace = true +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +anyhow.workspace = true +reqwest = { version = "0.12", features = ["json", "rustls-tls"], default-features = false } diff --git a/workers/channel-whatsapp/iii.worker.yaml b/workers/channel-whatsapp/iii.worker.yaml new file mode 100644 index 0000000..2d9571c --- /dev/null +++ b/workers/channel-whatsapp/iii.worker.yaml @@ -0,0 +1,7 @@ +iii: v1 +name: channel-whatsapp +language: rust +deploy: binary +manifest: Cargo.toml +bin: agentos-channel-whatsapp +description: "WhatsApp Business API channel adapter — inbound webhook + outbound Graph API" diff --git a/workers/channel-whatsapp/src/main.rs b/workers/channel-whatsapp/src/main.rs new file mode 100644 index 0000000..250a077 --- /dev/null +++ b/workers/channel-whatsapp/src/main.rs @@ -0,0 +1,306 @@ +use iii_sdk::error::IIIError; +use iii_sdk::{III, InitOptions, RegisterFunction, RegisterTriggerInput, TriggerRequest, register_worker}; +use serde_json::{Value, json}; + +const WHATSAPP_API_BASE: &str = "https://graph.facebook.com/v18.0"; +const MAX_MESSAGE_LEN: usize = 4096; + +/// Get a secret from `vault::get` first, falling back to env var. +async fn get_secret(iii: &III, key: &str) -> String { + let result = iii + .trigger(TriggerRequest { + function_id: "vault::get".to_string(), + payload: json!({ "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(v) = value.get("value").and_then(|v| v.as_str()) + && !v.is_empty() + { + return v.to_string(); + } + std::env::var(key).unwrap_or_default() +} + +async fn resolve_agent(iii: &III, channel: &str, channel_id: &str) -> String { + let key = format!("{channel}:{channel_id}"); + let result = iii + .trigger(TriggerRequest { + function_id: "state::get".to_string(), + payload: json!({ "scope": "channel_agents", "key": key }), + action: None, + timeout_ms: None, + }) + .await; + if let Ok(value) = result + && let Some(agent) = value.get("agentId").and_then(|v| v.as_str()) + { + return agent.to_string(); + } + "default".to_string() +} + +/// UTF-8-safe split into max-`max_len` char chunks, breaking on newline when reasonable. +fn split_message(text: &str, max_len: usize) -> Vec { + if text.chars().count() <= max_len { + return vec![text.to_string()]; + } + let mut chunks: Vec = Vec::new(); + let mut remaining = text.to_string(); + while !remaining.is_empty() { + if remaining.chars().count() <= max_len { + chunks.push(remaining); + break; + } + let cutoff = remaining + .char_indices() + .nth(max_len) + .map(|(idx, _)| idx) + .unwrap_or(remaining.len()); + let window = &remaining[..cutoff]; + let split_at = match window.rfind('\n') { + Some(idx) if window[..idx].chars().count() > max_len / 2 => idx, + _ => cutoff, + }; + chunks.push(remaining[..split_at].to_string()); + remaining = remaining[split_at..].to_string(); + } + chunks +} + +async fn send_message( + client: &reqwest::Client, + token: &str, + phone_id: &str, + to: &str, + text: &str, +) -> Result<(), IIIError> { + if token.is_empty() { + return Err(IIIError::Handler("WHATSAPP_TOKEN not configured".into())); + } + if phone_id.is_empty() { + return Err(IIIError::Handler("WHATSAPP_PHONE_ID not configured".into())); + } + let url = format!("{WHATSAPP_API_BASE}/{phone_id}/messages"); + for chunk in split_message(text, MAX_MESSAGE_LEN) { + let resp = client + .post(&url) + .bearer_auth(token) + .json(&json!({ + "messaging_product": "whatsapp", + "to": to, + "type": "text", + "text": { "body": chunk }, + })) + .send() + .await + .map_err(|e| IIIError::Handler(format!("WhatsApp send error: {e}")))?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(IIIError::Handler(format!( + "WhatsApp send failed ({status}): {}", + body.chars().take(300).collect::() + ))); + } + } + Ok(()) +} + +async fn handle_webhook( + iii: &III, + client: &reqwest::Client, + req: Value, +) -> Result { + let body = req.get("body").cloned().unwrap_or_else(|| req.clone()); + + if body.get("object").and_then(|v| v.as_str()) != Some("whatsapp_business_account") { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let message = body + .get("entry") + .and_then(|e| e.get(0)) + .and_then(|e| e.get("changes")) + .and_then(|c| c.get(0)) + .and_then(|c| c.get("value")) + .and_then(|v| v.get("messages")) + .and_then(|m| m.get(0)); + + let Some(message) = message else { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + }; + + let text = message + .get("text") + .and_then(|t| t.get("body")) + .and_then(|b| b.as_str()) + .unwrap_or(""); + if text.is_empty() { + return Ok(json!({ "status_code": 200, "body": { "ok": true } })); + } + + let from = message + .get("from") + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string(); + let agent_id = resolve_agent(iii, "whatsapp", &from).await; + + let chat = iii + .trigger(TriggerRequest { + function_id: "agent::chat".to_string(), + payload: json!({ + "agentId": agent_id, + "message": text, + "sessionId": format!("whatsapp:{from}"), + }), + action: None, + timeout_ms: None, + }) + .await + .map_err(|e| IIIError::Handler(format!("agent::chat failed: {e}")))?; + + let reply = chat.get("content").and_then(|v| v.as_str()).unwrap_or(""); + if !reply.is_empty() { + let token = get_secret(iii, "WHATSAPP_TOKEN").await; + let phone_id = get_secret(iii, "WHATSAPP_PHONE_ID").await; + if let Err(e) = send_message(client, &token, &phone_id, &from, reply).await { + tracing::error!(to = %from, error = %e, "failed to send WhatsApp reply"); + } + } + + let audit_iii = iii.clone(); + let from_for_audit = from.clone(); + let agent_for_audit = agent_id.clone(); + tokio::spawn(async move { + let _ = audit_iii + .trigger(TriggerRequest { + function_id: "security::audit".to_string(), + payload: json!({ + "type": "channel_message", + "agentId": agent_for_audit, + "detail": { "channel": "whatsapp", "from": from_for_audit }, + }), + action: None, + timeout_ms: None, + }) + .await; + }); + + Ok(json!({ "status_code": 200, "body": { "ok": true } })) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + + let ws_url = + std::env::var("III_WS_URL").unwrap_or_else(|_| "ws://localhost:49134".to_string()); + let iii = register_worker(&ws_url, InitOptions::default()); + let client = reqwest::Client::new(); + + let iii_clone = iii.clone(); + let client_clone = client.clone(); + iii.register_function( + RegisterFunction::new_async("channel::whatsapp::webhook", move |input: Value| { + let iii = iii_clone.clone(); + let client = client_clone.clone(); + async move { handle_webhook(&iii, &client, input).await } + }) + .description("Handle WhatsApp Business API webhook"), + ); + + iii.register_trigger(RegisterTriggerInput { + trigger_type: "http".to_string(), + function_id: "channel::whatsapp::webhook".to_string(), + config: json!({ "http_method": "POST", "api_path": "webhook/whatsapp" }), + metadata: None, + })?; + + tracing::info!("channel-whatsapp worker started"); + tokio::signal::ctrl_c().await?; + iii.shutdown_async().await; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn split_short_text_returns_single_chunk() { + let chunks = split_message("hi", 4096); + assert_eq!(chunks, vec!["hi".to_string()]); + } + + #[test] + fn split_preserves_total_length() { + let text = "x".repeat(10_000); + let chunks = split_message(&text, 4096); + let joined: String = chunks.concat(); + assert_eq!(joined, text); + } + + #[test] + fn split_handles_multibyte_chars() { + let text: String = "🦀".repeat(10); + let chunks = split_message(&text, 3); + let joined: String = chunks.concat(); + assert_eq!(joined, text); + } + + #[test] + fn ignores_non_whatsapp_object() { + let body = json!({ "object": "page" }); + assert_ne!( + body.get("object").and_then(|v| v.as_str()), + Some("whatsapp_business_account") + ); + } + + #[test] + fn extracts_text_from_nested_payload() { + let body = json!({ + "object": "whatsapp_business_account", + "entry": [{ + "changes": [{ + "value": { + "messages": [{ + "from": "15551234567", + "text": { "body": "hello" } + }] + } + }] + }] + }); + let text = body + .get("entry") + .and_then(|e| e.get(0)) + .and_then(|e| e.get("changes")) + .and_then(|c| c.get(0)) + .and_then(|c| c.get("value")) + .and_then(|v| v.get("messages")) + .and_then(|m| m.get(0)) + .and_then(|m| m.get("text")) + .and_then(|t| t.get("body")) + .and_then(|b| b.as_str()) + .unwrap_or(""); + assert_eq!(text, "hello"); + } + + #[test] + fn missing_text_yields_empty() { + let body = json!({ + "object": "whatsapp_business_account", + "entry": [{ "changes": [{ "value": { "messages": [{ "from": "1" }] } }] }] + }); + let text = body + .pointer("/entry/0/changes/0/value/messages/0/text/body") + .and_then(|b| b.as_str()) + .unwrap_or(""); + assert!(text.is_empty()); + } +}