|
9 | 9 | import java.io.ByteArrayInputStream; |
10 | 10 | import java.io.ByteArrayOutputStream; |
11 | 11 | import java.io.IOException; |
12 | | -import java.util.ArrayList; |
13 | | -import java.util.List; |
| 12 | +import java.io.InputStream; |
| 13 | +import java.io.NotSerializableException; |
| 14 | +import java.io.ObjectInputStream; |
| 15 | +import java.io.ObjectOutputStream; |
| 16 | +import java.io.Serializable; |
| 17 | +import java.util.zip.Deflater; |
14 | 18 | import java.util.zip.GZIPInputStream; |
15 | 19 | import java.util.zip.GZIPOutputStream; |
16 | 20 | import lombok.RequiredArgsConstructor; |
17 | 21 | import org.opensearch.sql.ast.tree.UnresolvedPlan; |
18 | | -import org.opensearch.sql.expression.NamedExpression; |
19 | | -import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; |
20 | | -import org.opensearch.sql.planner.physical.PaginateOperator; |
| 22 | +import org.opensearch.sql.exception.NoCursorException; |
| 23 | +import org.opensearch.sql.planner.SerializablePlan; |
21 | 24 | import org.opensearch.sql.planner.physical.PhysicalPlan; |
22 | | -import org.opensearch.sql.planner.physical.ProjectOperator; |
23 | 25 | import org.opensearch.sql.storage.StorageEngine; |
24 | | -import org.opensearch.sql.storage.TableScanOperator; |
25 | 26 |
|
26 | 27 | /** |
27 | 28 | * This class is the entry point for paged requests. It is responsible for cursor serialization |
|
30 | 31 | @RequiredArgsConstructor |
31 | 32 | public class PaginatedPlanCache { |
32 | 33 | public static final String CURSOR_PREFIX = "n:"; |
33 | | - private final StorageEngine storageEngine; |
| 34 | + |
| 35 | + private final StorageEngine engine; |
34 | 36 |
|
35 | 37 | public boolean canConvertToCursor(UnresolvedPlan plan) { |
36 | 38 | return plan.accept(new CanPaginateVisitor(), null); |
37 | 39 | } |
38 | 40 |
|
39 | 41 | /** |
40 | | - * Converts a physical plan tree to a cursor. May cache plan related data somewhere. |
| 42 | + * Converts a physical plan tree to a cursor. |
41 | 43 | */ |
42 | | - public Cursor convertToCursor(PhysicalPlan plan) throws IOException { |
43 | | - if (plan instanceof PaginateOperator) { |
44 | | - var cursor = plan.toCursor(); |
45 | | - if (cursor == null) { |
46 | | - return Cursor.None; |
47 | | - } |
48 | | - var raw = CURSOR_PREFIX + compress(cursor); |
49 | | - return new Cursor(raw.getBytes()); |
| 44 | + public Cursor convertToCursor(PhysicalPlan plan) { |
| 45 | + try { |
| 46 | + return new Cursor(CURSOR_PREFIX |
| 47 | + + serialize(((SerializablePlan) plan).getPlanForSerialization())); |
| 48 | + // ClassCastException thrown when a plan in the tree doesn't implement SerializablePlan |
| 49 | + } catch (NotSerializableException | ClassCastException | NoCursorException e) { |
| 50 | + return Cursor.None; |
50 | 51 | } |
51 | | - return Cursor.None; |
52 | 52 | } |
53 | 53 |
|
54 | 54 | /** |
55 | | - * Compress serialized query plan. |
56 | | - * @param str string representing a query plan |
57 | | - * @return str compressed with gzip. |
| 55 | + * Serializes and compresses the object. |
| 56 | + * @param object The object. |
| 57 | + * @return Encoded binary data. |
58 | 58 | */ |
59 | | - String compress(String str) throws IOException { |
60 | | - if (str == null || str.length() == 0) { |
61 | | - return ""; |
| 59 | + protected String serialize(Serializable object) throws NotSerializableException { |
| 60 | + try { |
| 61 | + ByteArrayOutputStream output = new ByteArrayOutputStream(); |
| 62 | + ObjectOutputStream objectOutput = new ObjectOutputStream(output); |
| 63 | + objectOutput.writeObject(object); |
| 64 | + objectOutput.flush(); |
| 65 | + |
| 66 | + ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| 67 | + // GZIP provides 35-45% compression; LZMA from Apache commons-compress is only a few % better |
| 68 | + GZIPOutputStream gzip = new GZIPOutputStream(out) { { |
| 69 | + this.def.setLevel(Deflater.BEST_COMPRESSION); |
| 70 | + } }; |
| 71 | + gzip.write(output.toByteArray()); |
| 72 | + gzip.close(); |
| 73 | + |
| 74 | + return HashCode.fromBytes(out.toByteArray()).toString(); |
| 75 | + } catch (NotSerializableException e) { |
| 76 | + throw e; |
| 77 | + } catch (IOException e) { |
| 78 | + throw new IllegalStateException("Failed to serialize: " + object, e); |
62 | 79 | } |
63 | | - ByteArrayOutputStream out = new ByteArrayOutputStream(); |
64 | | - GZIPOutputStream gzip = new GZIPOutputStream(out); |
65 | | - gzip.write(str.getBytes()); |
66 | | - gzip.close(); |
67 | | - return HashCode.fromBytes(out.toByteArray()).toString(); |
68 | 80 | } |
69 | 81 |
|
70 | 82 | /** |
71 | | - * Decompresses a query plan that was compress with {@link PaginatedPlanCache#compress}. |
72 | | - * @param input compressed query plan |
73 | | - * @return decompressed string |
| 83 | + * Decompresses and deserializes the binary data. |
| 84 | + * @param code Encoded binary data. |
| 85 | + * @return An object. |
74 | 86 | */ |
75 | | - String decompress(String input) throws IOException { |
76 | | - if (input == null || input.length() == 0) { |
77 | | - return ""; |
| 87 | + protected Serializable deserialize(String code) { |
| 88 | + try { |
| 89 | + GZIPInputStream gzip = new GZIPInputStream( |
| 90 | + new ByteArrayInputStream(HashCode.fromString(code).asBytes())); |
| 91 | + ObjectInputStream objectInput = new CursorDeserializationStream( |
| 92 | + new ByteArrayInputStream(gzip.readAllBytes())); |
| 93 | + return (Serializable) objectInput.readObject(); |
| 94 | + } catch (Exception e) { |
| 95 | + throw new IllegalStateException("Failed to deserialize object", e); |
78 | 96 | } |
79 | | - GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream( |
80 | | - HashCode.fromString(input).asBytes())); |
81 | | - return new String(gzip.readAllBytes()); |
82 | 97 | } |
83 | 98 |
|
84 | 99 | /** |
85 | | - * Parse `NamedExpression`s from cursor. |
86 | | - * @param listToFill List to fill with data. |
87 | | - * @param cursor Cursor to parse. |
88 | | - * @return Remaining part of the cursor. |
| 100 | + * Converts a cursor to a physical plan tree. |
89 | 101 | */ |
90 | | - private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) { |
91 | | - var serializer = new DefaultExpressionSerializer(); |
92 | | - if (cursor.startsWith(")")) { //empty list |
93 | | - return cursor.substring(cursor.indexOf(',') + 1); |
94 | | - } |
95 | | - while (!cursor.startsWith("(")) { |
96 | | - listToFill.add((NamedExpression) |
97 | | - serializer.deserialize(cursor.substring(0, |
98 | | - Math.min(cursor.indexOf(','), cursor.indexOf(')'))))); |
99 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
100 | | - } |
101 | | - return cursor; |
102 | | - } |
103 | | - |
104 | | - /** |
105 | | - * Converts a cursor to a physical plan tree. |
106 | | - */ |
107 | 102 | public PhysicalPlan convertToPlan(String cursor) { |
108 | 103 | if (!cursor.startsWith(CURSOR_PREFIX)) { |
109 | 104 | throw new UnsupportedOperationException("Unsupported cursor"); |
110 | 105 | } |
111 | 106 | try { |
112 | | - cursor = cursor.substring(CURSOR_PREFIX.length()); |
113 | | - cursor = decompress(cursor); |
114 | | - |
115 | | - // TODO Parse with ANTLR or serialize as JSON/XML |
116 | | - if (!cursor.startsWith("(Paginate,")) { |
117 | | - throw new UnsupportedOperationException("Unsupported cursor"); |
118 | | - } |
119 | | - // TODO add checks for > 0 |
120 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
121 | | - final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); |
122 | | - |
123 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
124 | | - final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); |
125 | | - |
126 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
127 | | - if (!cursor.startsWith("(Project,")) { |
128 | | - throw new UnsupportedOperationException("Unsupported cursor"); |
129 | | - } |
130 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
131 | | - if (!cursor.startsWith("(namedParseExpressions,")) { |
132 | | - throw new UnsupportedOperationException("Unsupported cursor"); |
133 | | - } |
134 | | - |
135 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
136 | | - List<NamedExpression> namedParseExpressions = new ArrayList<>(); |
137 | | - cursor = parseNamedExpressions(namedParseExpressions, cursor); |
| 107 | + return (PhysicalPlan) deserialize(cursor.substring(CURSOR_PREFIX.length())); |
| 108 | + } catch (Exception e) { |
| 109 | + throw new UnsupportedOperationException("Unsupported cursor", e); |
| 110 | + } |
| 111 | + } |
138 | 112 |
|
139 | | - List<NamedExpression> projectList = new ArrayList<>(); |
140 | | - if (!cursor.startsWith("(projectList,")) { |
141 | | - throw new UnsupportedOperationException("Unsupported cursor"); |
142 | | - } |
143 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
144 | | - cursor = parseNamedExpressions(projectList, cursor); |
| 113 | + /** |
| 114 | + * This function is used in testing only, to get access to {@link CursorDeserializationStream}. |
| 115 | + */ |
| 116 | + public CursorDeserializationStream getCursorDeserializationStream(InputStream in) |
| 117 | + throws IOException { |
| 118 | + return new CursorDeserializationStream(in); |
| 119 | + } |
145 | 120 |
|
146 | | - if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) { |
147 | | - throw new UnsupportedOperationException("Unsupported cursor"); |
148 | | - } |
149 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
150 | | - var indexName = cursor.substring(0, cursor.indexOf(',')); |
151 | | - cursor = cursor.substring(cursor.indexOf(',') + 1); |
152 | | - var scrollId = cursor.substring(0, cursor.indexOf(')')); |
153 | | - TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId); |
| 121 | + public class CursorDeserializationStream extends ObjectInputStream { |
| 122 | + public CursorDeserializationStream(InputStream in) throws IOException { |
| 123 | + super(in); |
| 124 | + } |
154 | 125 |
|
155 | | - return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions), |
156 | | - pageSize, currentPageIndex); |
157 | | - } catch (Exception e) { |
158 | | - throw new UnsupportedOperationException("Unsupported cursor", e); |
| 126 | + @Override |
| 127 | + public Object resolveObject(Object obj) throws IOException { |
| 128 | + return obj.equals("engine") ? engine : obj; |
159 | 129 | } |
160 | 130 | } |
161 | 131 | } |
0 commit comments