@@ -7,23 +7,17 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@EqualsAndHashCode
@RequiredArgsConstructor
public class Cursor {
public static final Cursor None = new Cursor();
public static final Cursor None = new Cursor(null);

@Getter
private final byte[] raw;

private Cursor() {
raw = new byte[] {};
}

public Cursor(byte[] raw) {
this.raw = raw;
}
private final String data;

public String toString() {
return new String(raw);
return data;
}
}
@@ -8,20 +8,25 @@
import com.google.common.hash.HashCode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.planner.physical.PaginateOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.TableScanOperator;

/**
* This class is the entry point for paged requests. It is responsible for cursor serialization
@@ -30,132 +35,136 @@
@RequiredArgsConstructor
public class PaginatedPlanCache {
public static final String CURSOR_PREFIX = "n:";
private final StorageEngine storageEngine;

// TODO get engine from GUICE if possible
private final StorageEngine engine;

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Converts a physical plan tree to a cursor. May cache plan related data somewhere.
* An auxiliary class which provides an entry point for serialization and deserialization of
* the plan tree. It doesn't serialize itself; it calls {@link SerializablePlan#writeExternal}
* on the given plan. For deserialization, it loads a {@link SerializablePlan.PlanLoader} and
* invokes it.
*/
public Cursor convertToCursor(PhysicalPlan plan) throws IOException {
if (plan instanceof PaginateOperator) {
var cursor = plan.toCursor();
if (cursor == null) {
return Cursor.None;
}
var raw = CURSOR_PREFIX + compress(cursor);
return new Cursor(raw.getBytes());
@Data
public static class SerializationContext implements Externalizable {
private PaginateOperator plan;
/**
* If an exception is thrown, we don't catch it: that means something really went wrong.
* But if the plan can't be serialized, this flag is left unset and an empty cursor should be returned.
* The only case where this can happen as of now is when paging is finished and there is no scroll.
*/
private boolean serializedSuccessfully = false;

public SerializationContext() {
}

public SerializationContext(PaginateOperator plan) {
this.plan = plan;
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
serializedSuccessfully = plan.writeExternal(out);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
var loader = (SerializablePlan.PlanLoader) in.readObject();
var engine = (StorageEngine) ((CursorDeserializationStream) in).resolveObject("engine");
plan = (PaginateOperator) loader.apply(in, engine);
}
return Cursor.None;
}

/**
* Compress serialized query plan.
* @param str string representing a query plan
* @return str compressed with gzip.
* Converts a physical plan tree to a cursor.
*/
String compress(String str) throws IOException {
if (str == null || str.length() == 0) {
return "";
public Cursor convertToCursor(PhysicalPlan plan) {
if (plan instanceof PaginateOperator) {
var context = new SerializationContext((PaginateOperator) plan);
var serialized = serialize(context);
return context.serializedSuccessfully ? new Cursor(CURSOR_PREFIX + serialized) : Cursor.None;
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(str.getBytes());
gzip.close();
return HashCode.fromBytes(out.toByteArray()).toString();
return Cursor.None;
}

/**
* Decompresses a query plan that was compressed with {@link PaginatedPlanCache#compress}.
* @param input compressed query plan
* @return decompressed string
* Serializes and compresses the object.
* @param object The object.
* @return Encoded binary data.
*/
String decompress(String input) throws IOException {
if (input == null || input.length() == 0) {
return "";
protected String serialize(Serializable object) {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeObject(object);
objectOutput.flush();

ByteArrayOutputStream out = new ByteArrayOutputStream();
// GZIP provides 35-45% compression; lzma from Apache commons-compress gives a few % better compression
GZIPOutputStream gzip = new GZIPOutputStream(out) { {
this.def.setLevel(Deflater.BEST_COMPRESSION);
} };
gzip.write(output.toByteArray());
gzip.close();

return HashCode.fromBytes(out.toByteArray()).toString();
} catch (IOException e) {
throw new IllegalStateException("Failed to serialize: " + object, e);
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
return new String(gzip.readAllBytes());
}

/**
* Parse `NamedExpression`s from cursor.
* @param listToFill List to fill with data.
* @param cursor Cursor to parse.
* @return Remaining part of the cursor.
* Decompresses and deserializes the binary data.
* @param code Encoded binary data.
* @return An object.
*/
private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) {
var serializer = new DefaultExpressionSerializer();
if (cursor.startsWith(")")) { //empty list
return cursor.substring(cursor.indexOf(',') + 1);
}
while (!cursor.startsWith("(")) {
listToFill.add((NamedExpression)
serializer.deserialize(cursor.substring(0,
Math.min(cursor.indexOf(','), cursor.indexOf(')')))));
cursor = cursor.substring(cursor.indexOf(',') + 1);
protected Serializable deserialize(String code) {
try {
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(HashCode.fromString(code).asBytes()));
ObjectInputStream objectInput = new CursorDeserializationStream(
new ByteArrayInputStream(gzip.readAllBytes()));
return (Serializable) objectInput.readObject();
} catch (Exception e) {
throw new IllegalStateException("Failed to deserialize object", e);
}
return cursor;
}

/**
* Converts a cursor to a physical plan tree.
*/
* Converts a cursor to a physical plan tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
if (!cursor.startsWith(CURSOR_PREFIX)) {
throw new UnsupportedOperationException("Unsupported cursor");
}
try {
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
return ((SerializationContext) deserialize(cursor.substring(CURSOR_PREFIX.length())))
.getPlan();
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
}

/**
* This function is used in testing only, to get access to {@link CursorDeserializationStream}.
*/
protected CursorDeserializationStream getCursorDeserializationStream(InputStream in)
throws IOException {
return new CursorDeserializationStream(in);
}

class CursorDeserializationStream extends ObjectInputStream {
public CursorDeserializationStream(InputStream in) throws IOException {
super(in);
}

@Override
protected Object resolveObject(Object obj) throws IOException {
return obj.equals("engine") ? engine : obj;
}
}
}
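
Below is a minimal, self-contained sketch of the cursor round trip performed by serialize and deserialize above, assuming only the Guava dependency already imported in this file: Java serialization, GZIP at BEST_COMPRESSION, and Guava's HashCode used purely as a byte[]-to-hex codec. The class name CursorRoundTripSketch and the plain String payload are illustrative stand-ins; the real code serializes a SerializationContext wrapping the plan tree.

import com.google.common.hash.HashCode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CursorRoundTripSketch {

  static final String CURSOR_PREFIX = "n:";

  static String serialize(Serializable object) throws IOException {
    // Plain Java serialization first.
    ByteArrayOutputStream plain = new ByteArrayOutputStream();
    try (ObjectOutputStream objectOutput = new ObjectOutputStream(plain)) {
      objectOutput.writeObject(object);
    }
    // Then GZIP with maximum compression, as in the diff above.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(compressed) { {
        this.def.setLevel(Deflater.BEST_COMPRESSION);
      } }) {
      gzip.write(plain.toByteArray());
    }
    // HashCode is used only as a byte[] <-> lowercase-hex codec here.
    return HashCode.fromBytes(compressed.toByteArray()).toString();
  }

  static Serializable deserialize(String code) throws IOException, ClassNotFoundException {
    GZIPInputStream gzip = new GZIPInputStream(
        new ByteArrayInputStream(HashCode.fromString(code).asBytes()));
    try (ObjectInputStream objectInput = new ObjectInputStream(
        new ByteArrayInputStream(gzip.readAllBytes()))) {
      return (Serializable) objectInput.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    // A String stands in for the SerializationContext used by the real code.
    String cursor = CURSOR_PREFIX + serialize("fake plan state");
    System.out.println(cursor);
    System.out.println(deserialize(cursor.substring(CURSOR_PREFIX.length())));
  }
}

Note that the hex encoding via HashCode doubles the size of the compressed payload, so the compression level directly affects the final cursor length.
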
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.storage.StorageEngine;

/**
* All instances of PhysicalPlan which need to be serialized (for the cursor feature) should
* override all methods given here.
* This class can't implement Externalizable, because deserialization of Externalizable objects
* works the following way:
* 1. A new object is created with the no-arg constructor (which no PhysicalPlan has).
* 2. The object loads its data from the stream.
* The {@link Externalizable} interface was split into two parts: serialization is kept in
* {@link #writeExternal}, but deserialization is provided by {@link PlanLoader}.
*/
public abstract class SerializablePlan {

// Copied from Externalizable
/**
* Each plan which supports serialization should dump itself into the stream and recurse into its children.
* It is good to create and dump a {@link PlanLoader} here as well; see the usage samples.
* <pre>{@code
* out.writeSomething(data);
* for (var plan : getChild()) {
* plan.getPlanForSerialization().writeExternal(out);
* }
* }</pre>
*/
public boolean writeExternal(ObjectOutput out) throws IOException {
throw new NotImplementedException();
}

/**
* Override to return the child or delegated plan, so the parent plan skips this one
* for serialization but still attempts to serialize the grandchild plan.
* Imagine a plan structure like this:
*    A          -> this
*    `- B       -> child
*       `- C    -> this
* In that case only plans A and C should be serialized.
* In practice, this is only needed to skip a `ResourceMonitorPlan` instance.
* @return Next plan for serialization.
*/
public SerializablePlan getPlanForSerialization() {
return this;
}

/**
* Each plan should serialize an instance of this function.
* The function deserializes and creates a new instance of that plan type.
* A loader for a plan X can be defined only in the scope of X, because only X
* knows how to create a new X.
* Deserialization must match the serialization given in {@link #writeExternal}.
*/
@FunctionalInterface
public interface PlanLoader extends Serializable {
SerializablePlan apply(ObjectInput in, StorageEngine engine)
throws IOException, ClassNotFoundException;
}
}
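
Below is a hypothetical example of how a concrete plan could follow the contract described above: writeExternal dumps a PlanLoader first and then the plan's own state, while a wrapper plan delegates serialization via getPlanForSerialization. FakeScanPlan and MonitoredPlan are illustrative names only and are not classes from this PR.

package org.opensearch.sql.planner;

import java.io.IOException;
import java.io.ObjectOutput;

/** Hypothetical leaf plan; shows the writeExternal / PlanLoader pairing. */
class FakeScanPlan extends SerializablePlan {
  private final String indexName;

  FakeScanPlan(String indexName) {
    this.indexName = indexName;
  }

  @Override
  public boolean writeExternal(ObjectOutput out) throws IOException {
    // Dump the loader first, so a reader can pull it out and ask it to rebuild the plan,
    // then dump the plan's own state. The lambda is serializable because
    // PlanLoader extends Serializable.
    PlanLoader loader = (in, engine) -> new FakeScanPlan(in.readUTF());
    out.writeObject(loader);
    out.writeUTF(indexName);
    return true; // serialized successfully
  }
}

/** Hypothetical wrapper (in the spirit of ResourceMonitorPlan); it is skipped for serialization. */
class MonitoredPlan extends SerializablePlan {
  private final SerializablePlan delegate;

  MonitoredPlan(SerializablePlan delegate) {
    this.delegate = delegate;
  }

  @Override
  public SerializablePlan getPlanForSerialization() {
    // The parent plan serializes the delegate directly and never sees this wrapper.
    return delegate;
  }
}

The corresponding readExternal in SerializationContext reads the loader object first and then invokes it with the remaining stream and the storage engine, so the write order above must match.
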