Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 137 additions & 20 deletions lib/ontologies_linked_data/services/rank_solr_propagator.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require 'redis'

module LinkedData
module Services
# Propagates the current ontology rank values into the denormalized
Expand All @@ -14,61 +16,113 @@ module Services
# bioportalScore/umlsScore using the configured weights.
#
# Updates use true Solr atomic updates ({ id, ontologyRank: { set: score } })
# so no per-class triplestore read is needed.
# so no per-class triplestore read is needed. Batches are sent without a
# commit; a single commit is issued *between* ontologies (when no updates are
# in flight) to keep the transaction log bounded without pausing replicas
# mid-stream — committing during the update stream backs up SolrCloud's
# leader->replica forwarding queue and causes "distributed update stalled"
# 500s. Each Solr call is retried with backoff so a transient stall is
# survived rather than aborting the whole run.
#
# Ontologies whose rank is unchanged since the last successful propagation
# are skipped (tracked per acronym in Redis). Pass force: true to ignore the
# skip cache and re-propagate everything (e.g. after a collection rebuild).
#
# See https://github.com/ncbo/ncbo_cron/issues/132
class RankSolrPropagator
DEFAULT_BATCH_SIZE = 1000
DEFAULT_BATCH_SIZE = 2_500
PROGRESS_EVERY = 50_000 # log per-ontology progress every N docs
MAX_RETRIES = 5
RETRY_BASE_SLEEP = 5 # seconds; exponential backoff: 5, 10, 20, 40, 80
# Redis key holding the last successfully propagated rank per acronym.
LAST_PROPAGATED_REDIS_FIELD = 'ontology_rank_solr_propagated'

def initialize(logger: nil, batch_size: DEFAULT_BATCH_SIZE)
# batch_size precedence: explicit arg > RANK_SOLR_BATCH_SIZE env > default.
# The env override lets the batch size be tuned on staging without a deploy.
def initialize(logger: nil, batch_size: nil)
@logger = logger || Logger.new($stdout)
@batch_size = batch_size
@batch_size = (batch_size || ENV['RANK_SOLR_BATCH_SIZE'] || DEFAULT_BATCH_SIZE).to_i
@retry_count = 0
end

# Reads the rank map and writes each ontology's normalizedScore onto all
# of its term_search docs. Returns the count of ontologies whose docs were
# updated.
def propagate(rank_map = nil)
# Reads the rank map and writes each ontology's normalizedScore onto all of
# its term_search docs. Unchanged ontologies are skipped unless force: true.
# Returns the number of ontologies whose docs were updated.
def propagate(rank_map = nil, force: false)
rank_map ||= LinkedData::Models::Ontology.rank

if rank_map.nil? || rank_map.empty?
@logger.warn('RankSolrPropagator: empty rank map; nothing to propagate')
return 0
end

last_propagated = force ? {} : load_last_propagated
total_onts = rank_map.size
updated = 0
rank_map.each do |acronym, rank_info|
skipped = 0
@retry_count = 0

rank_map.each_with_index do |(acronym, rank_info), i|
score = rank_info[:normalizedScore].to_f

if !force && last_propagated[acronym] == score
skipped += 1
next
end

log("[#{i + 1}/#{total_onts}] #{acronym} rank=#{score}")
count = update_ontology_rank(acronym, score)
@logger.info("RankSolrPropagator: #{acronym} rank=#{score} (#{count} docs)")

last_propagated[acronym] = score
save_last_propagated(last_propagated)
updated += 1 if count.positive?
end

LinkedData::Models::Class.indexCommit(nil)
@logger.info("RankSolrPropagator: committed; updated #{updated} ontologies")
summary = "done; updated #{updated}, skipped #{skipped} unchanged (of #{total_onts}); Solr retries: #{@retry_count}"
if @retry_count.zero?
log(summary)
else
@logger.warn("RankSolrPropagator: BACKPRESSURE — #{summary}. " \
'Solr stalled and was retried; consider a smaller RANK_SOLR_BATCH_SIZE.')
@logger.flush if @logger.respond_to?(:flush)
end
updated
end

private

