diff --git a/exe/rdx b/exe/rdx index bb5356e3d..8a28abf3c 100755 --- a/exe/rdx +++ b/exe/rdx @@ -3,111 +3,6 @@ $LOAD_PATH.unshift(File.expand_path("../lib", __dir__)) -require "optparse" +require "rubydex/cli" -USAGE = <<~TEXT - Usage: rdx [options] - - Commands: - query Run a Cypher query against the workspace graph and print the result - schema Describe the queryable Cypher schema (labels, relationships, properties) - console Open an interactive session with a populated graph for the current workspace - help Show this help message - - Run `rdx --help` for command-specific options. -TEXT - -def abort_with_usage(message) - warn(message) - warn("") - warn(USAGE) - exit(1) -end - -# Top-level --version / --help / bare invocation, handled before command dispatch. -case ARGV.first -when "--version", "version" - require "rubydex/version" - puts "v#{Rubydex::VERSION}" - exit -when nil, "-h", "--help", "help" - puts USAGE - exit -end - -command = ARGV.shift - -def with_timer(io, message) - io.print(message) - start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - yield - duration = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start - io.puts(" finished in #{duration.round(2)}ms") -end - -# Builds the workspace graph, sending progress messages to `progress_io`. -def build_graph(progress_io) - graph = Rubydex::Graph.new - with_timer(progress_io, "Indexing workspace...") { graph.index_workspace } - with_timer(progress_io, "Resolving graph...") { graph.resolve } - graph -end - -# Parses `--format`/`--help` for a command and returns the chosen format. -def parse_format(usage) - format = "table" - OptionParser.new do |parser| - parser.banner = usage - parser.on("--format FORMAT", ["table", "json"], "Output format (table or json)") { |value| format = value } - parser.on("-h", "--help", "Show this help") do - puts parser - exit - end - end.parse! - format -end - -case command -when "query" - format = parse_format("Usage: rdx query [options]") - query = ARGV.shift - abort_with_usage("`query` requires a Cypher query argument") if query.nil? || query.empty? - - require "rubydex" - # Progress goes to stderr so stdout carries only the query result (e.g. for piping JSON). - graph = build_graph($stderr) - begin - print(graph.query(query, format)) - rescue ArgumentError => e - abort(e.message) - end -when "schema" - format = parse_format("Usage: rdx schema [options]") - - require "rubydex" - # The schema is static, so describe it without indexing the workspace. - print(Rubydex::Graph.cypher_schema(format)) -when "console" - OptionParser.new do |parser| - parser.banner = "Usage: rdx console" - parser.on("-h", "--help", "Show this help") do - puts parser - exit - end - end.parse! - - require "rubydex" - graph = build_graph($stdout) - - begin - require "irb" - IRB.setup(nil) - IRB.conf[:IRB_NAME] = "rubydex" - workspace = IRB::WorkSpace.new(binding) - IRB::Irb.new(workspace).run(IRB.conf) - rescue LoadError - abort("Interactive mode requires `irb` to be in the bundle") - end -else - abort_with_usage("unknown command: #{command}") -end +Rubydex::CLI.start diff --git a/lib/rubydex/cli.rb b/lib/rubydex/cli.rb new file mode 100644 index 000000000..eba88edc8 --- /dev/null +++ b/lib/rubydex/cli.rb @@ -0,0 +1,226 @@ +# frozen_string_literal: true + +require "optparse" + +module Rubydex + # Command-line entry point for the `rdx` executable. Parses the top-level command, dispatches to + # the matching subcommand handler and keeps `exe/rdx` itself a thin shim. + # + # Heavyweight requires (`rubydex`, `rubydex/server`) are deferred into the handlers so that paths + # which don't need the native extension (e.g. `--help`, or talking to a resident server) stay + # cheap to start. + class CLI + USAGE = <<~TEXT + Usage: rdx [options] + + Commands: + query Run a Cypher query against the workspace graph and print the result + schema Describe the queryable Cypher schema (labels, relationships, properties) + console Open an interactive session with a populated graph for the current workspace + server Manage the resident server (start, stop, restart, status) + help Show this help message + + Run `rdx --help` for command-specific options. + TEXT + + SERVER_USAGE = <<~TEXT + Usage: rdx server [options] + + Actions: + start Start the server for this workspace + stop Stop the running server for this workspace + restart Restart the server for this workspace + status Print the status of the server for this workspace + TEXT + + class << self + # Convenience entry point used by `exe/rdx`. + #: (?Array[String] argv) -> void + def start(argv = ARGV) + new(argv).run + end + end + + #: (Array[String] argv) -> void + def initialize(argv = ARGV) + @argv = argv + end + + #: -> void + def run + # Top-level --version / --help / bare invocation, handled before command dispatch. + case @argv.first + when "--version", "version" + require "rubydex/version" + puts "v#{Rubydex::VERSION}" + exit + when nil, "-h", "--help", "help" + puts USAGE + exit + end + + dispatch(@argv.shift) + end + + private + + #: (String? command) -> void + def dispatch(command) + case command + when "query" then run_query + when "schema" then run_schema + when "console" then run_console + when "server" then run_server + else abort_with_usage("unknown command: #{command}") + end + end + + #: -> void + def run_query + options = parse_query_options + query = @argv.shift + abort_with_usage("`query` requires a Cypher query argument") if query.nil? || query.empty? + + require "rubydex/server" + + # Server mode is opt-in via `--server` and falls back to inline execution when it's disabled or + # unsupported on this platform. + use_server = options[:server] && !Rubydex::Server.disabled? && Rubydex::Server.supported? + + if use_server + cache = Rubydex::Server::Cache.new(workspace_path: Dir.pwd) + exit(Rubydex::Server::Client.query(cache, { query: query, query_format: options[:format] })) + else + require "rubydex" + # Progress goes to stderr so stdout carries only the query result (e.g. for piping JSON). + graph = build_graph($stderr) + begin + print(graph.query(query, options[:format])) + rescue ArgumentError => e + abort(e.message) + end + end + end + + #: -> void + def run_schema + format = parse_format("Usage: rdx schema [options]") + + require "rubydex" + # The schema is static, so describe it without indexing the workspace. + print(Rubydex::Graph.cypher_schema(format)) + end + + #: -> void + def run_console + OptionParser.new do |parser| + parser.banner = "Usage: rdx console" + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + end.parse!(@argv) + + require "rubydex" + graph = build_graph($stdout) + + begin + require "irb" + IRB.setup(nil) + IRB.conf[:IRB_NAME] = "rubydex" + workspace = IRB::WorkSpace.new(binding) + IRB::Irb.new(workspace).run(IRB.conf) + rescue LoadError + abort("Interactive mode requires `irb` to be in the bundle") + end + end + + #: -> void + def run_server + action = @argv.shift + detach = true + OptionParser.new do |parser| + parser.banner = SERVER_USAGE + parser.on("--no-detach", "Run the server in the foreground (for debugging / containers)") { detach = false } + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + end.parse!(@argv) + + require "rubydex/server" + + unless Rubydex::Server.supported? + abort("rdx server mode is not supported on this platform (requires fork + UNIX sockets)") + end + + cache = Rubydex::Server::Cache.new(workspace_path: Dir.pwd) + status = case action + when "start" then Rubydex::Server::Commands.start(cache, detach: detach) + when "stop" then Rubydex::Server::Commands.stop(cache) + when "restart" then Rubydex::Server::Commands.restart(cache, detach: detach) + when "status" then Rubydex::Server::Commands.status(cache) + else abort_with_usage("unknown server action: #{action.inspect}", SERVER_USAGE) + end + + exit(status) + end + + # Parses options for the `query` command: output format plus whether to use the resident server. + #: -> Hash[Symbol, untyped] + def parse_query_options + options = { format: "table", server: false } + OptionParser.new do |parser| + parser.banner = "Usage: rdx query [options]" + parser.on("--format FORMAT", ["table", "json"], "Output format (table or json)") { |value| options[:format] = value } + parser.on("--[no-]server", "Use the resident server (opt-in; off by default)") { |value| options[:server] = value } + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + end.parse!(@argv) + options + end + + # Parses `--format`/`--help` for a command and returns the chosen format. + #: (String usage) -> String + def parse_format(usage) + format = "table" + OptionParser.new do |parser| + parser.banner = usage + parser.on("--format FORMAT", ["table", "json"], "Output format (table or json)") { |value| format = value } + parser.on("-h", "--help", "Show this help") do + puts parser + exit + end + end.parse!(@argv) + format + end + + # Builds the workspace graph, sending progress messages to `progress_io`. + #: (IO progress_io) -> Rubydex::Graph + def build_graph(progress_io) + graph = Rubydex::Graph.new + with_timer(progress_io, "Indexing workspace...") { graph.index_workspace } + with_timer(progress_io, "Resolving graph...") { graph.resolve } + graph + end + + #: (IO io, String message) { -> void } -> void + def with_timer(io, message) + io.print(message) + start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) + yield + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start + io.puts(" finished in #{duration.round(2)}ms") + end + + #: (String message, ?String usage) -> void + def abort_with_usage(message, usage = USAGE) + warn(message) + warn("") + warn(usage) + exit(1) + end + end +end diff --git a/lib/rubydex/server.rb b/lib/rubydex/server.rb new file mode 100644 index 000000000..9eb6d5ca1 --- /dev/null +++ b/lib/rubydex/server.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +require "rubydex/version" + +module Rubydex + # Client/server mode for the `rdx` executable. + # + # The expensive indexing + resolution work is performed once by a resident server process that + # keeps the built `Rubydex::Graph` in memory. Subsequent commands (currently `--query`) run against + # the already-built graph over a UNIX domain socket, making follow-up queries effectively instant. + # + # See `tmp/rdx-server-plan/README.md` for the full design. + module Server + # Wire protocol version. Bump on any incompatible change to the request/response shape. + PROTOCOL = 1 + + # How long the client waits for the server's version line / handshake before giving up. + HANDSHAKE_TIMEOUT = 10.0 + + # How long the client waits for a freshly spawned server to become ready (socket to appear). + BOOT_TIMEOUT = 120.0 + + class Error < StandardError; end + + # Raised when a read from the server exceeds its timeout. + class ServerReadTimeout < Error; end + + class << self + # Whether server mode can run on the current platform. Requires `fork` + UNIX domain sockets. + #: -> bool + def supported? + Process.respond_to?(:fork) && defined?(::UNIXSocket) && !Gem.win_platform? + end + + # Whether the user has explicitly disabled the server via the environment. + #: -> bool + def disabled? + ENV.key?("DISABLE_RDX_SERVER") + end + + # Builds a fully indexed + resolved graph for the workspace. Shared by the inline path and the + # server boot path. `progress_io`, when given, receives human-readable progress messages. + #: (workspace_path: String, ?progress_io: IO?) -> Rubydex::Graph + def build_graph(workspace_path:, progress_io: nil) + graph = Rubydex::Graph.new(workspace_path: workspace_path) + with_timer(progress_io, "Indexing workspace...") { graph.index_workspace } + with_timer(progress_io, "Resolving graph...") { graph.resolve } + graph + end + + #: (IO? io, String message) { -> void } -> void + def with_timer(io, message) + unless io + yield + return + end + + io.print(message) + start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) + yield + duration = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) - start + io.puts(" finished in #{duration.round(2)}ms") + end + end + end +end + +require "rubydex/server/cache" +require "rubydex/server/request" +require "rubydex/server/core" +require "rubydex/server/client" +require "rubydex/server/commands" diff --git a/lib/rubydex/server/cache.rb b/lib/rubydex/server/cache.rb new file mode 100644 index 000000000..4424cd1a9 --- /dev/null +++ b/lib/rubydex/server/cache.rb @@ -0,0 +1,185 @@ +# frozen_string_literal: true + +require "digest" +require "fileutils" +require "tmpdir" + +module Rubydex + module Server + # Owns the per-workspace runtime directory and the metadata files inside it (socket, pid, token, + # version, lock). + # + # The directory is keyed by an "app id" derived from the workspace path, the Ruby version, the + # rubydex version and a fingerprint of the loaded native extension. A gem upgrade (or a different + # Ruby) therefore maps to a different directory and forces a fresh server, since the loaded C + # extension cannot be hot-swapped. + class Cache + #: String + attr_reader :workspace_path + + #: (?workspace_path: String) -> void + def initialize(workspace_path: Dir.pwd) + @workspace_path = File.expand_path(workspace_path) + end + + # Stable identity for this workspace's server. + #: -> String + def app_id + @app_id ||= Digest::SHA256.hexdigest( + [@workspace_path, RUBY_VERSION, Rubydex::VERSION, ext_fingerprint].join("\0"), + )[0, 16] + end + + # Compatibility string written to the `version` file and sent as the server's version line. + # A mismatch forces the client to restart the server. + #: -> String + def expected_version + "#{Rubydex::VERSION}:#{ext_fingerprint}" + end + + #: -> String + def dir + @dir ||= File.join(runtime_root, app_id) + end + + #: -> String + def socket_path + File.join(dir, "socket") + end + + #: -> String + def pid_path + File.join(dir, "pid") + end + + #: -> String + def token_path + File.join(dir, "token") + end + + #: -> String + def version_path + File.join(dir, "version") + end + + #: -> String + def lock_path + File.join(dir, "lock") + end + + # Creates the runtime directory (and its parent) with restrictive permissions. + #: -> void + def ensure_dir! + FileUtils.mkdir_p(runtime_root, mode: 0o700) + FileUtils.mkdir_p(dir, mode: 0o700) + # mkdir_p won't tighten perms on a pre-existing dir; enforce them explicitly. + File.chmod(0o700, runtime_root) + File.chmod(0o700, dir) + end + + # Returns the existing token, generating and persisting one if absent. + #: -> String + def token + return @token if @token + + @token = if File.exist?(token_path) + File.read(token_path).chomp + else + generate_token + end + end + + #: -> String + def generate_token + ensure_dir! + value = SecureRandom.hex(32) + File.write(token_path, value) + File.chmod(0o600, token_path) + @token = value + end + + #: -> void + def write_metadata! + ensure_dir! + File.write(pid_path, Process.pid.to_s) + File.write(version_path, expected_version) + end + + # Whether the on-disk version file is compatible with the running gem/extension. + #: -> bool + def version_compatible? + File.exist?(version_path) && File.read(version_path).chomp == expected_version + end + + #: -> Integer? + def server_pid + return unless File.exist?(pid_path) + + pid = File.read(pid_path).to_i + pid.zero? ? nil : pid + end + + # Whether a process with the recorded pid is alive. + #: -> bool + def server_alive? + pid = server_pid + return false unless pid + + Process.kill(0, pid) + true + rescue Errno::ESRCH, Errno::EPERM + false + end + + # Removes stale artifacts left behind by a dead server. + #: -> void + def clean! + [socket_path, pid_path, version_path].each do |path| + File.unlink(path) + rescue Errno::ENOENT + nil + end + end + + private + + #: -> String + def runtime_root + base = ENV["RDX_SERVER_DIR"] + return File.expand_path(base) if base && !base.empty? + + xdg = ENV["XDG_RUNTIME_DIR"] + parent = xdg && !xdg.empty? ? xdg : Dir.tmpdir + File.join(parent, "rubydex-#{uid}") + end + + #: -> (Integer | String) + def uid + Process.respond_to?(:uid) ? Process.uid : "nobody" + end + + # Fingerprint of the native extension artifacts so a recompiled/upgraded extension invalidates + # any running server (the C extension cannot be reloaded in place). + #: -> String + def ext_fingerprint + @ext_fingerprint ||= begin + lib_dir = File.expand_path("../..", __dir__) + artifacts = Dir.glob(File.join(lib_dir, "**", "rubydex.{bundle,so}")) + + Dir.glob(File.join(lib_dir, "librubydex_sys.*")) + + if artifacts.empty? + "noext" + else + parts = artifacts.sort.map do |path| + stat = File.stat(path) + "#{File.basename(path)}:#{stat.size}:#{stat.mtime.to_i}" + end + Digest::SHA256.hexdigest(parts.join("|"))[0, 16] + end + end + end + end + end +end + +require "securerandom" diff --git a/lib/rubydex/server/client.rb b/lib/rubydex/server/client.rb new file mode 100644 index 000000000..63e13ec23 --- /dev/null +++ b/lib/rubydex/server/client.rb @@ -0,0 +1,231 @@ +# frozen_string_literal: true + +require "socket" + +module Rubydex + module Server + # The short-lived client side. Ensures a compatible server is running for the workspace, then + # talks to it over the UNIX socket. Kept deliberately lightweight: it never `require`s the full + # `rubydex` extension on the happy path, so the server actually saves time. + module Client + class << self + # Runs a `--query` against the server, printing its output and returning the exit status. + #: (Cache cache, Hash[Symbol, untyped] options, ?stdout: IO, ?stderr: IO) -> Integer + def query(cache, options, stdout: $stdout, stderr: $stderr) + request( + cache, + { + "command" => "query", + "query" => options[:query], + "query_format" => options[:query_format] || "table", + }, + stdout: stdout, + stderr: stderr, + ) + end + + # Sends a single request and streams the framed response. Restarts the server once if the + # initial connection fails (stale socket / incompatible version). + #: (Cache cache, Hash[String, untyped] payload, ?stdout: IO, ?stderr: IO) -> Integer + def request(cache, payload, stdout: $stdout, stderr: $stderr) + socket = connection(cache) + Request.write(socket, payload.merge(base_payload(cache))) + + response = Request.read(socket) + socket.close + + unless response + stderr.puts("rdx server: empty response") + return 1 + end + + stdout.print(response["stdout"]) if response["stdout"] + stderr.print(response["stderr"]) if response["stderr"] + response["status"] || 0 + end + + # Ensures a server is running, then connects with the version handshake. Restarts once on a + # failed connect. + #: (Cache cache) -> UNIXSocket + def connection(cache) + ensure_server(cache, detach: true) + + socket = connect(cache) + unless socket + restart(cache, detach: true) + socket = connect(cache) + end + + raise Error, "could not connect to the rdx server at #{cache.socket_path}" unless socket + + socket + end + + # Starts a server if none is running, or restarts it if the running one is incompatible. + #: (Cache cache, ?detach: bool) -> void + def ensure_server(cache, detach: true) + if cache.server_alive? && File.socket?(cache.socket_path) + return if cache.version_compatible? + + stop(cache) + end + + cache.clean! + start(cache, detach: detach) + end + + # Forks + daemonizes a server process (unless `detach: false`) and waits until it is ready. + #: (Cache cache, ?detach: bool) -> void + def start(cache, detach: true) + cache.ensure_dir! + + if detach + pid = fork { run_server(cache, detach: true) } + Process.detach(pid) if pid + wait_until_ready(cache) + else + run_server(cache, detach: false) + end + end + + #: (Cache cache, ?detach: bool) -> void + def restart(cache, detach: true) + stop(cache) + cache.clean! + start(cache, detach: detach) + end + + # Stops the running server (gracefully if possible, otherwise via SIGTERM) and cleans up. + # + # The pid is captured up front and waited on directly, because the server removes its own pid + # file during shutdown while it still holds the start flock. Re-reading the pid file would + # make the process look dead too early, letting a restart spawn a new daemon that then fails + # to grab the still-held flock. + #: (Cache cache) -> void + def stop(cache) + pid = cache.server_pid + + if pid && process_alive?(pid) + graceful_stop(cache, pid) + + if process_alive?(pid) + terminate(pid) + wait_for_exit(pid) + end + end + + cache.clean! + end + + private + + # Runs the server in the current process. Guards against concurrent starts with a flock: if + # another process already holds the lock, this returns without starting a second server. + #: (Cache cache, detach: bool) -> void + def run_server(cache, detach:) + lock = File.open(cache.lock_path, File::RDWR | File::CREAT, 0o600) + + unless lock.flock(File::LOCK_EX | File::LOCK_NB) + lock.close + return + end + + Process.daemon(true) if detach + Core.new(cache, lock: lock, detach: detach).run + end + + #: (Cache cache) -> UNIXSocket? + def connect(cache) + # Mirror the server's relative bind so a long runtime directory doesn't blow the + # `sockaddr_un` path limit. The cwd is restored when the block returns. + socket = Dir.chdir(File.dirname(cache.socket_path)) do + UNIXSocket.new(File.basename(cache.socket_path)) + end + version = Request.read_line_with_timeout(socket, HANDSHAKE_TIMEOUT).chomp + + if version == cache.expected_version + socket + else + socket.close + nil + end + rescue Errno::ENOENT, Errno::ECONNREFUSED, Errno::ECONNRESET, ServerReadTimeout + begin + socket&.close + rescue IOError + nil + end + nil + end + + #: (Cache cache, Integer pid) -> void + def graceful_stop(cache, pid) + socket = connect(cache) + return unless socket + + Request.write(socket, base_payload(cache).merge("command" => "stop")) + Request.read(socket) + socket.close + wait_for_exit(pid) + rescue Errno::EPIPE, Errno::ECONNRESET + nil + end + + #: (Integer pid) -> void + def terminate(pid) + Process.kill("TERM", pid) + rescue Errno::ESRCH + nil + end + + #: (Integer pid) -> bool + def process_alive?(pid) + Process.kill(0, pid) + true + rescue Errno::ESRCH, Errno::EPERM + false + end + + #: (Cache cache) -> void + def wait_until_ready(cache) + deadline = monotonic + BOOT_TIMEOUT + + until File.socket?(cache.socket_path) && cache.version_compatible? + raise Error, "timed out waiting for the rdx server to start" if monotonic > deadline + + sleep(0.05) + end + end + + #: (Integer pid) -> bool + def wait_for_exit(pid) + deadline = monotonic + 5.0 + + while process_alive?(pid) + return false if monotonic > deadline + + sleep(0.02) + end + + true + end + + #: (Cache cache) -> Hash[String, untyped] + def base_payload(cache) + { + "protocol" => PROTOCOL, + "token" => cache.token, + "cwd" => Dir.pwd, + "argv" => ARGV, + "env" => {}, + } + end + + #: -> Float + def monotonic + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end + end + end +end diff --git a/lib/rubydex/server/commands.rb b/lib/rubydex/server/commands.rb new file mode 100644 index 000000000..69602db8f --- /dev/null +++ b/lib/rubydex/server/commands.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module Rubydex + module Server + # Implementations for the server-control CLI flags (`--start-server`, `--stop-server`, + # `--restart-server`, `--server-status`). Each returns a process exit status. + module Commands + class << self + #: (Cache cache, ?detach: bool, ?stdout: IO) -> Integer + def start(cache, detach: true, stdout: $stdout) + if cache.server_alive? && cache.version_compatible? && File.socket?(cache.socket_path) + stdout.puts("rdx server already running (pid=#{cache.server_pid}, socket=#{cache.socket_path})") + return 0 + end + + Client.ensure_server(cache, detach: detach) + # With --no-detach the server runs in the foreground and only returns here once stopped. + stdout.puts("rdx server started (pid=#{cache.server_pid}, socket=#{cache.socket_path})") + 0 + end + + #: (Cache cache, ?stdout: IO) -> Integer + def stop(cache, stdout: $stdout) + unless cache.server_alive? + stdout.puts("rdx server: no server running for #{cache.workspace_path}") + cache.clean! + return 0 + end + + Client.stop(cache) + stdout.puts("rdx server stopped") + 0 + end + + #: (Cache cache, ?detach: bool, ?stdout: IO) -> Integer + def restart(cache, detach: true, stdout: $stdout) + Client.restart(cache, detach: detach) + stdout.puts("rdx server restarted (pid=#{cache.server_pid}, socket=#{cache.socket_path})") + 0 + end + + #: (Cache cache, ?stdout: IO, ?stderr: IO) -> Integer + def status(cache, stdout: $stdout, stderr: $stderr) + unless cache.server_alive? && File.socket?(cache.socket_path) + stdout.puts("rdx server: not running for #{cache.workspace_path}") + return 0 + end + + Client.request(cache, { "command" => "status" }, stdout: stdout, stderr: stderr) + end + end + end + end +end diff --git a/lib/rubydex/server/core.rb b/lib/rubydex/server/core.rb new file mode 100644 index 000000000..86e15af83 --- /dev/null +++ b/lib/rubydex/server/core.rb @@ -0,0 +1,229 @@ +# frozen_string_literal: true + +require "socket" + +module Rubydex + module Server + # The resident server process. Builds the graph for one workspace, then serves requests over a + # UNIX domain socket. Read-only `query` requests are answered in-process; there is no fork per + # request (Phase 1). + class Core + #: (Cache cache, ?lock: File?, ?detach: bool) -> void + def initialize(cache, lock: nil, detach: true) + @cache = cache + @lock = lock + @detach = detach + @mutex = Mutex.new + @running = true + @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @manifest = {} #: Hash[String, Float] + end + + # Boots the graph, writes metadata and serves until stopped. Blocks for the lifetime of the + # server. + #: -> void + def run + require "rubydex" + + redirect_output if @detach + @graph = Server.build_graph(workspace_path: @cache.workspace_path) + @manifest = workspace_manifest + @cache.token # ensure the token file exists before we accept connections + @cache.write_metadata! + + server = open_socket + log("rdx server ready (pid=#{Process.pid}, workspace=#{@cache.workspace_path})") + serve(server) + ensure + cleanup(server) + end + + private + + #: -> UNIXServer + def open_socket + File.unlink(@cache.socket_path) if File.exist?(@cache.socket_path) + # `sockaddr_un` caps the path at ~104 bytes, so bind using a short relative name from inside + # the socket's directory. The cwd is restored when the block returns. + server = Dir.chdir(File.dirname(@cache.socket_path)) do + UNIXServer.new(File.basename(@cache.socket_path)) + end + File.chmod(0o600, @cache.socket_path) + server + end + + #: (UNIXServer server) -> void + def serve(server) + while @running + client = begin + server.accept + rescue IOError, Errno::EBADF + break + end + + handle(client) + end + end + + #: (UNIXSocket client) -> void + def handle(client) + # Version gate: the server's first write lets the client detect an incompatible server and + # restart it before sending any request. + client.puts(@cache.expected_version) + + request = Request.read(client) + return unless request + + unless authorized?(request["token"]) + Request.write(client, response(stderr: "rdx server: unauthorized request\n", status: 1)) + return + end + + dispatch(request, client) + rescue Errno::EPIPE, Errno::ECONNRESET + nil + ensure + begin + client.close + rescue IOError + nil + end + end + + #: (Hash[untyped, untyped] request, UNIXSocket client) -> void + def dispatch(request, client) + case request["command"] + when "query" + Request.write(client, handle_query(request)) + when "status" + Request.write(client, response(stdout: status_report)) + when "stop" + Request.write(client, response(stdout: "rdx server stopped\n")) + @running = false + else + Request.write(client, response(stderr: "rdx server: unknown command #{request["command"].inspect}\n", status: 1)) + end + end + + #: (Hash[untyped, untyped] request) -> Hash[String, untyped] + def handle_query(request) + query = request["query"] + format = request["query_format"] || "table" + + @mutex.synchronize do + refresh_if_stale + response(stdout: @graph.query(query, format)) + end + rescue ArgumentError => e + response(stderr: "#{e.message}\n", status: 1) + end + + # Detects workspace files that changed since the graph was built and applies incremental + # updates before answering. Always correct, occasionally slow (Phase 1 freshness model). + #: -> void + def refresh_if_stale + current = workspace_manifest + changed = current.select { |path, mtime| @manifest[path] != mtime }.keys + deleted = @manifest.keys - current.keys + return if changed.empty? && deleted.empty? + + @graph.index_all(changed) unless changed.empty? + deleted.each { |path| @graph.delete_document(uri_for(path)) } + @graph.resolve + @manifest = current + end + + #: -> Hash[String, Float] + def workspace_manifest + manifest = {} + collect_files(@cache.workspace_path, manifest) + manifest + end + + #: (String dir, Hash[String, Float] manifest) -> void + def collect_files(dir, manifest) + Dir.each_child(dir) do |entry| + full = File.join(dir, entry) + + if File.directory?(full) + next if Rubydex::Graph::IGNORED_DIRECTORIES.include?(entry) + + collect_files(full, manifest) + elsif Rubydex::Graph::INDEXABLE_EXTENSIONS.include?(File.extname(entry)) + manifest[full] = File.mtime(full).to_f + end + end + rescue Errno::ENOENT, Errno::EACCES + nil + end + + #: (String path) -> String + def uri_for(path) + path = "/#{path}" if Gem.win_platform? + URI::File.build(path: path).to_s + end + + #: (?stdout: String, ?stderr: String, ?status: Integer) -> Hash[String, untyped] + def response(stdout: "", stderr: "", status: 0) + { "stdout" => stdout, "stderr" => stderr, "status" => status } + end + + #: -> String + def status_report + uptime = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at).round(1) + <<~STATUS + rdx server running + pid: #{Process.pid} + workspace: #{@cache.workspace_path} + socket: #{@cache.socket_path} + uptime: #{uptime}s + version: #{@cache.expected_version} + STATUS + end + + #: (String? candidate) -> bool + def authorized?(candidate) + return false unless candidate.is_a?(String) + + expected = @cache.token + return false unless candidate.bytesize == expected.bytesize + + # Constant-time comparison to avoid leaking the token to other local users via timing. + candidate.bytes.zip(expected.bytes).reduce(0) { |acc, (a, b)| acc | (a ^ b) }.zero? + end + + #: -> void + def redirect_output + log_path = ENV["RDX_SERVER_LOG"] + target = log_path && !log_path.empty? ? log_path : File::NULL + $stdout.reopen(target, "a") + $stderr.reopen(target, "a") + $stdout.sync = true + $stderr.sync = true + end + + #: (String message) -> void + def log(message) + $stdout.puts("[#{Time.now.iso8601}] #{message}") + rescue StandardError + nil + end + + #: (UNIXServer? server) -> void + def cleanup(server) + begin + server&.close + rescue IOError + nil + end + @cache.clean! + @lock&.flock(File::LOCK_UN) + @lock&.close + rescue StandardError + nil + end + end + end +end + +require "time" diff --git a/lib/rubydex/server/request.rb b/lib/rubydex/server/request.rb new file mode 100644 index 000000000..6bf68aa61 --- /dev/null +++ b/lib/rubydex/server/request.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +require "json" + +module Rubydex + module Server + # Length-prefixed JSON framing for client/server messages (modeled on Spring's `send_json`). + # + # The wire format is a single decimal line containing the payload's byte size, followed by + # exactly that many bytes of JSON. This avoids delimiter escaping and partial-read ambiguity. + module Request + class << self + # Writes a framed JSON payload to `socket`. + #: (IO socket, Hash[untyped, untyped] payload) -> void + def write(socket, payload) + data = JSON.dump(payload) + socket.puts(data.bytesize) + socket.write(data) + socket.flush + end + + # Reads a framed JSON payload from `socket`. Returns `nil` on EOF / malformed length. + #: (IO socket) -> Hash[untyped, untyped]? + def read(socket) + length_line = socket.gets + return unless length_line + + length = length_line.to_i + return if length <= 0 + + body = socket.read(length) + return unless body + + JSON.parse(body) + end + + # Reads a single line from `socket`, raising `ServerReadTimeout` if nothing arrives within + # `timeout` seconds. Used for the handshake where a wedged server must never hang the client. + #: (IO socket, Float timeout) -> String + def read_line_with_timeout(socket, timeout) + raise ServerReadTimeout if IO.select([socket], nil, nil, timeout).nil? + + line = socket.gets + raise ServerReadTimeout unless line + + line + end + end + end + end +end diff --git a/test/server/cache_test.rb b/test/server/cache_test.rb new file mode 100644 index 000000000..b5c057f27 --- /dev/null +++ b/test/server/cache_test.rb @@ -0,0 +1,126 @@ +# frozen_string_literal: true + +require "test_helper" +require "rubydex/server" +require "tmpdir" + +module Rubydex + module Server + class CacheTest < Minitest::Test + def setup + @runtime_dir = Dir.mktmpdir("rdx-server-test") + @previous_server_dir = ENV["RDX_SERVER_DIR"] + ENV["RDX_SERVER_DIR"] = @runtime_dir + end + + def teardown + ENV["RDX_SERVER_DIR"] = @previous_server_dir + FileUtils.rm_rf(@runtime_dir) + end + + def test_app_id_is_stable_for_the_same_workspace + a = Cache.new(workspace_path: "/some/workspace") + b = Cache.new(workspace_path: "/some/workspace") + + assert_equal(a.app_id, b.app_id) + end + + def test_app_id_differs_between_workspaces + a = Cache.new(workspace_path: "/workspace/a") + b = Cache.new(workspace_path: "/workspace/b") + + refute_equal(a.app_id, b.app_id) + end + + def test_expected_version_includes_gem_version + cache = Cache.new(workspace_path: "/workspace") + assert_match(/\A#{Regexp.escape(Rubydex::VERSION)}:/, cache.expected_version) + end + + def test_ensure_dir_creates_directory_with_restrictive_permissions + cache = Cache.new(workspace_path: "/workspace") + cache.ensure_dir! + + assert(File.directory?(cache.dir)) + + # POSIX permission bits aren't meaningful on Windows (where server mode is unsupported). + skip("POSIX permissions are not enforced on this platform") if Gem.win_platform? + + mode = File.stat(cache.dir).mode & 0o777 + assert_equal(0o700, mode) + end + + def test_token_is_generated_and_persisted_with_restrictive_permissions + cache = Cache.new(workspace_path: "/workspace") + token = cache.token + + refute_empty(token) + assert(File.exist?(cache.token_path)) + + # A fresh cache for the same workspace reads the same token back. + assert_equal(token, Cache.new(workspace_path: "/workspace").token) + + # POSIX permission bits aren't meaningful on Windows (where server mode is unsupported). + skip("POSIX permissions are not enforced on this platform") if Gem.win_platform? + + assert_equal(0o600, File.stat(cache.token_path).mode & 0o777) + end + + def test_write_metadata_records_pid_and_version + cache = Cache.new(workspace_path: "/workspace") + cache.write_metadata! + + assert_equal(Process.pid, cache.server_pid) + assert(cache.version_compatible?) + end + + def test_version_compatible_is_false_without_version_file + cache = Cache.new(workspace_path: "/workspace") + refute(cache.version_compatible?) + end + + def test_version_compatible_is_false_on_mismatch + cache = Cache.new(workspace_path: "/workspace") + cache.ensure_dir! + File.write(cache.version_path, "9.9.9:deadbeef") + + refute(cache.version_compatible?) + end + + def test_server_alive_is_false_without_pid_file + cache = Cache.new(workspace_path: "/workspace") + refute(cache.server_alive?) + end + + def test_server_alive_is_false_for_dead_pid + skip("fork is unavailable on this platform") unless Process.respond_to?(:fork) + + cache = Cache.new(workspace_path: "/workspace") + cache.ensure_dir! + # Spawn a child and reap it so we have a pid that is guaranteed to no longer be alive. + dead_pid = fork { exit } + Process.wait(dead_pid) + File.write(cache.pid_path, dead_pid.to_s) + + refute(cache.server_alive?) + end + + def test_clean_removes_runtime_files + cache = Cache.new(workspace_path: "/workspace") + cache.write_metadata! + File.write(cache.socket_path, "") + + cache.clean! + + refute(File.exist?(cache.pid_path)) + refute(File.exist?(cache.version_path)) + refute(File.exist?(cache.socket_path)) + end + + def test_runtime_dir_honors_override + cache = Cache.new(workspace_path: "/workspace") + assert(cache.dir.start_with?(@runtime_dir)) + end + end + end +end diff --git a/test/server/integration_test.rb b/test/server/integration_test.rb new file mode 100644 index 000000000..78597276c --- /dev/null +++ b/test/server/integration_test.rb @@ -0,0 +1,185 @@ +# frozen_string_literal: true + +require "test_helper" +require "helpers/context" +require "rubydex/server" +require "open3" +require "rbconfig" +require "tmpdir" + +module Rubydex + module Server + # Spawns a real resident server through the `rdx` executable and exercises the client round-trip. + # + # The server is launched as a fresh subprocess (never forked from this test process) so it does + # not inherit the already-loaded native extension's thread pools — exactly how the real CLI + # behaves. Guarded to platforms that support fork + UNIX sockets. + class IntegrationTest < Minitest::Test + include Test::Helpers::WithContext + + LIB_DIR = File.expand_path("../../lib", __dir__) #: String + EXE = File.expand_path("../../exe/rdx", __dir__) #: String + + def setup + skip("server mode unsupported on this platform") unless Server.supported? + + @runtime_dir = Dir.mktmpdir("rdx-server-integration") + @previous_server_dir = ENV["RDX_SERVER_DIR"] + ENV["RDX_SERVER_DIR"] = @runtime_dir + @contexts = [] + end + + def teardown + @contexts&.each do |context| + rdx(["server", "stop"], context) + rescue StandardError + nil + end + ENV["RDX_SERVER_DIR"] = @previous_server_dir if @runtime_dir + FileUtils.rm_rf(@runtime_dir) if @runtime_dir + end + + def test_warm_query_matches_repeated_calls + with_context do |context| + track(context) + context.write!("zoo.rb", <<~RUBY) + class Animal; end + class Dog < Animal; end + class Cat < Animal; end + RUBY + + query = "MATCH (c:Class)-[:INHERITS]->(p:Class) WHERE p.name = 'Animal' RETURN c.name ORDER BY c.name" + output = query!(context, query) + + assert_match(/Cat/, output) + assert_match(/Dog/, output) + assert_match(/2 rows/, output) + + # Server is now resident; a second call returns the same result. + assert(server_alive?(context)) + assert_equal(output, query!(context, query)) + end + end + + def test_server_picks_up_file_changes + with_context do |context| + track(context) + context.write!("zoo.rb", "class Animal; end\nclass Dog < Animal; end\n") + + query = "MATCH (c:Class)-[:INHERITS]->(p:Class) WHERE p.name = 'Animal' RETURN c.name ORDER BY c.name" + refute_match(/Fox/, query!(context, query)) + + sleep(0.01) # ensure a distinct mtime + context.write!("zoo.rb", "class Animal; end\nclass Dog < Animal; end\nclass Fox < Animal; end\n") + + assert_match(/Fox/, query!(context, query)) + end + end + + def test_server_drops_deleted_files + with_context do |context| + track(context) + context.write!("animal.rb", "class Animal; end") + context.write!("dog.rb", "class Dog < Animal; end") + + query = "MATCH (c:Class {name: 'Dog'}) RETURN c.name" + assert_match(/Dog/, query!(context, query)) + + File.delete(context.absolute_path_to("dog.rb")) + + refute_match(/Dog/, query!(context, query)) + end + end + + def test_query_output_matches_inline + with_context do |context| + track(context) + context.write!("zoo.rb", "class Animal; end\nclass Dog < Animal; end\n") + + query = "MATCH (c:Class {name: 'Dog'}) RETURN c.name" + inline, _, inline_status = rdx(["query", query], context) + warm = query!(context, query) + + assert_predicate(inline_status, :success?) + assert_equal(inline, warm) + end + end + + def test_start_status_and_stop + with_context do |context| + track(context) + context.write!("foo.rb", "class Foo; end") + + out, err, status = rdx(["server", "start"], context) + assert_predicate(status, :success?, "start failed: #{err}") + assert_match(/rdx server started/, out) + assert(server_alive?(context)) + + status_out, _, _ = rdx(["server", "status"], context) + assert_match(/rdx server running/, status_out) + assert_match(/pid:/, status_out) + + stop_out, _, _ = rdx(["server", "stop"], context) + assert_match(/rdx server stopped/, stop_out) + refute(server_alive?(context)) + + status_out, _, _ = rdx(["server", "status"], context) + assert_match(/not running/, status_out) + end + end + + def test_restart_replaces_the_server + with_context do |context| + track(context) + context.write!("foo.rb", "class Foo; end") + + rdx(["server", "start"], context) + first_pid = cache(context).server_pid + refute_nil(first_pid) + + rdx(["server", "restart"], context) + second_pid = cache(context).server_pid + + refute_equal(first_pid, second_pid) + assert(server_alive?(context)) + end + end + + private + + #: (Test::Helpers::Context context) -> void + def track(context) + @contexts << context + end + + #: (Test::Helpers::Context context) -> Cache + def cache(context) + Cache.new(workspace_path: context.absolute_path) + end + + #: (Test::Helpers::Context context) -> bool + def server_alive?(context) + cache(context).server_alive? + end + + #: (Array[String] args, Test::Helpers::Context context) -> [String, String, Process::Status] + def rdx(args, context) + Open3.capture3( + RbConfig.ruby, + "-I", + LIB_DIR, + EXE, + *args, + chdir: context.absolute_path, + ) + end + + #: (Test::Helpers::Context context, String query) -> String + def query!(context, query) + out, err, status = rdx(["query", query, "--server"], context) + assert_predicate(status, :success?, "query failed: #{err}") + out + end + end + end +end diff --git a/test/server/request_test.rb b/test/server/request_test.rb new file mode 100644 index 000000000..9c98e688a --- /dev/null +++ b/test/server/request_test.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require "test_helper" +require "rubydex/server" +require "socket" + +module Rubydex + module Server + class RequestTest < Minitest::Test + def test_write_then_read_round_trips_a_payload + a, b = UNIXSocket.pair + + payload = { "command" => "query", "query" => "MATCH (n) RETURN n", "token" => "abc" } + Request.write(a, payload) + + assert_equal(payload, Request.read(b)) + ensure + a&.close + b&.close + end + + def test_read_returns_nil_on_eof + a, b = UNIXSocket.pair + a.close + + assert_nil(Request.read(b)) + ensure + b&.close + end + + def test_read_handles_large_payloads + a, b = UNIXSocket.pair + + payload = { "stdout" => "x" * 100_000 } + Thread.new { Request.write(a, payload) } + + assert_equal(payload, Request.read(b)) + ensure + a&.close + b&.close + end + + def test_read_line_with_timeout_raises_when_nothing_arrives + a, b = UNIXSocket.pair + + assert_raises(ServerReadTimeout) do + Request.read_line_with_timeout(b, 0.05) + end + ensure + a&.close + b&.close + end + + def test_read_line_with_timeout_returns_the_line + a, b = UNIXSocket.pair + a.puts("0.2.5:fingerprint") + + assert_equal("0.2.5:fingerprint", Request.read_line_with_timeout(b, 1.0).chomp) + ensure + a&.close + b&.close + end + end + end +end