diff --git a/lib/ontologies_linked_data/services/rank_solr_propagator.rb b/lib/ontologies_linked_data/services/rank_solr_propagator.rb index 0afc5602..dfd6c09c 100644 --- a/lib/ontologies_linked_data/services/rank_solr_propagator.rb +++ b/lib/ontologies_linked_data/services/rank_solr_propagator.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'redis' + module LinkedData module Services # Propagates the current ontology rank values into the denormalized @@ -14,21 +16,39 @@ module Services # bioportalScore/umlsScore using the configured weights. # # Updates use true Solr atomic updates ({ id, ontologyRank: { set: score } }) - # so no per-class triplestore read is needed. + # so no per-class triplestore read is needed. Batches are sent without a + # commit; a single commit is issued *between* ontologies (when no updates are + # in flight) to keep the transaction log bounded without pausing replicas + # mid-stream — committing during the update stream backs up SolrCloud's + # leader->replica forwarding queue and causes "distributed update stalled" + # 500s. Each Solr call is retried with backoff so a transient stall is + # survived rather than aborting the whole run. + # + # Ontologies whose rank is unchanged since the last successful propagation + # are skipped (tracked per acronym in Redis). Pass force: true to ignore the + # skip cache and re-propagate everything (e.g. after a collection rebuild). # # See https://github.com/ncbo/ncbo_cron/issues/132 class RankSolrPropagator - DEFAULT_BATCH_SIZE = 1000 + DEFAULT_BATCH_SIZE = 2_500 + PROGRESS_EVERY = 50_000 # log per-ontology progress every N docs + MAX_RETRIES = 5 + RETRY_BASE_SLEEP = 5 # seconds; exponential backoff: 5, 10, 20, 40, 80 + # Redis key holding the last successfully propagated rank per acronym. + LAST_PROPAGATED_REDIS_FIELD = 'ontology_rank_solr_propagated' - def initialize(logger: nil, batch_size: DEFAULT_BATCH_SIZE) + # batch_size precedence: explicit arg > RANK_SOLR_BATCH_SIZE env > default. + # The env override lets the batch size be tuned on staging without a deploy. + def initialize(logger: nil, batch_size: nil) @logger = logger || Logger.new($stdout) - @batch_size = batch_size + @batch_size = (batch_size || ENV['RANK_SOLR_BATCH_SIZE'] || DEFAULT_BATCH_SIZE).to_i + @retry_count = 0 end - # Reads the rank map and writes each ontology's normalizedScore onto all - # of its term_search docs. Returns the count of ontologies whose docs were - # updated. - def propagate(rank_map = nil) + # Reads the rank map and writes each ontology's normalizedScore onto all of + # its term_search docs. Unchanged ontologies are skipped unless force: true. + # Returns the number of ontologies whose docs were updated. + def propagate(rank_map = nil, force: false) rank_map ||= LinkedData::Models::Ontology.rank if rank_map.nil? || rank_map.empty? @@ -36,39 +56,73 @@ def propagate(rank_map = nil) return 0 end + last_propagated = force ? {} : load_last_propagated + total_onts = rank_map.size updated = 0 - rank_map.each do |acronym, rank_info| + skipped = 0 + @retry_count = 0 + + rank_map.each_with_index do |(acronym, rank_info), i| score = rank_info[:normalizedScore].to_f + + if !force && last_propagated[acronym] == score + skipped += 1 + next + end + + log("[#{i + 1}/#{total_onts}] #{acronym} rank=#{score}") count = update_ontology_rank(acronym, score) - @logger.info("RankSolrPropagator: #{acronym} rank=#{score} (#{count} docs)") + + last_propagated[acronym] = score + save_last_propagated(last_propagated) updated += 1 if count.positive? end - LinkedData::Models::Class.indexCommit(nil) - @logger.info("RankSolrPropagator: committed; updated #{updated} ontologies") + summary = "done; updated #{updated}, skipped #{skipped} unchanged (of #{total_onts}); Solr retries: #{@retry_count}" + if @retry_count.zero? + log(summary) + else + @logger.warn("RankSolrPropagator: BACKPRESSURE — #{summary}. " \ + 'Solr stalled and was retried; consider a smaller RANK_SOLR_BATCH_SIZE.') + @logger.flush if @logger.respond_to?(:flush) + end updated end private - # Cursor-scans all term_search docs for the acronym and issues batched - # atomic updates to ontologyRank. Returns the number of docs updated. + # Cursor-scans all term_search docs for the acronym, issues batched atomic + # updates to ontologyRank (no commit), then commits once. Returns the + # number of docs updated. def update_ontology_rank(acronym, score) + query = "submissionAcronym:\"#{acronym}\"" + num_found = (solr_search(query, rows: 0).dig('response', 'numFound') || 0).to_i + + if num_found.zero? + log(" #{acronym}: no term_search docs; skipping") + return 0 + end + log(" #{acronym}: #{num_found} docs to update") + cursor = '*' total = 0 - query = "submissionAcronym:\"#{acronym}\"" + last_logged = 0 loop do - resp = LinkedData::Models::Class.search( - query, - { fl: 'id', rows: @batch_size, sort: 'id asc', cursorMark: cursor } - ) + resp = solr_search(query, fl: 'id', rows: @batch_size, sort: 'id asc', cursorMark: cursor) docs = resp.dig('response', 'docs') || [] unless docs.empty? batch = docs.map { |d| { id: d['id'], ontologyRank: { set: score } } } - LinkedData::Models::Class.search_client.index_document(batch, commit: false) + with_retry("update #{acronym}") do + LinkedData::Models::Class.search_client.index_document(batch, commit: false) + end total += batch.size + + if total - last_logged >= PROGRESS_EVERY + log(" #{acronym}: #{total}/#{num_found} docs…") + last_logged = total + end end next_cursor = resp['nextCursorMark'] @@ -77,8 +131,71 @@ def update_ontology_rank(acronym, score) cursor = next_cursor end + # Commit between ontologies (no updates in flight) — keeps the tlog + # bounded without stalling replica forwarding mid-stream. + log(" #{acronym}: committing #{total} docs…") + with_retry("commit #{acronym}") { LinkedData::Models::Class.indexCommit(nil) } + log(" #{acronym}: done (#{total} docs)") total end + + def solr_search(query, **params) + with_retry('search') { LinkedData::Models::Class.search(query, params) } + end + + # Retries transient Solr errors (e.g. distributed-update stalls under load) + # with exponential backoff. Re-raises once retries are exhausted. + def with_retry(what) + attempts = 0 + begin + yield + rescue RSolr::Error::Http, RSolr::Error::ConnectionRefused, + Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::ETIMEDOUT, + Net::ReadTimeout, Net::OpenTimeout, Net::WriteTimeout, + SocketError => e + attempts += 1 + @retry_count += 1 + first_line = e.message.to_s.lines.first&.strip + + if attempts > MAX_RETRIES + @logger.error("RankSolrPropagator: BACKPRESSURE — #{what} still failing after " \ + "#{MAX_RETRIES} retries; aborting — #{first_line}") + @logger.flush if @logger.respond_to?(:flush) + raise + end + + wait = RETRY_BASE_SLEEP * (2**(attempts - 1)) + @logger.warn("RankSolrPropagator: BACKPRESSURE — #{what} failed (attempt " \ + "#{attempts}/#{MAX_RETRIES}); retrying in #{wait}s — #{first_line}") + @logger.flush if @logger.respond_to?(:flush) + sleep(wait) + retry + end + end + + def log(msg) + @logger.info("RankSolrPropagator: #{msg}") + @logger.flush if @logger.respond_to?(:flush) + end + + def redis + @redis ||= Redis.new(host: LinkedData.settings.ontology_analytics_redis_host, + port: LinkedData.settings.ontology_analytics_redis_port) + end + + def load_last_propagated + raw = redis.get(LAST_PROPAGATED_REDIS_FIELD) + raw ? Marshal.load(raw) : {} + rescue StandardError => e + @logger.warn("RankSolrPropagator: could not load skip cache (#{e.class}: #{e.message}); propagating all") + {} + end + + def save_last_propagated(map) + redis.set(LAST_PROPAGATED_REDIS_FIELD, Marshal.dump(map)) + rescue StandardError => e + @logger.warn("RankSolrPropagator: could not save skip cache (#{e.class}: #{e.message})") + end end end end diff --git a/test/services/test_rank_solr_propagator.rb b/test/services/test_rank_solr_propagator.rb index 3b83cb80..1c13c0dd 100644 --- a/test/services/test_rank_solr_propagator.rb +++ b/test/services/test_rank_solr_propagator.rb @@ -1,5 +1,6 @@ require_relative '../models/test_ontology_common' require 'mocha/minitest' +require 'stringio' class TestRankSolrPropagator < LinkedData::TestOntologyCommon ACRONYM = 'BRO' @@ -12,6 +13,7 @@ def self.after_suite def setup self.class.after_suite + clear_propagation_cache # Index a small ontology's terms into the real local term_search collection. submission_parse(ACRONYM, 'BRO Ontology', './test/data/ontology_files/BRO_v3.5.owl', 1, @@ -22,6 +24,19 @@ def setup def teardown self.class.after_suite + clear_propagation_cache + end + + def clear_propagation_cache + Redis.new(host: LinkedData.settings.ontology_analytics_redis_host, + port: LinkedData.settings.ontology_analytics_redis_port) + .del(LinkedData::Services::RankSolrPropagator::LAST_PROPAGATED_REDIS_FIELD) + end + + def stub_rank(score) + LinkedData::Models::Ontology.stubs(:rank).returns( + { ACRONYM => { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: score } } + ) end def term_docs @@ -35,9 +50,7 @@ def test_propagate_writes_normalized_score_to_all_term_docs docs_before = term_docs refute_empty docs_before, 'expected BRO terms to be indexed in term_search' - LinkedData::Models::Ontology.stubs(:rank).returns( - { ACRONYM => { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: 0.642 } } - ) + stub_rank(0.642) updated = LinkedData::Services::RankSolrPropagator.new.propagate assert_equal 1, updated @@ -57,9 +70,7 @@ def test_propagate_preserves_searchable_fields )['response']['numFound'] refute_equal 0, before, 'expected a prefLabel match before propagation' - LinkedData::Models::Ontology.stubs(:rank).returns( - { ACRONYM => { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: 0.642 } } - ) + stub_rank(0.642) LinkedData::Services::RankSolrPropagator.new.propagate # The same search still works after atomic updates (copyField targets and @@ -75,4 +86,58 @@ def test_propagate_returns_zero_for_empty_rank_map LinkedData::Models::Ontology.stubs(:rank).returns({}) assert_equal 0, LinkedData::Services::RankSolrPropagator.new.propagate end + + def test_propagate_skips_unchanged_ontology_on_second_run + stub_rank(0.642) + + first = LinkedData::Services::RankSolrPropagator.new.propagate + assert_equal 1, first, 'first run should propagate the ontology' + + # Rank unchanged -> the second run should skip it entirely. + second = LinkedData::Services::RankSolrPropagator.new.propagate + assert_equal 0, second, 'unchanged ontology should be skipped on the second run' + end + + def test_force_repropagates_even_when_unchanged + stub_rank(0.642) + + LinkedData::Services::RankSolrPropagator.new.propagate + # force: true ignores the skip cache and re-propagates. + forced = LinkedData::Services::RankSolrPropagator.new.propagate(nil, force: true) + assert_equal 1, forced, 'force should re-propagate even when rank is unchanged' + end + + # Manifests the Solr-stall condition: the first atomic-update POST raises a + # retryable error, and we assert the run recovers AND emits a visible signal + # (a BACKPRESSURE warning plus a non-zero retry count in the summary). + def test_backpressure_is_retried_and_logged + stub_rank(0.642) + + log_io = StringIO.new + propagator = LinkedData::Services::RankSolrPropagator.new(logger: Logger.new(log_io)) + propagator.stubs(:sleep) # skip the backoff wait in the test + + # RSolr::Error::ConnectionRefused is the error that previously slipped past + # the retry list and aborted a live run; assert it is now retried. + client = LinkedData::Models::Class.search_client + conn_refused = RSolr::Error::ConnectionRefused.new(uri: URI('http://solr.test/term_search/select')) + client.stubs(:index_document).raises(conn_refused).then.returns(true) + + updated = propagator.propagate + assert_equal 1, updated, 'run should recover from the transient error' + + log = log_io.string + assert_match(/BACKPRESSURE/, log, 'a retry must emit a filterable BACKPRESSURE warning') + assert_match(/Solr retries: 1/, log, 'summary must report the retry count') + end + + # A clean run reports zero retries, so "Solr retries: 0" is the all-clear. + def test_clean_run_reports_zero_retries + stub_rank(0.642) + + log_io = StringIO.new + LinkedData::Services::RankSolrPropagator.new(logger: Logger.new(log_io)).propagate + + assert_match(/Solr retries: 0/, log_io.string, 'clean run must report zero retries') + end end