# Cursor-scans all term_search docs for the acronym and issues batched
# atomic updates to ontologyRank. Returns the number of docs updated.
# Cursor-scans all term_search docs for the acronym, issues batched atomic
# updates to ontologyRank (no commit), then commits once. Returns the
# number of docs updated.
def update_ontology_rank(acronym, score)
query = "submissionAcronym:\"#{acronym}\""
num_found = (solr_search(query, rows: 0).dig('response', 'numFound') || 0).to_i

if num_found.zero?
log(" #{acronym}: no term_search docs; skipping")
return 0
end
log(" #{acronym}: #{num_found} docs to update")

cursor = '*'
total = 0
query = "submissionAcronym:\"#{acronym}\""
last_logged = 0

loop do
resp = LinkedData::Models::Class.search(
query,
{ fl: 'id', rows: @batch_size, sort: 'id asc', cursorMark: cursor }
)
resp = solr_search(query, fl: 'id', rows: @batch_size, sort: 'id asc', cursorMark: cursor)
docs = resp.dig('response', 'docs') || []

unless docs.empty?
batch = docs.map { |d| { id: d['id'], ontologyRank: { set: score } } }
LinkedData::Models::Class.search_client.index_document(batch, commit: false)
with_retry("update #{acronym}") do
LinkedData::Models::Class.search_client.index_document(batch, commit: false)
end
total += batch.size

if total - last_logged >= PROGRESS_EVERY
log(" #{acronym}: #{total}/#{num_found} docs…")
last_logged = total
end
end

next_cursor = resp['nextCursorMark']
Expand All @@ -77,8 +131,71 @@ def update_ontology_rank(acronym, score)
cursor = next_cursor
end

# Commit between ontologies (no updates in flight) — keeps the tlog
# bounded without stalling replica forwarding mid-stream.
log(" #{acronym}: committing #{total} docs…")
with_retry("commit #{acronym}") { LinkedData::Models::Class.indexCommit(nil) }
log(" #{acronym}: done (#{total} docs)")
total
end

# Runs a term_search query through the retry wrapper.
#
# query  - Solr query string.
# params - extra Solr request parameters (e.g. fl:, rows:, sort:,
#          cursorMark:), forwarded to the search call as a single Hash.
#
# Returns whatever LinkedData::Models::Class.search returns.
def solr_search(query, **params)
  with_retry('search') do
    LinkedData::Models::Class.search(query, params)
  end
end

# Retries transient Solr errors (e.g. distributed-update stalls under load)
# with exponential backoff. Re-raises once retries are exhausted.
# Runs the given block, retrying transient Solr/network failures with
# exponential backoff (RETRY_BASE_SLEEP * 2**(attempt - 1) seconds between
# attempts, at most MAX_RETRIES retries).
#
# HTTP 4xx responses other than 408/429 are treated as permanent (the same
# request would fail the same way again), so they are logged and re-raised
# immediately instead of burning the whole backoff schedule.
#
# what - short label for the operation, used in log messages.
#
# Returns the block's value.
# Re-raises the last error when it is non-retryable or retries are exhausted.
def with_retry(what)
  attempts = 0
  begin
    yield
  rescue RSolr::Error::Http, RSolr::Error::ConnectionRefused,
         Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, Errno::ETIMEDOUT,
         Net::ReadTimeout, Net::OpenTimeout, Net::WriteTimeout,
         SocketError => e
    first_line = e.message.to_s.lines.first&.strip

    if non_retryable_client_error?(e)
      @logger.error("RankSolrPropagator: #{what} failed with a non-retryable client error; " \
                    "aborting — #{first_line}")
      @logger.flush if @logger.respond_to?(:flush)
      raise
    end

    attempts += 1
    if attempts > MAX_RETRIES
      @logger.error("RankSolrPropagator: BACKPRESSURE — #{what} still failing after " \
                    "#{MAX_RETRIES} retries; aborting — #{first_line}")
      @logger.flush if @logger.respond_to?(:flush)
      raise
    end

    # Count only attempts that are actually retried, so the run summary's
    # retry total is not inflated by the final, abandoned failure.
    @retry_count += 1
    wait = RETRY_BASE_SLEEP * (2**(attempts - 1))
    @logger.warn("RankSolrPropagator: BACKPRESSURE — #{what} failed (attempt " \
                 "#{attempts}/#{MAX_RETRIES}); retrying in #{wait}s — #{first_line}")
    @logger.flush if @logger.respond_to?(:flush)
    sleep(wait)
    retry
  end
