Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/config/defaultOptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export default {
prefix: "",
preLoadModules: "",
reloadHandler: false,
rubyWatchDirs: [],
resourceRoutes: false,
terminateIdleLambdaTime: 60,
useDocker: false,
Expand Down
2 changes: 1 addition & 1 deletion src/lambda/handler-runner/HandlerRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export default class HandlerRunner {
if (supportedRuby.has(runtime)) {
const { default: RubyRunner } = await import("./ruby-runner/index.js")

return new RubyRunner(this.#funOptions, this.#env)
return new RubyRunner(this.#funOptions, this.#env, this.#options)
}

if (supportedJava.has(runtime)) {
Expand Down
217 changes: 169 additions & 48 deletions src/lambda/handler-runner/ruby-runner/RubyRunner.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,144 @@
import { spawn } from "node:child_process"
import { watch } from "node:fs"
import { EOL, platform } from "node:os"
import { relative } from "node:path"
import { cwd } from "node:process"
import { resolve, relative } from "node:path"
import process, { cwd, nextTick } from "node:process"
import { createInterface } from "node:readline"
import { join } from "desm"
import { execa } from "execa"
import { log } from "../../../utils/log.js"
import { splitHandlerPathAndName } from "../../../utils/index.js"

const { parse, stringify } = JSON
const { hasOwn } = Object
const { assign, hasOwn } = Object

export default class RubyRunner {
static #payloadIdentifier = "__offline_payload__"

#env = null

#handlerName = null
#handlerProcess = null

#handlerPath = null
#runtime = null

constructor(funOptions, env) {
#spawnArgs = null

#spawnOptions = null

#watchers = []

#debounceTimer = null

#busy = false

#restartQueued = false

#watchDirs = []

// Spawn a persistent Ruby process in the constructor (mirrors PythonRunner).
// The process stays alive across invocations and communicates via stdin/stdout.
// File changes trigger an automatic restart when rubyWatchDirs is configured.
constructor(funOptions, env, options = {}) {
const [handlerPath, handlerName] = splitHandlerPathAndName(
funOptions.handler,
)

this.#env = env
this.#handlerName = handlerName
this.#handlerPath = handlerPath
this.#runtime = platform() === "win32" ? "ruby.exe" : "ruby"

this.#spawnArgs = [
join(import.meta.url, "invoke.rb"),
relative(cwd(), handlerPath),
handlerName,
]

this.#spawnOptions = {
env: assign(process.env, this.#env),
}

this.#watchDirs = options.rubyWatchDirs ?? []

this.#spawnProcess()

if (this.#watchDirs.length > 0) {
this.#setupFileWatcher()
}
}

#spawnProcess() {
this.#handlerProcess = spawn(
this.#runtime,
this.#spawnArgs,
this.#spawnOptions,
)

this.#handlerProcess.stdout.readline = createInterface({
input: this.#handlerProcess.stdout,
})
}

#setupFileWatcher() {
const watchDirs = this.#watchDirs.map((dir) => resolve(cwd(), dir))

for (const dir of watchDirs) {
try {
const watcher = watch(
dir,
{ recursive: true },
(_eventType, filename) => {
if (!filename?.endsWith(".rb")) {
return
}

this.#onFileChanged(filename)
},
)

this.#watchers.push(watcher)
} catch {
// Directory may not exist, skip
}
}
}

#onFileChanged(filename) {
if (this.#debounceTimer) {
clearTimeout(this.#debounceTimer)
}

this.#debounceTimer = setTimeout(() => {
log.notice(`Ruby file changed: ${filename}, reloading handler...`)
this.#scheduleRestart()
}, 100)
}

#scheduleRestart() {
if (this.#busy) {
// Defer restart until the current invocation completes
this.#restartQueued = true
} else {
this.#restartProcess()
}
}

#restartProcess() {
this.#handlerProcess.kill()
this.#spawnProcess()
}

