Skip to content

Commit 5d55cef

Browse files
committed
Add skeleton for Row implementation.
1 parent f03e9c1 commit 5d55cef

File tree

2 files changed

+320
-0
lines changed

2 files changed

+320
-0
lines changed
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions;
19+
20+
21+
import org.apache.spark.sql.Row;
22+
import org.apache.spark.sql.types.DataType;
23+
import org.apache.spark.sql.types.StructType;
24+
import org.apache.spark.unsafe.PlatformDependent;
25+
import org.apache.spark.unsafe.bitset.BitSetMethods;
26+
import scala.collection.Map;
27+
import scala.collection.Seq;
28+
29+
import javax.annotation.Nullable;
30+
import java.math.BigDecimal;
31+
import java.sql.Date;
32+
import java.util.List;
33+
34+
35+
// TODO: pick a better name for this class, since this is potentially confusing.
36+
37+
/**
38+
* An Unsafe implementation of Row which is backed by raw memory instead of Java objets.
39+
*
40+
* Each tuple has three parts: [null bit set] [values] [variable length portion]
41+
*
42+
* The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores
43+
* one bit per field.
44+
*
45+
* In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
46+
* primitive types, such as long, double, or int, we store the value directly in the word. For
47+
* fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
48+
* base address of the row) that points to the beginning of the variable-length field.
49+
*/
50+
public final class UnsafeRow implements MutableRow {
51+
52+
private Object baseObject;
53+
private long baseOffset;
54+
private int numFields;
55+
/** The width of the null tracking bit set, in bytes */
56+
private int bitSetWidthInBytes;
57+
@Nullable
58+
private StructType schema;
59+
60+
private long getFieldOffset(int ordinal) {
61+
return baseOffset + bitSetWidthInBytes + ordinal * 8;
62+
}
63+
64+
public UnsafeRow() { }
65+
66+
public void set(Object baseObject, long baseOffset, int numFields, StructType schema) {
67+
assert numFields >= 0 : "numFields should >= 0";
68+
assert schema == null || schema.fields().length == numFields;
69+
this.bitSetWidthInBytes = ((numFields / 64) + ((numFields % 64 == 0 ? 0 : 1))) * 8;
70+
this.baseObject = baseObject;
71+
this.baseOffset = baseOffset;
72+
this.numFields = numFields;
73+
this.schema = schema;
74+
}
75+
76+
private void assertIndexIsValid(int index) {
77+
assert index >= 0 : "index (" + index + ") should >= 0";
78+
assert index < numFields : "index (" + index + ") should <= " + numFields;
79+
}
80+
81+
@Override
82+
public void setNullAt(int i) {
83+
assertIndexIsValid(i);
84+
BitSetMethods.set(baseObject, baseOffset, i);
85+
}
86+
87+
@Override
88+
public void update(int ordinal, Object value) {
89+
assert schema != null : "schema cannot be null when calling the generic update()";
90+
final DataType type = schema.fields()[ordinal].dataType();
91+
// TODO: match based on the type, then set. This will be slow.
92+
throw new UnsupportedOperationException();
93+
}
94+
95+
@Override
96+
public void setInt(int ordinal, int value) {
97+
assertIndexIsValid(ordinal);
98+
PlatformDependent.UNSAFE.putInt(baseObject, getFieldOffset(ordinal), value);
99+
}
100+
101+
@Override
102+
public void setLong(int ordinal, long value) {
103+
assertIndexIsValid(ordinal);
104+
PlatformDependent.UNSAFE.putLong(baseObject, getFieldOffset(ordinal), value);
105+
}
106+
107+
@Override
108+
public void setDouble(int ordinal, double value) {
109+
assertIndexIsValid(ordinal);
110+
PlatformDependent.UNSAFE.putDouble(baseObject, getFieldOffset(ordinal), value);
111+
}
112+
113+
@Override
114+
public void setBoolean(int ordinal, boolean value) {
115+
assertIndexIsValid(ordinal);
116+
PlatformDependent.UNSAFE.putBoolean(baseObject, getFieldOffset(ordinal), value);
117+
}
118+
119+
@Override
120+
public void setShort(int ordinal, short value) {
121+
assertIndexIsValid(ordinal);
122+
PlatformDependent.UNSAFE.putShort(baseObject, getFieldOffset(ordinal), value);
123+
}
124+
125+
@Override
126+
public void setByte(int ordinal, byte value) {
127+
assertIndexIsValid(ordinal);
128+
PlatformDependent.UNSAFE.putByte(baseObject, getFieldOffset(ordinal), value);
129+
}
130+
131+
@Override
132+
public void setFloat(int ordinal, float value) {
133+
assertIndexIsValid(ordinal);
134+
PlatformDependent.UNSAFE.putFloat(baseObject, getFieldOffset(ordinal), value);
135+
}
136+
137+
@Override
138+
public void setString(int ordinal, String value) {
139+
// TODO: need to ensure that array has been suitably sized.
140+
throw new UnsupportedOperationException();
141+
}
142+
143+
@Override
144+
public int size() {
145+
return numFields;
146+
}
147+
148+
@Override
149+
public int length() {
150+
return size();
151+
}
152+
153+
@Override
154+
public StructType schema() {
155+
return schema;
156+
}
157+
158+
@Override
159+
public Object apply(int i) {
160+
return get(i);
161+
}
162+
163+
@Override
164+
public Object get(int i) {
165+
assertIndexIsValid(i);
166+
// TODO: dispatching based on field type
167+
throw new UnsupportedOperationException();
168+
}
169+
170+
@Override
171+
public boolean isNullAt(int i) {
172+
assertIndexIsValid(i);
173+
return BitSetMethods.isSet(baseObject, baseOffset, i);
174+
}
175+
176+
@Override
177+
public boolean getBoolean(int i) {
178+
assertIndexIsValid(i);
179+
return PlatformDependent.UNSAFE.getBoolean(baseObject, getFieldOffset(i));
180+
}
181+
182+
@Override
183+
public byte getByte(int i) {
184+
assertIndexIsValid(i);
185+
return PlatformDependent.UNSAFE.getByte(baseObject, getFieldOffset(i));
186+
}
187+
188+
@Override
189+
public short getShort(int i) {
190+
assertIndexIsValid(i);
191+
return PlatformDependent.UNSAFE.getShort(baseObject, getFieldOffset(i));
192+
}
193+
194+
@Override
195+
public int getInt(int i) {
196+
assertIndexIsValid(i);
197+
return PlatformDependent.UNSAFE.getInt(baseObject, getFieldOffset(i));
198+
}
199+
200+
@Override
201+
public long getLong(int i) {
202+
assertIndexIsValid(i);
203+
return PlatformDependent.UNSAFE.getLong(baseObject, getFieldOffset(i));
204+
}
205+
206+
@Override
207+
public float getFloat(int i) {
208+
assertIndexIsValid(i);
209+
return PlatformDependent.UNSAFE.getFloat(baseObject, getFieldOffset(i));
210+
}
211+
212+
@Override
213+
public double getDouble(int i) {
214+
assertIndexIsValid(i);
215+
return PlatformDependent.UNSAFE.getDouble(baseObject, getFieldOffset(i));
216+
}
217+
218+
@Override
219+
public String getString(int i) {
220+
assertIndexIsValid(i);
221+
// TODO
222+
223+
throw new UnsupportedOperationException();
224+
}
225+
226+
@Override
227+
public BigDecimal getDecimal(int i) {
228+
// TODO
229+
throw new UnsupportedOperationException();
230+
}
231+
232+
@Override
233+
public Date getDate(int i) {
234+
// TODO
235+
throw new UnsupportedOperationException();
236+
}
237+
238+
@Override
239+
public <T> Seq<T> getSeq(int i) {
240+
// TODO
241+
throw new UnsupportedOperationException();
242+
}
243+
244+
@Override
245+
public <T> List<T> getList(int i) {
246+
// TODO
247+
throw new UnsupportedOperationException();
248+
}
249+
250+
@Override
251+
public <K, V> Map<K, V> getMap(int i) {
252+
// TODO
253+
throw new UnsupportedOperationException();
254+
}
255+
256+
@Override
257+
public <K, V> java.util.Map<K, V> getJavaMap(int i) {
258+
// TODO
259+
throw new UnsupportedOperationException();
260+
}
261+
262+
@Override
263+
public Row getStruct(int i) {
264+
// TODO
265+
throw new UnsupportedOperationException();
266+
}
267+
268+
@Override
269+
public <T> T getAs(int i) {
270+
// TODO
271+
throw new UnsupportedOperationException();
272+
}
273+
274+
@Override
275+
public Row copy() {
276+
// TODO
277+
throw new UnsupportedOperationException();
278+
}
279+
280+
@Override
281+
public boolean anyNull() {
282+
return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes);
283+
}
284+
285+
@Override
286+
public Seq<Object> toSeq() {
287+
// TODO
288+
throw new UnsupportedOperationException();
289+
}
290+
291+
@Override
292+
public String mkString() {
293+
// TODO
294+
throw new UnsupportedOperationException();
295+
}
296+
297+
@Override
298+
public String mkString(String sep) {
299+
// TODO
300+
throw new UnsupportedOperationException();
301+
}
302+
303+
@Override
304+
public String mkString(String start, String sep, String end) {
305+
// TODO
306+
throw new UnsupportedOperationException();
307+
}
308+
}

unsafe/src/main/java/org/apache/spark/unsafe/bitset/BitSetMethods.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ public static boolean isSet(Object baseObject, long baseOffset, long index) {
6969
return (word & mask) != 0;
7070
}
7171

72+
/**
73+
* Returns {@code true} if any bit is set.
74+
*/
75+
public static boolean anySet(Object baseObject, long baseOffset, long bitSetWidthInBytes) {
76+
for (int i = 0; i <= bitSetWidthInBytes; i++) {
77+
if (PlatformDependent.UNSAFE.getByte(baseObject, baseOffset + i) != 0) {
78+
return true;
79+
}
80+
}
81+
return false;
82+
}
83+
7284
/**
7385
* Returns the index of the first bit that is set to true that occurs on or after the
7486
* specified starting index. If no such bit exists then {@code -1} is returned.

0 commit comments

Comments
 (0)