end

# True when the error carries an HTTP 4xx status that a retry cannot fix.
# 408 (request timeout) and 429 (too many requests) stay retryable. Errors
# without a response Hash (connection/timeout errors) are never permanent.
def non_retryable_client_error?(error)
  return false unless error.respond_to?(:response)

  response = error.response
  return false unless response.is_a?(Hash)

  status = response[:status].to_i
  status.between?(400, 499) && ![408, 429].include?(status)
end

# Writes an info-level line prefixed with the service name, then flushes the
# logger when it supports flushing so progress shows up promptly.
def log(msg)
  line = "RankSolrPropagator: #{msg}"
  @logger.info(line)
  @logger.flush if @logger.respond_to?(:flush)
end

# Memoized Redis client built from the ontology-analytics Redis host/port in
# LinkedData.settings. The connection object is created on first use and
# reused for the lifetime of this propagator.
def redis
  return @redis if @redis

  settings = LinkedData.settings
  @redis = Redis.new(host: settings.ontology_analytics_redis_host,
                     port: settings.ontology_analytics_redis_port)
end

# Loads the skip cache: a Hash of acronym => last successfully propagated
# rank, stored Marshal-serialized under LAST_PROPAGATED_REDIS_FIELD.
#
# Best effort: returns {} (propagate everything) when the key is absent, the
# payload is not a Hash, or Redis/deserialization raises. A warning is
# logged in the latter two cases.
def load_last_propagated
  raw = redis.get(LAST_PROPAGATED_REDIS_FIELD)
  return {} unless raw

  # NOTE(review): Marshal.load can instantiate arbitrary objects, so this is
  # only safe while the Redis instance is trusted. The payload is a flat
  # acronym => Float map; consider a plain-data format if that ever changes.
  cached = Marshal.load(raw)
  return cached if cached.is_a?(Hash)

  # Callers index the result by acronym; anything but a Hash would raise there,
  # outside this method's rescue, so discard it here.
  @logger.warn("RankSolrPropagator: skip cache held a #{cached.class}, expected a Hash; propagating all")
  {}
rescue StandardError => e
  @logger.warn("RankSolrPropagator: could not load skip cache (#{e.class}: #{e.message}); propagating all")
  {}
end

# Persists the skip cache (acronym => propagated rank) to Redis as a
# Marshal-serialized blob. Best effort: any failure is logged as a warning
# and swallowed so a cache problem never aborts a propagation run.
def save_last_propagated(map)
  begin
    client = redis
    payload = Marshal.dump(map)
    client.set(LAST_PROPAGATED_REDIS_FIELD, payload)
  rescue StandardError => e
    @logger.warn("RankSolrPropagator: could not save skip cache (#{e.class}: #{e.message})")
  end
end
end
end
end
77 changes: 71 additions & 6 deletions test/services/test_rank_solr_propagator.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require_relative '../models/test_ontology_common'
require 'mocha/minitest'
require 'stringio'

class TestRankSolrPropagator < LinkedData::TestOntologyCommon
ACRONYM = 'BRO'
Expand All @@ -12,6 +13,7 @@ def self.after_suite

def setup
self.class.after_suite
clear_propagation_cache
# Index a small ontology's terms into the real local term_search collection.
submission_parse(ACRONYM, 'BRO Ontology',
'./test/data/ontology_files/BRO_v3.5.owl', 1,
Expand All @@ -22,6 +24,19 @@ def setup

# Per-test cleanup: runs the suite-level cleanup hook, then deletes the
# propagator's Redis skip cache so a rank recorded by one test cannot make a
# later test's propagation get skipped as "unchanged".
def teardown
  self.class.after_suite
  clear_propagation_cache
end

# Deletes the propagator's "last propagated rank" key from the analytics
# Redis instance so every test starts with an empty skip cache.
def clear_propagation_cache
  settings = LinkedData.settings
  connection = Redis.new(host: settings.ontology_analytics_redis_host,
                         port: settings.ontology_analytics_redis_port)
  connection.del(LinkedData::Services::RankSolrPropagator::LAST_PROPAGATED_REDIS_FIELD)
end

# Stubs Ontology.rank to return a single-entry rank map for ACRONYM whose
# normalizedScore is the given score (bioportalScore/umlsScore are fixed).
def stub_rank(score)
  rank_info = { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: score }
  LinkedData::Models::Ontology.stubs(:rank).returns({ ACRONYM => rank_info })
end

def term_docs
Expand All @@ -35,9 +50,7 @@ def test_propagate_writes_normalized_score_to_all_term_docs
docs_before = term_docs
refute_empty docs_before, 'expected BRO terms to be indexed in term_search'

LinkedData::Models::Ontology.stubs(:rank).returns(
{ ACRONYM => { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: 0.642 } }
)
stub_rank(0.642)

updated = LinkedData::Services::RankSolrPropagator.new.propagate
assert_equal 1, updated
Expand All @@ -57,9 +70,7 @@ def test_propagate_preserves_searchable_fields
)['response']['numFound']
refute_equal 0, before, 'expected a prefLabel match before propagation'