// no-op
// () => void
cleanup() {}
cleanup() {
for (const watcher of this.#watchers) {
watcher.close()
}

this.#watchers = []

if (this.#debounceTimer) {
clearTimeout(this.#debounceTimer)
}

this.#handlerProcess.kill()
}

#parsePayload(value) {
let payload
Expand Down Expand Up @@ -63,44 +171,57 @@ export default class RubyRunner {

// invokeLocalRuby, loosely based on:
// https://github.com/serverless/serverless/blob/v1.50.0/lib/plugins/aws/invokeLocal/index.js#L556
// invoke.rb, copy/pasted entirely as is:
// https://github.com/serverless/serverless/blob/v1.50.0/lib/plugins/aws/invokeLocal/invoke.rb
async run(event, context) {
const runtime = platform() === "win32" ? "ruby.exe" : "ruby"

// https://docs.aws.amazon.com/lambda/latest/dg/ruby-context.html

// https://docs.aws.amazon.com/lambda/latest/dg/ruby-context.html
// exclude callbackWaitsForEmptyEventLoop, don't mutate context
const { callbackWaitsForEmptyEventLoop, ..._context } = context

const input = stringify({
context: _context,
event,
})

// console.log(input)

const { stderr, stdout } = await execa(
runtime,
[
join(import.meta.url, "invoke.rb"),
relative(cwd(), this.#handlerPath),
this.#handlerName,
],
{
env: this.#env,
input,
// shell: true,
},
)

if (stderr) {
// TODO

log.notice(stderr)
this.#busy = true

try {
return await new Promise((res, rej) => {
// https://docs.aws.amazon.com/lambda/latest/dg/ruby-context.html
// exclude callbackWaitsForEmptyEventLoop, don't mutate context
const { callbackWaitsForEmptyEventLoop, ..._context } = context

const input = stringify({
context: _context,
event,
})

const onErr = (data) => {
// TODO

log.notice(data.toString())
}

const onLine = (line) => {
try {
const parsed = this.#parsePayload(line.toString())
if (parsed) {
this.#handlerProcess.stdout.readline.removeListener(
"line",
onLine,
)
this.#handlerProcess.stderr.removeListener("data", onErr)
res(parsed)
}
} catch (err) {
rej(err)
}
}

this.#handlerProcess.stdout.readline.on("line", onLine)
this.#handlerProcess.stderr.on("data", onErr)

nextTick(() => {
this.#handlerProcess.stdin.write(input)
this.#handlerProcess.stdin.write("\n")
})
})
} finally {
this.#busy = false

if (this.#restartQueued) {
this.#restartQueued = false
this.#restartProcess()
}
}

return this.#parsePayload(stdout)
}
}
79 changes: 45 additions & 34 deletions src/lambda/handler-runner/ruby-runner/invoke.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# copy/pasted entirely from:
# Persistent Ruby invoke script for serverless-offline.
# Mirrors the Python runner pattern: spawn once, loop forever via stdin/stdout.
#
# Original one-shot version was based on:
# https://github.com/serverless/serverless/blob/v1.50.0/lib/plugins/aws/invokeLocal/invoke.rb

require 'json'
Expand Down Expand Up @@ -28,26 +31,6 @@ def initialize(context:)
def get_remaining_time_in_millis
[@timeout*1000 - ((Time.now() - @created_time)*1000).round, 0].max
end

# def invoked_function_arn
# "arn:aws:lambda:serverless:#{function_name}"
# end
#
# def memory_limit_in_mb
# return @memory_limit_in_mb
# end
#
# def log_group_name
# return @log_group_name
# end
#
# def log_stream_name
# return Time.now.strftime('%Y/%m/%d') +'/[$' + function_version + ']58419525dade4d17a495dceeeed44708'
# end
#
# def log(message)
# puts message
# end
end


Expand All @@ -56,7 +39,7 @@ def attach_tty
$stdin.reopen "/dev/tty", "a+"
end
rescue
puts "tty unavailable"
$stderr.puts "tty unavailable"
end

if __FILE__ == $0
Expand All @@ -68,25 +51,53 @@ def attach_tty
handler_path = ARGV[0]
handler_name = ARGV[1]

input = JSON.load($stdin) || {}

# Load the handler module ONCE at startup
require("./#{handler_path}")

# handler name is either a global method or a static method in a class
# my_method or MyModule::MyClass.my_method
handler_method, handler_class = handler_name.split(".").reverse
handler_class ||= "Kernel"

attach_tty

context = FakeLambdaContext.new(context: input['context'])
result = Object.const_get(handler_class).send(handler_method, event: input['event'], context: context)
# Keep a reference to the original stdin for reading from the parent process
original_stdin = $stdin.dup

data = {
# just an identifier to distinguish between
# interesting data (result) and stdout/print
'__offline_payload__': result
}
attach_tty

puts data.to_json
# Persistent loop: read JSON from stdin, invoke handler, write result to stdout
while (line = original_stdin.gets)
line = line.strip
next if line.empty?

begin
input = JSON.parse(line)

context = FakeLambdaContext.new(context: input['context'] || {})
result = Object.const_get(handler_class).send(handler_method, event: input['event'], context: context)

data = {
# just an identifier to distinguish between
# interesting data (result) and stdout/print
'__offline_payload__': result
}

$stdout.write(data.to_json)
$stdout.write("\n")
$stdout.flush
rescue => e
$stderr.write("#{e.class}: #{e.message}\n")
$stderr.write(e.backtrace.join("\n") + "\n")
$stderr.flush

error_data = {
'__offline_payload__': {
'statusCode' => 500,
'body' => JSON.generate({ error: e.message })
}
}
$stdout.write(error_data.to_json)
$stdout.write("\n")
$stdout.flush
end
end
end
Loading