2020package org .elasticsearch .common .lucene .uid ;
2121
2222import org .apache .lucene .index .IndexReader ;
23- import org .apache .lucene .index .LeafReader ;
2423import org .apache .lucene .index .LeafReaderContext ;
24+ import org .apache .lucene .index .NumericDocValues ;
2525import org .apache .lucene .index .Term ;
2626import org .apache .lucene .util .CloseableThreadLocal ;
2727import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
28+ import org .elasticsearch .index .mapper .SeqNoFieldMapper ;
2829
2930import java .io .IOException ;
3031import java .util .List ;
3637/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
3738public final class VersionsAndSeqNoResolver {
3839
39- static final ConcurrentMap <Object , CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup >> lookupStates =
40+ static final ConcurrentMap <IndexReader . CacheKey , CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup [] >> lookupStates =
4041 ConcurrentCollections .newConcurrentMapWithAggressiveConcurrency ();
4142
4243 // Evict this reader from lookupStates once it's closed:
4344 private static final IndexReader .ClosedListener removeLookupState = key -> {
44- CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup > ctl = lookupStates .remove (key );
45+ CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup [] > ctl = lookupStates .remove (key );
4546 if (ctl != null ) {
4647 ctl .close ();
4748 }
4849 };
4950
50- private static PerThreadIDVersionAndSeqNoLookup getLookupState (LeafReader reader , String uidField ) throws IOException {
51- IndexReader .CacheHelper cacheHelper = reader .getCoreCacheHelper ();
52- CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup > ctl = lookupStates .get (cacheHelper .getKey ());
51+ private static PerThreadIDVersionAndSeqNoLookup [] getLookupState (IndexReader reader , String uidField ) throws IOException {
52+ // We cache on the top level
53+ // This means cache entries have a shorter lifetime, maybe as low as 1s with the
54+ // default refresh interval and a steady indexing rate, but on the other hand it
55+ // proved to be cheaper than having to perform a CHM and a TL get for every segment.
56+ // See https://github.com/elastic/elasticsearch/pull/19856.
57+ IndexReader .CacheHelper cacheHelper = reader .getReaderCacheHelper ();
58+ CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup []> ctl = lookupStates .get (cacheHelper .getKey ());
5359 if (ctl == null ) {
5460 // First time we are seeing this reader's core; make a new CTL:
5561 ctl = new CloseableThreadLocal <>();
56- CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup > other = lookupStates .putIfAbsent (cacheHelper .getKey (), ctl );
62+ CloseableThreadLocal <PerThreadIDVersionAndSeqNoLookup [] > other = lookupStates .putIfAbsent (cacheHelper .getKey (), ctl );
5763 if (other == null ) {
58- // Our CTL won, we must remove it when the core is closed:
64+ // Our CTL won, we must remove it when the reader is closed:
5965 cacheHelper .addClosedListener (removeLookupState );
6066 } else {
6167 // Another thread beat us to it: just use their CTL:
6268 ctl = other ;
6369 }
6470 }
6571
66- PerThreadIDVersionAndSeqNoLookup lookupState = ctl .get ();
72+ PerThreadIDVersionAndSeqNoLookup [] lookupState = ctl .get ();
6773 if (lookupState == null ) {
68- lookupState = new PerThreadIDVersionAndSeqNoLookup (reader , uidField );
74+ lookupState = new PerThreadIDVersionAndSeqNoLookup [reader .leaves ().size ()];
75+ for (LeafReaderContext leaf : reader .leaves ()) {
76+ lookupState [leaf .ord ] = new PerThreadIDVersionAndSeqNoLookup (leaf , uidField );
77+ }
6978 ctl .set (lookupState );
70- } else if (Objects .equals (lookupState .uidField , uidField ) == false ) {
79+ }
80+
81+ if (lookupState .length != reader .leaves ().size ()) {
82+ throw new AssertionError ("Mismatched numbers of leaves: " + lookupState .length + " != " + reader .leaves ().size ());
83+ }
84+
85+ if (lookupState .length > 0 && Objects .equals (lookupState [0 ].uidField , uidField ) == false ) {
7186 throw new AssertionError ("Index does not consistently use the same uid field: ["
72- + uidField + "] != [" + lookupState .uidField + "]" );
87+ + uidField + "] != [" + lookupState [ 0 ] .uidField + "]" );
7388 }
7489
7590 return lookupState ;
@@ -112,17 +127,13 @@ public static class DocIdAndSeqNo {
112127 * </ul>
113128 */
114129 public static DocIdAndVersion loadDocIdAndVersion (IndexReader reader , Term term ) throws IOException {
130+ PerThreadIDVersionAndSeqNoLookup [] lookups = getLookupState (reader , term .field ());
115131 List <LeafReaderContext > leaves = reader .leaves ();
116- if (leaves .isEmpty ()) {
117- return null ;
118- }
119132 // iterate backwards to optimize for the frequently updated documents
120133 // which are likely to be in the last segments
121134 for (int i = leaves .size () - 1 ; i >= 0 ; i --) {
122- LeafReaderContext context = leaves .get (i );
123- LeafReader leaf = context .reader ();
124- PerThreadIDVersionAndSeqNoLookup lookup = getLookupState (leaf , term .field ());
125- DocIdAndVersion result = lookup .lookupVersion (term .bytes (), leaf .getLiveDocs (), context );
135+ PerThreadIDVersionAndSeqNoLookup lookup = lookups [leaves .get (i ).ord ];
136+ DocIdAndVersion result = lookup .lookupVersion (term .bytes ());
126137 if (result != null ) {
127138 return result ;
128139 }
@@ -137,17 +148,13 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
137148 * </ul>
138149 */
139150 public static DocIdAndSeqNo loadDocIdAndSeqNo (IndexReader reader , Term term ) throws IOException {
151+ PerThreadIDVersionAndSeqNoLookup [] lookups = getLookupState (reader , term .field ());
140152 List <LeafReaderContext > leaves = reader .leaves ();
141- if (leaves .isEmpty ()) {
142- return null ;
143- }
144153 // iterate backwards to optimize for the frequently updated documents
145154 // which are likely to be in the last segments
146155 for (int i = leaves .size () - 1 ; i >= 0 ; i --) {
147- LeafReaderContext context = leaves .get (i );
148- LeafReader leaf = context .reader ();
149- PerThreadIDVersionAndSeqNoLookup lookup = getLookupState (leaf , term .field ());
150- DocIdAndSeqNo result = lookup .lookupSeqNo (term .bytes (), leaf .getLiveDocs (), context );
156+ PerThreadIDVersionAndSeqNoLookup lookup = lookups [leaves .get (i ).ord ];
157+ DocIdAndSeqNo result = lookup .lookupSeqNo (term .bytes ());
151158 if (result != null ) {
152159 return result ;
153160 }
@@ -159,9 +166,13 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
159166 * Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
160167 */
161168 public static long loadPrimaryTerm (DocIdAndSeqNo docIdAndSeqNo , String uidField ) throws IOException {
162- LeafReader leaf = docIdAndSeqNo .context .reader ();
163- PerThreadIDVersionAndSeqNoLookup lookup = getLookupState (leaf , uidField );
164- long result = lookup .lookUpPrimaryTerm (docIdAndSeqNo .docId , leaf );
169+ NumericDocValues primaryTerms = docIdAndSeqNo .context .reader ().getNumericDocValues (SeqNoFieldMapper .PRIMARY_TERM_NAME );
170+ long result ;
171+ if (primaryTerms != null && primaryTerms .advanceExact (docIdAndSeqNo .docId )) {
172+ result = primaryTerms .longValue ();
173+ } else {
174+ result = 0 ;
175+ }
165176 assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
166177 + " docId [" + docIdAndSeqNo .docId + "] seqNo [" + docIdAndSeqNo .seqNo + "]" ;
167178 return result ;
0 commit comments