LinkedData::Models::Ontology.stubs(:rank).returns(
{ ACRONYM => { bioportalScore: 0.5, umlsScore: 0.0, normalizedScore: 0.642 } }
)
stub_rank(0.642)
LinkedData::Services::RankSolrPropagator.new.propagate

# The same search still works after atomic updates (copyField targets and
Expand All @@ -75,4 +86,58 @@ def test_propagate_returns_zero_for_empty_rank_map
LinkedData::Models::Ontology.stubs(:rank).returns({})
assert_equal 0, LinkedData::Services::RankSolrPropagator.new.propagate
end

# Two back-to-back runs with the same rank: the first propagates the
# ontology, the second finds it in the skip cache and updates nothing.
def test_propagate_skips_unchanged_ontology_on_second_run
  stub_rank(0.642)
  service = LinkedData::Services::RankSolrPropagator

  assert_equal 1, service.new.propagate, 'first run should propagate the ontology'

  # Same rank again: a fresh propagator must skip the ontology entirely.
  assert_equal 0, service.new.propagate, 'unchanged ontology should be skipped on the second run'
end

# After a normal run has populated the skip cache, force: true must bypass
# the cache and propagate the (unchanged) ontology again.
def test_force_repropagates_even_when_unchanged
  stub_rank(0.642)
  service = LinkedData::Services::RankSolrPropagator

  service.new.propagate
  assert_equal 1, service.new.propagate(nil, force: true),
               'force should re-propagate even when rank is unchanged'
end

# Manifests the Solr-stall condition: the first atomic-update POST raises a
# retryable error, and we assert the run recovers AND emits a visible signal
# (a BACKPRESSURE warning plus a non-zero retry count in the summary).
# Simulates a Solr stall: the first atomic-update call raises a retryable
# error and the next one succeeds. The run must recover and leave a visible
# trace (a BACKPRESSURE warning and a retry count of 1 in the summary).
def test_backpressure_is_retried_and_logged
  stub_rank(0.642)

  captured = StringIO.new
  service = LinkedData::Services::RankSolrPropagator.new(logger: Logger.new(captured))
  service.stubs(:sleep) # no real backoff wait during the test

  # RSolr::Error::ConnectionRefused once escaped the retry list and aborted a
  # live run, so it is the error used to prove the retry path.
  solr_client = LinkedData::Models::Class.search_client
  refused = RSolr::Error::ConnectionRefused.new(uri: URI('http://solr.test/term_search/select'))
  solr_client.stubs(:index_document).raises(refused).then.returns(true)

  assert_equal 1, service.propagate, 'run should recover from the transient error'

  output = captured.string
  assert_match(/BACKPRESSURE/, output, 'a retry must emit a filterable BACKPRESSURE warning')
  assert_match(/Solr retries: 1/, output, 'summary must report the retry count')
end

# A clean run reports zero retries, so "Solr retries: 0" is the all-clear.
# With no Solr errors the summary line must read "Solr retries: 0".
def test_clean_run_reports_zero_retries
  stub_rank(0.642)

  sink = StringIO.new
  logger = Logger.new(sink)
  LinkedData::Services::RankSolrPropagator.new(logger: logger).propagate

  assert_match(/Solr retries: 0/, sink.string, 'clean run must report zero retries')
end
end
Loading