Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 80 additions & 48 deletions packages/visual-editor/src/a2/a2/opal-adk-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import {
OPAL_BACKEND_API_PREFIX,
Outcome,
} from "@breadboard-ai/types";
import { err, toLLMContent } from "./utils.js";
import { err, ok, toLLMContent } from "./utils.js";
import { A2ModuleArgs } from "../runnable-module-factory.js";
import { iteratorFromStream } from "@breadboard-ai/utils";
import { ModelConstraint } from "../agent/functions/generate.js";
import { PidginTranslator } from "../agent/pidgin-translator.js";
import { AgentUI } from "../agent/ui.js";
import { AgentFileSystem } from "../agent/file-system.js";
Expand Down Expand Up @@ -66,6 +65,7 @@ export type PlanStep = {

export interface BuildStreamingRequestBodyOptions {
completedPrompt: LLMContent;
executionInputs?: LLMContent[];
modelConstraint?: string;
uiType?: string;
uiPrompt?: LLMContent;
Expand All @@ -76,23 +76,28 @@ export interface BuildStreamingRequestBodyOptions {

type StreamingRequestPart = {
text?: string;
partMetadata?: { input_name: string };
inline_data?: {
mime_type: string;
data: string;
};
file_data?: {
mime_type: string;
file_uri: string;
};
};

type StreamingRequestBody = {
session_id?: string;
objective?: LLMContent;
model_name?: string;
invocation_id?: string;
contents?: Array<{
execution_inputs?: Record<string, {
parts: StreamingRequestPart[];
role: string;
}>;
node_config?: {
node_api?: string;
};
agent_mode_node_config?: {
model_constraint?: string;
ui_type?: string;
ui_prompt?: LLMContent;
};
Expand Down Expand Up @@ -129,43 +134,76 @@ class OpalAdkStream {
}
}

buildStreamingRequestBody(
options: BuildStreamingRequestBodyOptions
): StreamingRequestBody {
buildStreamingRequestBody(options: BuildStreamingRequestBodyOptions): Outcome<StreamingRequestBody> {
const {
completedPrompt,
executionInputs,
uiType,
uiPrompt,
nodeApi,
invocationId,
sessionId,
} = options;
console.log("uiType: ", uiType);
const contents: NonNullable<StreamingRequestBody["contents"]> = [];

let textCount = 0;
const execution_inputs: NonNullable<StreamingRequestBody["execution_inputs"]> = {};
if (!completedPrompt.parts) {
console.error("opal-adk-stream: Missing required prompt.");
}
for (const part of completedPrompt.parts) {
if ("text" in part) {
textCount++;
contents.push({
parts: [
{
text: part.text,
partMetadata: { input_name: `text_${textCount}` },
},
],
const error = err("opal-adk-stream: Missing required prompt.");
console.error(error);
return error;
};
if (executionInputs) {
for (const content of executionInputs) {
if (!content.parts) {
const error = err("opal-adk-stream: Execution input has no parts.");
console.error(error);
return error;
}
if (content.parts.length > 1) {
const error = err("opal-adk-stream: Execution input has multiple part.");
console.error(error);
return error;
}
const part = content.parts[0];
let inputName = "";
const requestPart: StreamingRequestPart = {};

if ("text" in part) {
inputName = "input_text";
requestPart.text = part.text;
} else if ("inlineData" in part) {
const mime_type = part.inlineData.mimeType;
if (mime_type.startsWith("image/")) {
inputName = "input_image";
} else if (mime_type.startsWith("audio/")) {
inputName = "input_audio";
} else if (mime_type.startsWith("video/")) {
inputName = "input_video";
} else {
inputName = "input_data";
}
requestPart.inline_data = {
mime_type: part.inlineData.mimeType,
data: part.inlineData.data,
};
} else if ("fileData" in part) {
inputName = "input_file";
requestPart.file_data = {
mime_type: part.fileData.mimeType,
file_uri: part.fileData.fileUri,
};
} else {
continue;
}

execution_inputs[inputName] = {
parts: [requestPart],
role: "user",
});
};
}
}

const baseBody: StreamingRequestBody = {
objective: completedPrompt,
model_name: undefined,
contents,
execution_inputs,
};

// 'node-agent' is specifically for agent mode while any other
Expand Down Expand Up @@ -195,27 +233,18 @@ class OpalAdkStream {
}

async executeOpalAdkStream(
objective: LLMContent,
opalAdkAgent?: string,
params?: LLMContent[],
modelConstraint?: ModelConstraint,
uiType?: string,
uiPrompt?: LLMContent,
invocationId?: string,
sessionId?: string
): Promise<Outcome<LLMContent>> {
sessionId?: string): Promise<Outcome<LLMContent>> {
const ui = this.ui;

if (!params || params.length === 0) {
return err("opal-adk-stream: No params provided");
}
if (modelConstraint === undefined) {
modelConstraint = "none";
}
if (opalAdkAgent && !VALID_NODE_KEYS.includes(opalAdkAgent)) {
const error = err(
`opal-adk-stream: Invalid node key: ${opalAdkAgent}, ` +
`valid keys are: ${VALID_NODE_KEYS.join(", ")}`
);
const error = err(`opal-adk-stream: Invalid node key: ${opalAdkAgent}, ` +
`valid keys are: ${VALID_NODE_KEYS.join(", ")}`);
console.error(error);
return error;
}
Expand All @@ -224,16 +253,21 @@ class OpalAdkStream {
const baseUrl = OPAL_ADK_ENDPOINT;
const url = new URL(baseUrl);
url.searchParams.set("alt", "sse");
const requestBody = this.buildStreamingRequestBody({
completedPrompt: params[0],
const requestBodyOrError = this.buildStreamingRequestBody({
completedPrompt: objective,
executionInputs: params,
uiType,
uiPrompt,
nodeApi: opalAdkAgent,
invocationId,
sessionId,
sessionId
});

ui.progress.sendOpalAdkRequest("", requestBody);
if (!ok(requestBodyOrError)) {
return requestBodyOrError;
}
const requestBody = requestBodyOrError;
ui.progress.sendOpalAdkRequest("", requestBody)
const response = await this.moduleArgs.fetchWithCreds(url.toString(), {
method: "POST",
headers: { "Content-Type": "application/json" },
Expand All @@ -244,9 +278,7 @@ class OpalAdkStream {
console.log("response: ", response);
if (!response.ok) {
const errorText = await response.text();
const error = err(
`Streaming request failed: ${response.status} ${errorText}`
);
const error = err(`Streaming request failed: ${response.status} ${errorText}`);
console.error(error);
return error;
}
Expand Down
16 changes: 12 additions & 4 deletions packages/visual-editor/src/a2/agent/agent-adk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from "@breadboard-ai/types";
import { ok } from "@breadboard-ai/utils";
import { Template } from "../a2/template.js";
import { toLLMContent, isLLMContent } from "../a2/utils.js";
import { A2ModuleArgs } from "../runnable-module-factory.js";
import { OpalAdkStream, NODE_AGENT_KEY } from "../a2/opal-adk-stream.js";
import { AgentInputs, AgentOutputs, toAgentOutputs } from "./main.js"
Expand All @@ -27,18 +28,25 @@ export async function invokeAgentAdk(
const params = Object.fromEntries(
Object.entries(rest).filter(([key]) => key.startsWith("p-z-"))
);

const opalAdkStream = new OpalAdkStream(moduleArgs);
const uiType = enableA2UI ? "a2ui" : "chat";
const template = new Template(prompt_template, moduleArgs.context.currentGraph);
const template = new Template(prompt_template);
// TODO: This is going to need to be updated for Opal ADK substitution.
const completed_prompt = await template.substitute(params);
if (!ok(completed_prompt)) {
return completed_prompt;
}
console.log("substitutine: ", completed_prompt);

const paramsArray = Object.entries(rest)
.filter(([key]) => key.startsWith("p-z-"))
.map(([_, value]) =>
isLLMContent(value) ? value : toLLMContent(String(value))
);
const results = await opalAdkStream.executeOpalAdkStream(
completed_prompt,
NODE_AGENT_KEY,
[completed_prompt],
"none",
paramsArray,
uiType,
uiPrompt,
invocation_id,
Expand Down
7 changes: 3 additions & 4 deletions packages/visual-editor/src/a2/deep-research/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,9 @@ async function invokeOpalAdk(
return substituting;
}
const opalAdkStream = new OpalAdkStream(moduleArgs);
const results = await opalAdkStream.executeOpalAdkStream(DEEP_RESEARCH_KEY, [
substituting,
]);
console.log("deep-research results", results);
const results = await opalAdkStream
.executeOpalAdkStream(substituting, DEEP_RESEARCH_KEY);
console.log("deep-research results", results)
return {
context: [...(context || []), results],
};
Expand Down
Loading