Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class PostgresDocStoreTest {

public static final String ID = "id";
Expand Down Expand Up @@ -304,22 +306,98 @@ public void testIgnoreCaseLikeQuery() throws IOException {
}

@Test
public void testSearch() throws IOException {
public void testInQuery() throws IOException {
Collection collection = datastore.getCollection(COLLECTION_NAME);
String documentString = "{\"attributes\":{\"trace_id\":{\"value\":{\"string\":\"00000000000000005e194fdf9fbf5101\"}},\"span_id\":{\"value\":{\"string\":\"6449f1f720c93a67\"}},\"service_type\":{\"value\":{\"string\":\"JAEGER_SERVICE\"}},\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"createdTime\":1605692185945,\"entityId\":\"e3ffc6f0-fc92-3a9c-9fa0-26269184d1aa\",\"entityName\":\"driver\",\"entityType\":\"SERVICE\",\"identifyingAttributes\":{\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"tenantId\":\"__default\"}";
Document document = new JSONDocument(documentString);
SingleValueKey key = new SingleValueKey("default", "testKey1");
collection.upsert(key, document);
collection.upsert(new SingleValueKey("default", "testKey1"), createDocument("name", "Bob"));
collection.upsert(new SingleValueKey("default", "testKey2"), createDocument("name", "Alice"));
collection.upsert(new SingleValueKey("default", "testKey3"), createDocument("name", "Halo"));

List<String> inArray = new ArrayList<>();
inArray.add("Bob");
inArray.add("Alice");

// Search _id field in the document
Query query = new Query();
query.setFilter(new Filter(Filter.Op.EQ, DOCUMENT_ID, key.toString()));
query.setFilter(new Filter(Filter.Op.IN, "name", inArray));
Iterator<Document> results = collection.search(query);
List<Document> documents = new ArrayList<>();
while (results.hasNext()) {
documents.add(results.next());
}
Assertions.assertEquals(documents.size(), 1);
Assertions.assertEquals(documents.size(), 2);
}

@Test
public void testSearch() throws IOException {
Collection collection = datastore.getCollection(COLLECTION_NAME);
String docStr1 = "{\"amount\":1234.5,\"attributes\":{\"trace_id\":{\"value\":{\"string\":\"00000000000000005e194fdf9fbf5101\"}},\"span_id\":{\"value\":{\"string\":\"6449f1f720c93a67\"}},\"service_type\":{\"value\":{\"string\":\"JAEGER_SERVICE\"}},\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"createdTime\":1605692185945,\"entityId\":\"e3ffc6f0-fc92-3a9c-9fa0-26269184d1aa\",\"entityName\":\"driver\",\"entityType\":\"SERVICE\",\"identifyingAttributes\":{\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"tenantId\":\"__default\"}";
Document document1 = new JSONDocument(docStr1);
SingleValueKey key1 = new SingleValueKey("default", "testKey1");
collection.upsert(key1, document1);

String docStr2 = "{\"amount\":1234,\"attributes\":{\"trace_id\":{\"value\":{\"string\":\"00000000000000005e194fdf9fbf5101\"}},\"span_id\":{\"value\":{\"string\":\"6449f1f720c93a67\"}},\"service_type\":{\"value\":{\"string\":\"JAEGER_SERVICE\"}},\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"createdTime\":1605692185945,\"entityId\":\"e3ffc6f0-fc92-3a9c-9fa0-26269184d1aa\",\"entityName\":\"driver\",\"entityType\":\"SERVICE\",\"identifyingAttributes\":{\"FQN\":{\"value\":{\"string\":\"driver\"}}},\"tenantId\":\"__default\"}";
Document document2 = new JSONDocument(docStr2);
SingleValueKey key2 = new SingleValueKey("default", "testKey2");
collection.upsert(key2, document2);

// Search integer field
{
Query query = new Query();
query.setFilter(new Filter(Filter.Op.EQ, "amount", 1234));
Iterator<Document> results = collection.search(query);
List<Document> documents = new ArrayList<>();
for (; results.hasNext(); ) {
documents.add(results.next());
}
Assertions.assertEquals(documents.size(), 1);
}

// Search float field
{
Query query = new Query();
query.setFilter(new Filter(Filter.Op.EQ, "amount", 1234.5));
Iterator<Document> results = collection.search(query);
List<Document> documents = new ArrayList<>();
for (; results.hasNext(); ) {
documents.add(results.next());
}
Assertions.assertEquals(documents.size(), 1);
}

// Search integer and float field
{
Query query = new Query();
query.setFilter(new Filter(Filter.Op.GTE, "amount", 123));
Iterator<Document> results = collection.search(query);
List<Document> documents = new ArrayList<>();
for (; results.hasNext(); ) {
documents.add(results.next());
}
Assertions.assertEquals(documents.size(), 2);
}

// Search _id field in the document
{
Query query = new Query();
query.setFilter(new Filter(Filter.Op.EQ, DOCUMENT_ID, key1.toString()));
Iterator<Document> results = collection.search(query);
List<Document> documents = new ArrayList<>();
for (; results.hasNext(); ) {
documents.add(results.next());
}
Assertions.assertEquals(documents.size(), 1);
}

// Unsupported Object Type in Filter, should throw an UnsupportedOperationException
{
Query query = new Query();
query.setFilter(new Filter(Filter.Op.EQ, "amount", new Filter()));
String expected = "Un-supported object types in filter";
Exception exception = assertThrows(UnsupportedOperationException.class,
() -> collection.search(query));
String actualMessage = exception.getMessage();
Assertions.assertTrue(actualMessage.contains(expected));
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.hypertrace.core.documentstore.postgres;

import java.util.HashMap;
import java.util.Map;

/**
* Holds the params that need to be set in the PreparedStatement for constructing the final SQL
* query
*/
public class Params {

// Map of index to the corresponding param value
private final Map<Integer, Object> objectParams;

private Params(
Map<Integer, Object> objectParams) {
this.objectParams = objectParams;
}

@Override
public String toString() {
return "Params{" +
"objectParams=" + objectParams +
'}';
}

public Map<Integer, Object> getObjectParams() {
return objectParams;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {

private int nextIndex;
private final Map<Integer, Object> objectParams;

private Builder() {
nextIndex = 1;
objectParams = new HashMap<>();
}

public Builder addObjectParam(Object paramValue) {
objectParams.put(nextIndex++, paramValue);
return this;
}

public Params build() {
return new Params(objectParams);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class PostgresCollection implements Collection {
}};
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String DOC_PATH_SEPARATOR = "\\.";
private static String QUESTION_MARK = "?";


private final Connection client;
private final String collectionName;
Expand Down Expand Up @@ -147,13 +149,13 @@ public boolean updateSubDoc(Key key, String subDocPath, Document subDocument) {
@Override
public Iterator<Document> search(Query query) {
String filters = null;
String space = " ";
StringBuilder searchSQLBuilder = new StringBuilder("SELECT * FROM")
.append(space).append(collectionName);
StringBuilder sqlBuilder = new StringBuilder("SELECT * FROM ")
.append(collectionName);
Params.Builder paramsBuilder = Params.newBuilder();

// If there is a filter in the query, parse it fully.
if (query.getFilter() != null) {
filters = parseQuery(query.getFilter());
filters = parseFilter(query.getFilter(), paramsBuilder);
}

LOGGER.debug(
Expand All @@ -162,27 +164,27 @@ public Iterator<Document> search(Query query) {
filters);

if (filters != null) {
searchSQLBuilder
sqlBuilder
.append(" WHERE ").append(filters);
}

if (!query.getOrderBys().isEmpty()) {
String orderBySQL = parseOrderByQuery(query.getOrderBys());
searchSQLBuilder.append(" ORDER BY ").append(orderBySQL);
sqlBuilder.append(" ORDER BY ").append(orderBySQL);
}

Integer limit = query.getLimit();
if (limit != null && limit >= 0) {
searchSQLBuilder.append(" LIMIT ").append(limit);
sqlBuilder.append(" LIMIT ").append(limit);
}

Integer offset = query.getOffset();
if (offset != null && offset >= 0) {
searchSQLBuilder.append(" OFFSET ").append(offset);
sqlBuilder.append(" OFFSET ").append(offset);
}

try {
PreparedStatement preparedStatement = client.prepareStatement(searchSQLBuilder.toString());
PreparedStatement preparedStatement = buildPreparedStatement(sqlBuilder.toString(), paramsBuilder.build());
ResultSet resultSet = preparedStatement.executeQuery();
return new PostgresResultIterator(resultSet);
} catch (SQLException e) {
Expand All @@ -193,16 +195,16 @@ public Iterator<Document> search(Query query) {
}

@VisibleForTesting
protected String parseQuery(Filter filter) {
protected String parseFilter(Filter filter, Params.Builder paramsBuilder) {
if (filter.isComposite()) {
return parseQueryForCompositeFilter(filter);
return parseCompositeFilter(filter, paramsBuilder);
} else {
return parseQueryForNonCompositeFilter(filter);
return parseNonCompositeFilter(filter, paramsBuilder);
}
}

@VisibleForTesting
protected String parseQueryForNonCompositeFilter(Filter filter) {
protected String parseNonCompositeFilter(Filter filter, Params.Builder paramsBuilder) {
Filter.Op op = filter.getOp();
Object value = filter.getValue();
String fieldName = filter.getFieldName();
Expand Down Expand Up @@ -234,7 +236,10 @@ protected String parseQueryForNonCompositeFilter(Filter filter) {
List<Object> values = (List<Object>) value;
String collect = values
.stream()
.map(val -> "'" + val + "'")
.map(val -> {
paramsBuilder.addObjectParam(val);
return QUESTION_MARK;
})
.collect(Collectors.joining(", "));
return filterString.append("(").append(collect).append(")").toString();
case CONTAINS:
Expand All @@ -244,22 +249,23 @@ protected String parseQueryForNonCompositeFilter(Filter filter) {
case NOT_EXISTS:
// TODO: Checks if key does not exist
case NEQ:
throw new UnsupportedOperationException("Only Equality predicate is supported");
default:
throw new UnsupportedOperationException(
String.format("Query operation:%s not supported", op));
}
return filterString.append("'").append(value).append("'").toString();
String filters = filterString.append(QUESTION_MARK).toString();
paramsBuilder.addObjectParam(value);
return filters;
}

@VisibleForTesting
protected String parseQueryForCompositeFilter(Filter filter) {
protected String parseCompositeFilter(Filter filter, Params.Builder paramsBuilder) {
Filter.Op op = filter.getOp();
switch (op) {
case OR: {
String childList =
Arrays.stream(filter.getChildFilters())
.map(this::parseQuery)
.map(childFilter -> parseFilter(childFilter, paramsBuilder))
.filter(str -> !StringUtils.isEmpty(str))
.map(str -> "(" + str + ")")
.collect(Collectors.joining(" OR "));
Expand All @@ -268,18 +274,46 @@ protected String parseQueryForCompositeFilter(Filter filter) {
case AND: {
String childList =
Arrays.stream(filter.getChildFilters())
.map(this::parseQuery)
.map(childFilter -> parseFilter(childFilter, paramsBuilder))
.filter(str -> !StringUtils.isEmpty(str))
.map(str -> "(" + str + ")")
.collect(Collectors.joining(" AND "));
return !childList.isEmpty() ? childList : null;
}
default:
throw new UnsupportedOperationException(
String.format("Boolean operation:%s not supported", op));
String.format("Query operation:%s not supported", op));
}
}

@VisibleForTesting
protected PreparedStatement buildPreparedStatement(String sqlQuery, Params params) throws SQLException, RuntimeException {
PreparedStatement preparedStatement = client.prepareStatement(sqlQuery);
params.getObjectParams().forEach((k, v) -> {
try {
if (isValidType(v)) {
preparedStatement.setString(k, String.valueOf(v));
} else {
throw new UnsupportedOperationException("Un-supported object types in filter");
}
} catch (SQLException e) {
LOGGER.error("SQLException setting Param. key: {}, value: {}", k, v);
}
});
return preparedStatement;
}

private boolean isValidType(Object v) {
Set<Class<?>> validClassez = new HashSet<>() {{
add(Double.class);
add(Float.class);
add(Integer.class);
add(Long.class);
add(String.class);
}};
return validClassez.contains(v.getClass());
}

@VisibleForTesting
private String getJsonSubDocPath(String subDocPath) {
return "{" + subDocPath.replaceAll(DOC_PATH_SEPARATOR, ",") + "}";
Expand Down Expand Up @@ -379,10 +413,12 @@ public long count() {
public long total(Query query) {
StringBuilder totalSQLBuilder = new StringBuilder("SELECT COUNT(*) FROM ")
.append(collectionName);
Params.Builder paramsBuilder = Params.newBuilder();

long count = -1;
// on any in-correct filter input, it will return total without filtering
if (query.getFilter() != null) {
String parsedQuery = parseQuery(query.getFilter());
String parsedQuery = parseFilter(query.getFilter(), paramsBuilder);
if (parsedQuery != null) {
totalSQLBuilder.append(" WHERE ").append(parsedQuery);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.hypertrace.core.documentstore.postgres;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ParamsTest {

Params.Builder paramBuilder;

@BeforeEach
void setUp() {
paramBuilder = Params.newBuilder();
}

@Test
public void testAllParamsAndIndex() {
paramBuilder.addObjectParam(1);
paramBuilder.addObjectParam(2L);
paramBuilder.addObjectParam("Alice");
paramBuilder.addObjectParam(3L);
paramBuilder.addObjectParam(4L);
Params params = paramBuilder.build();
int index = 1;
Assertions.assertEquals(params.getObjectParams().get(index++), 1);
Assertions.assertEquals(params.getObjectParams().get(index++), 2L);
Assertions.assertEquals(params.getObjectParams().get(index++), "Alice");
Assertions.assertEquals(params.getObjectParams().get(index++), 3L);
Assertions.assertEquals(params.getObjectParams().get(index), 4L);
Assertions.assertEquals(index, 5);
}

}
Loading