-
Notifications
You must be signed in to change notification settings - Fork 101
Add MCP Streamable HTTP specification support for the client #210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
09637b6
4765528
bd55244
03a76cd
3b99748
137929c
b91a9db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,57 +1,35 @@ | ||||||
| # frozen_string_literal: true | ||||||
|
|
||||||
| require "mcp" | ||||||
| require "mcp/client" | ||||||
| require "mcp/client/http" | ||||||
| require "mcp/client/tool" | ||||||
| require "net/http" | ||||||
| require "uri" | ||||||
| require "json" | ||||||
| require "logger" | ||||||
|
|
||||||
| # Logger for client operations | ||||||
| logger = Logger.new($stdout) | ||||||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||||||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||||||
| end | ||||||
| SERVER_URL = "http://localhost:9393" | ||||||
|
|
||||||
| # Server configuration | ||||||
| SERVER_URL = "http://localhost:9393/mcp" | ||||||
| PROTOCOL_VERSION = "2024-11-05" | ||||||
|
|
||||||
| # Helper method to make JSON-RPC requests | ||||||
| def make_request(session_id, method, params = {}, id = nil) | ||||||
| uri = URI(SERVER_URL) | ||||||
| http = Net::HTTP.new(uri.host, uri.port) | ||||||
|
|
||||||
| request = Net::HTTP::Post.new(uri) | ||||||
| request["Content-Type"] = "application/json" | ||||||
| request["Mcp-Session-Id"] = session_id if session_id | ||||||
|
|
||||||
| body = { | ||||||
| jsonrpc: "2.0", | ||||||
| method: method, | ||||||
| params: params, | ||||||
| id: id || SecureRandom.uuid, | ||||||
| } | ||||||
|
|
||||||
| request.body = body.to_json | ||||||
| response = http.request(request) | ||||||
|
|
||||||
| { | ||||||
| status: response.code, | ||||||
| headers: response.to_hash, | ||||||
| body: JSON.parse(response.body), | ||||||
| } | ||||||
| rescue => e | ||||||
| { error: e.message } | ||||||
| # Logger for client operations | ||||||
| def create_logger | ||||||
| logger = Logger.new($stdout) | ||||||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||||||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||||||
| end | ||||||
| logger | ||||||
| end | ||||||
|
|
||||||
| # Connect to SSE stream | ||||||
| # Connect to SSE stream for real-time notifications | ||||||
| # Note: The SDK doesn't support SSE streaming yet, so we use raw Net::HTTP | ||||||
| def connect_sse(session_id, logger) | ||||||
| uri = URI(SERVER_URL) | ||||||
|
|
||||||
| logger.info("Connecting to SSE stream...") | ||||||
|
|
||||||
| Net::HTTP.start(uri.host, uri.port) do |http| | ||||||
| request = Net::HTTP::Get.new(uri) | ||||||
| request["Mcp-Session-Id"] = session_id | ||||||
| request["MCP-Session-Id"] = session_id | ||||||
| request["Accept"] = "text/event-stream" | ||||||
| request["Cache-Control"] = "no-cache" | ||||||
|
|
||||||
|
|
@@ -62,14 +40,10 @@ def connect_sse(session_id, logger) | |||||
| response.read_body do |chunk| | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use https://rubygems.org/gems/event_stream_parser for parsing. SSE appears easy to parse but it has weird edge cases. Thanks for kicking off the effort! I've been planning on porting this standalone client I had written for a project, but never got around to doing it: https://gist.github.com/atesgoral/75172912b5951d9be33497b80aba4397 You can see how |
||||||
| chunk.split("\n").each do |line| | ||||||
| if line.start_with?("data: ") | ||||||
| data = line[6..-1] | ||||||
| begin | ||||||
| logger.info("SSE data: #{data}") | ||||||
| rescue JSON::ParserError | ||||||
| logger.debug("Non-JSON SSE data: #{data}") | ||||||
| end | ||||||
| data = line[6..] | ||||||
| logger.info("SSE event: #{data}") | ||||||
| elsif line.start_with?(": ") | ||||||
| logger.debug("SSE keepalive received: #{line}") | ||||||
| logger.debug("SSE keepalive: #{line}") | ||||||
| end | ||||||
| end | ||||||
| end | ||||||
|
|
@@ -79,125 +53,126 @@ def connect_sse(session_id, logger) | |||||
| end | ||||||
| end | ||||||
| rescue Interrupt | ||||||
| logger.info("SSE connection interrupted by user") | ||||||
| logger.info("SSE connection interrupted") | ||||||
| rescue => e | ||||||
| logger.error("SSE connection error: #{e.message}") | ||||||
| end | ||||||
|
|
||||||
| # Main client flow | ||||||
| def main | ||||||
| logger = Logger.new($stdout) | ||||||
| logger.formatter = proc do |severity, datetime, _progname, msg| | ||||||
| "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" | ||||||
| end | ||||||
|
|
||||||
| puts "=== MCP SSE Test Client ===" | ||||||
|
|
||||||
| # Step 1: Initialize session | ||||||
| logger.info("Initializing session...") | ||||||
|
|
||||||
| init_response = make_request( | ||||||
| nil, | ||||||
| "initialize", | ||||||
| { | ||||||
| protocolVersion: PROTOCOL_VERSION, | ||||||
| capabilities: {}, | ||||||
| clientInfo: { | ||||||
| name: "sse-test-client", | ||||||
| version: "1.0", | ||||||
| }, | ||||||
| }, | ||||||
| "init-1", | ||||||
| ) | ||||||
|
|
||||||
| if init_response[:error] | ||||||
| logger.error("Failed to initialize: #{init_response[:error]}") | ||||||
| exit(1) | ||||||
| end | ||||||
|
|
||||||
| session_id = init_response[:headers]["mcp-session-id"]&.first | ||||||
|
|
||||||
| if session_id.nil? | ||||||
| logger.error("No session ID received") | ||||||
| exit(1) | ||||||
| end | ||||||
|
|
||||||
| logger.info("Session initialized: #{session_id}") | ||||||
| logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}") | ||||||
|
|
||||||
| # Step 2: Start SSE connection in a separate thread | ||||||
| sse_thread = Thread.new { connect_sse(session_id, logger) } | ||||||
|
|
||||||
| # Give SSE time to connect | ||||||
| sleep(1) | ||||||
|
|
||||||
| # Step 3: Interactive menu | ||||||
| loop do | ||||||
| puts <<~MESSAGE.chomp | ||||||
|
|
||||||
| === Available Actions === | ||||||
| 1. Send custom notification | ||||||
| 2. Test echo | ||||||
| 3. List tools | ||||||
| 0. Exit | ||||||
|
|
||||||
| Choose an action:#{" "} | ||||||
| MESSAGE | ||||||
|
|
||||||
| choice = gets.chomp | ||||||
|
|
||||||
| case choice | ||||||
| when "1" | ||||||
| print("Enter notification message: ") | ||||||
| message = gets.chomp | ||||||
| print("Enter delay in seconds (0 for immediate): ") | ||||||
| delay = gets.chomp.to_f | ||||||
|
|
||||||
| response = make_request( | ||||||
| session_id, | ||||||
| "tools/call", | ||||||
| { | ||||||
| name: "notification_tool", | ||||||
| arguments: { | ||||||
| message: message, | ||||||
| delay: delay, | ||||||
| }, | ||||||
| }, | ||||||
| ) | ||||||
| if response[:body]["accepted"] | ||||||
| logger.info("Notification sent successfully") | ||||||
| logger = create_logger | ||||||
|
|
||||||
| puts <<~MESSAGE | ||||||
| MCP Streamable HTTP Client (SDK + SSE) | ||||||
keisku marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| Make sure the server is running (ruby examples/streamable_http_server.rb) | ||||||
| #{"=" * 60} | ||||||
| MESSAGE | ||||||
|
|
||||||
| # Initialize SDK client | ||||||
| transport = MCP::Client::HTTP.new(url: SERVER_URL) | ||||||
| client = MCP::Client.new(transport: transport) | ||||||
|
|
||||||
| begin | ||||||
| # Initialize session using SDK | ||||||
| puts "=== Initializing session ===" | ||||||
| init_response = client.connect( | ||||||
| client_info: { name: "streamable-http-client", version: "1.0" }, | ||||||
| ) | ||||||
| puts "Session ID: #{client.session_id}" | ||||||
| puts "Protocol Version: #{client.protocol_version}" | ||||||
| puts "Server Info: #{init_response.dig("result", "serverInfo")}" | ||||||
keisku marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
|
|
||||||
| # Get available tools BEFORE establishing SSE connection | ||||||
| # (Once SSE is active, server sends responses via SSE stream, not POST response) | ||||||
| puts "=== Listing tools ===" | ||||||
| tools = client.tools | ||||||
| tools.each { |t| puts " - #{t.name}: #{t.description}" } | ||||||
|
|
||||||
| echo_tool = tools.find { |t| t.name == "echo" } | ||||||
| notification_tool = tools.find { |t| t.name == "notification_tool" } | ||||||
|
|
||||||
| # Start SSE connection in a separate thread (uses raw HTTP) | ||||||
| # Note: After this, server responses will be sent via SSE, not POST | ||||||
| sse_thread = Thread.new { connect_sse(client.session_id, logger) } | ||||||
|
|
||||||
| # Give SSE time to connect | ||||||
| sleep(1) | ||||||
|
|
||||||
| # Interactive menu | ||||||
| loop do | ||||||
| puts <<~MENU.chomp | ||||||
|
|
||||||
| === Available Actions === | ||||||
| 1. Send notification (triggers SSE event) | ||||||
| 2. Echo message | ||||||
| 3. List tools | ||||||
| 0. Exit | ||||||
|
|
||||||
| Choose an action:#{" "} | ||||||
| MENU | ||||||
|
|
||||||
| choice = gets&.chomp | ||||||
|
||||||
| choice = gets&.chomp | |
| choice = gets.chomp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you look over the other similar parts as well?
Uh oh!
There was an error while loading. Please reload this page.