Skip to content

Commit

Permalink
Add partial support for object mappings
Browse files Browse the repository at this point in the history
Signed-off-by: John Mazanec <[email protected]>
  • Loading branch information
jmazanec15 committed Jan 29, 2025
1 parent 74387f9 commit d449151
Show file tree
Hide file tree
Showing 9 changed files with 440 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,29 @@ public KnnVectorsFormat knnVectorsFormat() {

@Override
public StoredFieldsFormat storedFieldsFormat() {
DerivedSourceReadersSupplier derivedSourceReadersSupplier = new DerivedSourceReadersSupplier(
(segmentReadState) -> knnVectorsFormat().fieldsReader(segmentReadState),
(segmentReadState) -> docValuesFormat().fieldsProducer(segmentReadState),
(segmentReadState) -> postingsFormat().fieldsProducer(segmentReadState)
);
DerivedSourceReadersSupplier derivedSourceReadersSupplier = new DerivedSourceReadersSupplier((segmentReadState) -> {
if (segmentReadState.fieldInfos.hasVectorValues()) {
return knnVectorsFormat().fieldsReader(segmentReadState);
}
return null;
}, (segmentReadState) -> {
if (segmentReadState.fieldInfos.hasDocValues()) {
return docValuesFormat().fieldsProducer(segmentReadState);
}
return null;

}, (segmentReadState) -> {
if (segmentReadState.fieldInfos.hasPostings()) {
return postingsFormat().fieldsProducer(segmentReadState);
}
return null;

}, (segmentReadState -> {
if (segmentReadState.fieldInfos.hasNorms()) {
return normsFormat().normsProducer(segmentReadState);
}
return null;
}));
return new DerivedSourceStoredFieldsFormat(delegate.storedFieldsFormat(), derivedSourceReadersSupplier, mapperService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,26 @@
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.util.IOUtils;

import java.io.Closeable;
import java.io.IOException;

/**
* Class holds the readers necessary to implement derived source.
* Class holds the readers necessary to implement derived source. Important to note that if a segment does not have
* any of these fields, the values will be null. Caller needs to check if these are null before using.
*/
@RequiredArgsConstructor
@Getter
public class DerivedSourceReaders implements Closeable {
private final KnnVectorsReader knnVectorsReader;
private final DocValuesProducer docValuesProducer;
private final FieldsProducer fieldsProducer;
private final NormsProducer normsProducer;

@Override
public void close() throws IOException {
IOUtils.close(knnVectorsReader, docValuesProducer, fieldsProducer);
IOUtils.close(knnVectorsReader, docValuesProducer, fieldsProducer, normsProducer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.KnnVectorsReader;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.index.SegmentReadState;

import java.io.IOException;
Expand All @@ -23,6 +24,7 @@ public class DerivedSourceReadersSupplier {
private final DerivedSourceReaderSupplier<KnnVectorsReader> knnVectorsReaderSupplier;
private final DerivedSourceReaderSupplier<DocValuesProducer> docValuesProducerSupplier;
private final DerivedSourceReaderSupplier<FieldsProducer> fieldsProducerSupplier;
private final DerivedSourceReaderSupplier<NormsProducer> normsProducer;

/**
* Get the readers for the segment
Expand All @@ -35,7 +37,8 @@ public DerivedSourceReaders getReaders(SegmentReadState state) throws IOExceptio
return new DerivedSourceReaders(
knnVectorsReaderSupplier.apply(state),
docValuesProducerSupplier.apply(state),
fieldsProducerSupplier.apply(state)
fieldsProducerSupplier.apply(state),
normsProducer.apply(state)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SegmentReadState;
Expand Down Expand Up @@ -36,7 +37,19 @@ public class NestedPerFieldDerivedVectorInjector implements PerFieldDerivedVecto

@Override
public void inject(int parentDocId, Map<String, Object> sourceAsMap) throws IOException {
// Setup the iterator. Return if not-relevant
// If the parent has the field, then it is just an object field.
if (getLowestDocIdForField(childFieldInfo.name, parentDocId) == parentDocId) {
injectObject(parentDocId, sourceAsMap);
return;
}

if (ParentChildHelper.splitPath(childFieldInfo.name).length > 2) {
// We do not support nested fields beyond one level
log.warn("Nested fields beyond one level are not supported. Field: {}", childFieldInfo.name);
return;
}

// Setup the iterator. Return if no parent
String childFieldName = ParentChildHelper.getChildField(childFieldInfo.name);
String parentFieldName = ParentChildHelper.getParentField(childFieldInfo.name);
if (parentFieldName == null) {
Expand All @@ -49,7 +62,7 @@ public void inject(int parentDocId, Map<String, Object> sourceAsMap) throws IOEx
parentDocId
);

// Initializes the parent field so that there is a map to put each of the children
// Initializes the parent field so that there is a list to put each of the children
Object originalParentValue = sourceAsMap.get(parentFieldName);
List<Map<String, Object>> reconstructedSource;
if (originalParentValue instanceof Map) {
Expand All @@ -58,16 +71,17 @@ public void inject(int parentDocId, Map<String, Object> sourceAsMap) throws IOEx
reconstructedSource = (List<Map<String, Object>>) originalParentValue;
}

// Contains the positions of existing objects in the map. This is used to help figure out the best play to put back the vectors
List<Integer> positions = mapObjectsToPositionInSource(
// Contains the docIds of existing objects in the map in order. This is used to help figure out the best place
// to put back the vectors
List<Integer> positions = mapObjectsToPositionInNestedList(
reconstructedSource,
nestedPerFieldParentToDocIdIterator.firstChild(),
parentDocId
);

// Finally, inject children for the document into the source. This code is non-trivial because filtering out
// the vectors during write could mean that children docs disappear from the source. So, to properly put
// everything back, we need to igure out where the existing fields in the original map to
// everything back, we need to figure out where the existing fields in the original source map to in the
// reconstructed source.
KNNVectorValues<?> vectorValues = KNNVectorValuesFactory.getVectorValues(
childFieldInfo,
derivedSourceReaders.getDocValuesProducer(),
Expand All @@ -83,10 +97,8 @@ public void inject(int parentDocId, Map<String, Object> sourceAsMap) throws IOEx
if (vectorValues.docId() != nestedPerFieldParentToDocIdIterator.childId()) {
continue;
}
int docId = vectorValues.docId();
if (docId >= parentDocId) {
break;
}

int docId = nestedPerFieldParentToDocIdIterator.childId();
boolean isInsert = true;
int position = positions.size(); // by default we insert it at the end
for (int i = offsetPositionsIndex; i < positions.size(); i++) {
Expand All @@ -111,39 +123,110 @@ public void inject(int parentDocId, Map<String, Object> sourceAsMap) throws IOEx
sourceAsMap.put(parentFieldName, reconstructedSource);
}

private List<Integer> mapObjectsToPositionInSource(List<Map<String, Object>> originals, int firstChild, int parent) throws IOException {
/**
 * Inject the vector for a plain object field (non-nested) directly into the source map, creating any
 * intermediate object maps along the dot-separated path of the field name.
 *
 * @param docId       document to read the vector for
 * @param sourceAsMap source of the document, mutated in place
 * @throws IOException if there is an issue reading from the formats
 */
private void injectObject(int docId, Map<String, Object> sourceAsMap) throws IOException {
    KNNVectorValues<?> vectorValues = KNNVectorValuesFactory.getVectorValues(
        childFieldInfo,
        derivedSourceReaders.getDocValuesProducer(),
        derivedSourceReaders.getKnnVectorsReader()
    );
    // Position the iterator on this doc; bail out if the doc has no vector for this field.
    boolean positioned = vectorValues.docId() == docId || vectorValues.advance(docId) == docId;
    if (positioned == false) {
        return;
    }

    // Walk (and create as needed) the chain of object maps down to the leaf's parent.
    String[] pathParts = ParentChildHelper.splitPath(childFieldInfo.name);
    Map<String, Object> cursor = sourceAsMap;
    for (int depth = 0; depth < pathParts.length - 1; depth++) {
        Object nested = cursor.get(pathParts[depth]);
        if (nested == null) {
            nested = new HashMap<String, Object>();
            cursor.put(pathParts[depth], nested);
        }
        cursor = (Map<String, Object>) nested;
    }
    cursor.put(pathParts[pathParts.length - 1], vectorValues.getVector());
}

/**
* Given a list of maps, map each map to a position in the nested list. This is used to help figure out where to put
* the vectors back in the source.
*
* @param originals list of maps
* @param firstChild first child docId
* @param parent parent docId
* @return list of positions in the nested list
* @throws IOException if there is an issue reading from the formats
*/
private List<Integer> mapObjectsToPositionInNestedList(List<Map<String, Object>> originals, int firstChild, int parent)
throws IOException {
List<Integer> positions = new ArrayList<>();
int offset = firstChild;
for (Map<String, Object> docWithFields : originals) {
int fieldMapping = docToOrdinal(docWithFields, offset, parent);
int fieldMapping = mapToDocId(docWithFields, offset, parent);
assert fieldMapping != -1;
positions.add(fieldMapping);
offset = fieldMapping + 1;
}
return positions;
}

// Offset is first eligible object
private Integer docToOrdinal(Map<String, Object> doc, int offset, int parent) throws IOException {
String keyToCheck = doc.keySet().iterator().next();
int position = getFieldsForDoc(keyToCheck, offset);
// Advancing past the parent means something went horribly wrong
/**
 * Resolve the docId that backs the given source object. Each key of the object is used to probe the
 * corresponding sibling field of the vector field; the first probe that lands strictly before the parent
 * wins.
 *
 * @param doc    source object (as a map) whose backing docId we want
 * @param offset lowest docId the object could map to (first eligible child doc)
 * @param parent parent docId; a result at or past this indicates something went wrong
 * @return docId backing the object
 * @throws IOException if there is an issue reading from the formats
 */
private int mapToDocId(Map<String, Object> doc, int offset, int parent) throws IOException {
    // Try every field of the object until one resolves to a doc before the parent.
    int candidate = NO_MORE_DOCS;
    for (String key : doc.keySet()) {
        String siblingField = ParentChildHelper.constructSiblingField(childFieldInfo.name, key);
        candidate = getLowestDocIdForField(siblingField, offset);
        if (candidate < parent) {
            break;
        }
    }

    // Advancing past the parent means something went wrong
    assert candidate < parent;
    return candidate;
}

private int getFieldsForDoc(String fieldToMatch, int offset) throws IOException {
// TODO: Fix this up to follow
// https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/FieldExistsQuery.java#L170-L218.
// In a perfect world, it would try everything and fall through to the field exists stuff
FieldInfo fieldInfo = segmentReadState.fieldInfos.fieldInfo(
ParentChildHelper.constructSiblingField(childFieldInfo.name, fieldToMatch)
);
/**
* Get the lowest docId for a field that is greater than the offset.
*
* @param fieldToMatch field to find the lowest docId for
* @param offset offset to start searching from
* @return lowest docId for the field that is greater than the offset. Returns {@link DocIdSetIterator#NO_MORE_DOCS} if doc cannot be found
* @throws IOException if there is an issue reading from the formats
*/
private int getLowestDocIdForField(String fieldToMatch, int offset) throws IOException {
// This method implementation is inspired by the FieldExistsQuery in Lucene and the FieldNamesMapper in
// Opensearch. We first mimic the logic in the FieldExistsQuery in order to identify the docId of the nested
// doc. If that fails, we fall back to scanning the field-names field (_field_names), as OpenSearch's
// FieldMapper does.
// References:
// 1. https://github.com/apache/lucene/blob/main/lucene/core/src/java/org/apache/lucene/search/FieldExistsQuery.java#L170-L218.
// 2.
// https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java#L316-L324
FieldInfo fieldInfo = segmentReadState.fieldInfos.fieldInfo(fieldToMatch);

if (fieldInfo == null) {
return NO_MORE_DOCS;
}

DocIdSetIterator iterator = null;
if (fieldInfo != null) {
switch (fieldInfo.getDocValuesType()) {
case NONE:
if (fieldInfo.hasNorms() && derivedSourceReaders.getNormsProducer() != null) { // the field indexes norms
iterator = derivedSourceReaders.getNormsProducer().getNorms(fieldInfo);
} else if (fieldInfo.getVectorDimension() != 0 && derivedSourceReaders.getKnnVectorsReader() != null) { // the field indexes vectors
switch (fieldInfo.getVectorEncoding()) {
case FLOAT32:
iterator = derivedSourceReaders.getKnnVectorsReader().getFloatVectorValues(fieldInfo.name);
break;
case BYTE:
iterator = derivedSourceReaders.getKnnVectorsReader().getByteVectorValues(fieldInfo.name);
break;
}
} else if (fieldInfo.getDocValuesType() != DocValuesType.NONE && derivedSourceReaders.getDocValuesProducer() != null) { // the field
// indexes
// doc
// values
switch (fieldInfo.getDocValuesType()) {
case NUMERIC:
iterator = derivedSourceReaders.getDocValuesProducer().getNumeric(fieldInfo);
break;
Expand All @@ -159,6 +242,7 @@ private int getFieldsForDoc(String fieldToMatch, int offset) throws IOException
case SORTED_SET:
iterator = derivedSourceReaders.getDocValuesProducer().getSortedSet(fieldInfo);
break;
case NONE:
default:
throw new AssertionError();
}
Expand All @@ -167,9 +251,16 @@ private int getFieldsForDoc(String fieldToMatch, int offset) throws IOException
return iterator.advance(offset);
}

// Check the field names field type for matches
if (derivedSourceReaders.getFieldsProducer() == null) {
return NO_MORE_DOCS;
}
Terms terms = derivedSourceReaders.getFieldsProducer().terms(FieldNamesFieldMapper.NAME);
if (terms == null) {
return NO_MORE_DOCS;
}
TermsEnum fieldNameFieldsTerms = terms.iterator();
BytesRef fieldToMatchRef = new BytesRef(fieldToMatch);
BytesRef fieldToMatchRef = new BytesRef(fieldInfo.name);
PostingsEnum postingsEnum = null;
while (fieldNameFieldsTerms.next() != null) {
BytesRef currentTerm = fieldNameFieldsTerms.term();
Expand All @@ -178,7 +269,9 @@ private int getFieldsForDoc(String fieldToMatch, int offset) throws IOException
break;
}
}
assert postingsEnum != null;
if (postingsEnum == null) {
return NO_MORE_DOCS;
}
return postingsEnum.advance(offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ private List<Integer> getChildren() throws IOException {
String parentField = ParentChildHelper.getParentField(childField);

Terms terms = derivedSourceReaders.getFieldsProducer().terms("_nested_path");
if (terms == null) {
return Collections.emptyList();
}
TermsEnum nestedFieldsTerms = terms.iterator();
BytesRef childPathRef = new BytesRef(parentField);
PostingsEnum postingsEnum = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,15 @@ public static String getChildField(String field) {
/**
 * Build the path of a sibling of {@code field} by replacing its leaf segment with {@code sibling}.
 * For instance, ("parent.child", "other") yields "parent.other".
 *
 * @param field   nested field path whose parent is reused
 * @param sibling leaf name of the sibling field
 * @return sibling field path
 */
public static String constructSiblingField(String field, String sibling) {
    String parentPath = getParentField(field);
    return parentPath + "." + sibling;
}

/**
 * Break a dot-delimited nested field path into its segments. For instance, "parent.to.child" becomes
 * ["parent", "to", "child"]; a path with no dots yields a single-element array.
 *
 * @param field nested field path
 * @return path segments, in order
 */
public static String[] splitPath(String field) {
    String[] segments = field.split("\\.");
    return segments;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,18 @@ public static <T> KNNVectorValues<T> getVectorValues(
final KnnVectorsReader knnVectorsReader
) throws IOException {
final DocIdSetIterator docIdSetIterator;
if (fieldInfo.hasVectorValues()) {
if (fieldInfo.hasVectorValues() && knnVectorsReader != null) {
if (fieldInfo.getVectorEncoding() == VectorEncoding.BYTE) {
docIdSetIterator = knnVectorsReader.getByteVectorValues(fieldInfo.getName());
} else if (fieldInfo.getVectorEncoding() == VectorEncoding.FLOAT32) {
docIdSetIterator = knnVectorsReader.getFloatVectorValues(fieldInfo.getName());
} else {
throw new IllegalArgumentException("Invalid Vector encoding provided, hence cannot return VectorValues");
}
} else {
} else if (docValuesProducer != null) {
docIdSetIterator = docValuesProducer.getBinary(fieldInfo);
} else {
throw new IllegalArgumentException("Field does not have vector values and DocValues");
}
final KNNVectorValuesIterator vectorValuesIterator = new KNNVectorValuesIterator.DocIdsIteratorValues(docIdSetIterator);
return getVectorValues(FieldInfoExtractor.extractVectorDataType(fieldInfo), vectorValuesIterator);
Expand Down
Loading

0 comments on commit d449151

Please sign in to comment.