SAI-6231: Bucketed segment flush handling on only single doc iterator #41
This reverts commit f7f46a5. It is no longer necessary now that we peek the first doc and pull a corresponding DWPT.
…c per updateDocuments call
…c per updateDocuments call
…Now, which now takes a date string and use that directly as now
magibney
left a comment
Looking good I think!
I wonder if it'd be easy/worth pulling out as much as possible into its own class, as opposed to static fields/methods on DWPT (and PeekIterable from DocumentsWriter as well). As it stands it's kinda obvious where the changes are, but putting in a dedicated file would make it a bit clearer/cleaner?
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 39a3420.
Thanks! Did a quick refactor at here . For now keeping |
2. Enable routing for unit testing 3. Added comments on parent field handling
…stments to SegmentRoutingUtil to allow setting some constants for tests
    }

    static long mapToBucket(Iterable<? extends IndexableField> doc) {
      return mapToBucket(doc, TEMPORAL_ADJUST_NOW != null ? TEMPORAL_ADJUST_NOW : System.currentTimeMillis(), defaultBucket());
We shouldn't really need to repeatedly invoke System.currentTimeMillis(). Ideally we could set a TEMPORAL_ADJUST_NANOS based on the diff between System.nanoTime() and the configured sysprop lucene.temporalField.adjustNow?
Are you thinking about the optimization of calling System.nanoTime() instead of System.currentTimeMillis() every time? That is, calculate the base diff at init time (against System.currentTimeMillis() or TEMPORAL_ADJUST_NOW), then later just use base diff + System.nanoTime() (i.e. cheaper)?
I have made the changes here, can u please 👀 ? e9809bc
It's slightly different for the handling of lucene.temporalField.adjustNow. In that case I think we don't want the time to advance at all, since a static now time is the most consistent for testing.
ping @hiteshk25 before we merge (think it should be close to approval from @magibney), please let us know if there are any concerns. A TL;DR of the changes:
    if (ADJUST_NOW != null) { // explicitly defined a static now time. Use it for all calls
      return ADJUST_NOW;
    } else {
      return NOW_BASE_IN_MILLI_SEC + TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
I think this is not formally correct. System.nanoTime() can roll over and be negative. I think if we're going to do this, we have to convert fully to nanos -- e.g.
    NOW_BASE_MILLIS = ADJUST_NOW == null ? System.currentTimeMillis() : ADJUST_NOW;
    NOW_BASE_NANOS = System.nanoTime();
    getNow() {
      return NOW_BASE_MILLIS + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - NOW_BASE_NANOS);
    }
I was trying to save one arithmetic operation, but good point on the wrap-around issue!
I'm not going to care about ADJUST_NOW here, since if it's defined we will NOT do any computation and will use the value directly.
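To make the outcome of this thread concrete, here is a minimal sketch of a wrap-around-safe clock. It is not the PR's code: the class name `RoutingClock` and its members are hypothetical, and it assumes only what the thread states, namely that a configured static "now" is returned as-is and that otherwise elapsed time is taken as a difference of two `System.nanoTime()` readings.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; names do not come from the PR. */
final class RoutingClock {
  private final Long adjustNow;   // fixed "now" in epoch millis, or null when not configured
  private final long baseMillis;  // wall-clock reading captured once at construction
  private final long baseNanos;   // monotonic reading captured at the same moment

  RoutingClock(Long adjustNow) {
    this.adjustNow = adjustNow;
    this.baseMillis = System.currentTimeMillis();
    this.baseNanos = System.nanoTime();
  }

  /**
   * nanoTime() has an arbitrary origin and may be negative, so only the
   * difference of two readings is meaningful. Long subtraction wraps, which
   * keeps the difference correct even across a numeric overflow.
   */
  static long elapsedMillis(long baseNanos, long nowNanos) {
    return TimeUnit.NANOSECONDS.toMillis(nowNanos - baseNanos);
  }

  long now() {
    if (adjustNow != null) {
      return adjustNow; // frozen clock: never advances, useful for tests
    }
    return baseMillis + elapsedMillis(baseNanos, System.nanoTime());
  }
}
```

The frozen branch skips the arithmetic entirely, which matches the reply above; the other branch pays one subtraction per call in exchange for being correct regardless of the sign of `System.nanoTime()`.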

Description
Built on top of #40, with changes to only support delegating to a bucketed `DocumentsWriterPerThread` (DWPT) if the input to `DocumentsWriter#updateDocuments` is a single doc. This simplifies the logic flow.

The bucket key will be the value of the input doc's field (defined by sysprop `lucene.temporalField.name`) mapped to the boundary of the bucket. The default boundaries are (in days) `[-9, 3, 9, 32, 94, 184]`. For example, for `-Dlucene.temporalField.name=EventStart,SessionStart,UserLastIndexedEventStart,UserTipLastEventStart`, a doc with `EventStart` a day ago will be mapped to the bucket `3` (up to 3 days ago).

This is related to the `TemporalMergePolicy` work, which groups segments of the same bucket (using the same boundaries as defined here) for merges.

Solution
Mostly work from @magibney from this PR

- `lucene.temporalField.name` and map input doc into bucket key
- `DocumentsWriterFlushControl` and `DocumentsWriterPerThreadPool` to support creating/managing DWPT instances with a bucket key
- `PeekIterable` to peek first (and only) doc to delegate to corresponding bucket DWPT
- `MergePolicy` to expose bucket mapping logic to higher layer (Solr `TemporalMergePolicy` etc)

Take note that we only support bucket mapping for `DocumentsWriter#updateDocuments` with a single doc for now, as supporting multiple documents gets complicated, as demonstrated in this original PR:

Note
Medium Risk
Changes the hot indexing path (DWPT acquisition and pooling) when routing is on, though behavior is opt-in via system properties and limited to single-document updates without a parent field.
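As a rough picture of the pooling change mentioned here, the following sketch keeps one free queue per bucket, so a writer checked out for a bucket is only ever reused for that same bucket. It is an assumption-laden illustration, not the PR's `DocumentsWriterPerThreadPool`: `BucketedPool`, `Writer`, `obtain`, and `release` are hypothetical names, and real DWPT locking is reduced to `synchronized` methods.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-bucket pool; a stand-in for illustration only. */
final class BucketedPool {
  /** Stand-in for a DWPT: it remembers the bucket it was created for. */
  static final class Writer {
    final long bucket;

    Writer(long bucket) {
      this.bucket = bucket;
    }
  }

  // One free queue per bucket key; queues are created lazily on first release.
  private final Map<Long, Deque<Writer>> freeByBucket = new HashMap<>();

  /** Reuses a free writer of the requested bucket, or creates a new one. */
  synchronized Writer obtain(long bucket) {
    Deque<Writer> free = freeByBucket.get(bucket);
    Writer writer = (free == null) ? null : free.pollFirst();
    return writer != null ? writer : new Writer(bucket);
  }

  /** Returns a writer to the free queue of the bucket it carries. */
  synchronized void release(Writer writer) {
    freeByBucket.computeIfAbsent(writer.bucket, b -> new ArrayDeque<>()).addFirst(writer);
  }
}
```

Because each writer carries its own bucket, `release` needs no extra argument and cannot put a writer into the wrong queue.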
Overview
Adds time-bucket routing at flush time so single-document `updateDocuments` paths can pin in-RAM segments to a bucket derived from a document's temporal numeric field (primary + optional fallbacks), driven by JVM properties such as `lucene.temporalField.name`, `lucene.temporalField.boundaries`, and `lucene.temporalField.adjustNow`. Multi-document batches and indexes with a parent field skip routing and use the default bucket.

`DocumentsWriter` peeks the lone document (via `PeekIterable`) when routing is enabled, maps it with `SegmentRoutingUtil`, and calls `obtainAndLock(bucket)`. `DocumentsWriterPerThreadPool` keeps separate free queues per bucket; each `DocumentsWriterPerThread` carries its `bucket` for checkout/return. `MergePolicy.mapToBucket` re-exports the boundary logic for Solr-style temporal merge policies.

A concurrent `testUpdateDocumentRouting` checks that live docs land in one temporal bucket per leaf after adds/updates; merge-policy reflection tests ignore the new static helper.

Reviewed by Cursor Bugbot for commit 5d28831.
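The routing flow described above (peek the lone document, map its timestamp to a bucket, otherwise fall back to the default bucket) can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's `SegmentRoutingUtil` or `PeekIterable`: documents are reduced to bare epoch-millisecond timestamps, the rule "first boundary whose day count is greater than or equal to the document's age" is inferred from the "a day ago maps to bucket 3" example, and `DEFAULT_BUCKET`, `route`, `isSingle`, and `first` are hypothetical.

```java
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/** Hypothetical routing sketch; documents are reduced to epoch-millis timestamps. */
final class RoutingSketch {
  // Default boundaries in days, as listed in the PR description.
  static final long[] BOUNDARIES_DAYS = {-9, 3, 9, 32, 94, 184};
  // Assumption: the real default bucket value is not given in the source.
  static final long DEFAULT_BUCKET = Long.MAX_VALUE;

  /** Maps a timestamp to the first boundary (in days) that covers its age. */
  static long mapToBucket(long timestampMillis, long nowMillis) {
    long ageMillis = nowMillis - timestampMillis;
    for (long boundaryDays : BOUNDARIES_DAYS) {
      if (ageMillis <= TimeUnit.DAYS.toMillis(boundaryDays)) {
        return boundaryDays;
      }
    }
    return DEFAULT_BUCKET; // older than the last boundary
  }

  /** Consumes the first element eagerly, then replays it on iteration (one-shot). */
  static final class PeekIterable<T> implements Iterable<T> {
    private final Iterator<? extends T> rest;
    private final boolean hasFirst;
    private final T first;
    private final boolean single;

    PeekIterable(Iterable<? extends T> source) {
      rest = source.iterator();
      hasFirst = rest.hasNext();
      first = hasFirst ? rest.next() : null;
      single = hasFirst && !rest.hasNext(); // hasNext() does not consume
    }

    boolean isSingle() { return single; }

    T first() { return first; }

    @Override
    public Iterator<T> iterator() {
      return new Iterator<T>() {
        private boolean firstPending = hasFirst;

        @Override public boolean hasNext() { return firstPending || rest.hasNext(); }

        @Override public T next() {
          if (firstPending) {
            firstPending = false;
            return first; // replay the peeked element before the remainder
          }
          return rest.next();
        }
      };
    }
  }

  /** Routes only single-element input; everything else uses the default bucket. */
  static long route(Iterable<Long> timestamps, long nowMillis) {
    PeekIterable<Long> peeked = new PeekIterable<>(timestamps);
    return peeked.isSingle() ? mapToBucket(peeked.first(), nowMillis) : DEFAULT_BUCKET;
  }
}
```

Wrapping the input rather than calling `iterator()` twice means the sketch also works for sources that can only be iterated once, at the cost of the wrapper itself being single-use.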