@@ -62,7 +62,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,

keyRowId = numRows;
keyRow.pointTo(base, recordOffset, klen);
valueRow.pointTo(base, recordOffset + klen, vlen + 4);
Contributor:
I'm wondering why we did this before. Was it a mistake?

Member Author (@kiszk, Jul 5, 2017):
I have the same question.
@sameeragarwal asked a similar question a year ago, but there was no response from @ooq.

Contributor:
I recall it being intentional.
See discussion here.

Member Author:
@ooq, thank you for pointing out that interesting discussion.
That discussion makes sense for page management. The question @cloud-fan and I have is whether valueRow should use only vlen; I think the +4 is for page management.
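As a concrete, hedged illustration, here is a tiny Scala sketch of what the change means for a value row with one long field; the 16-byte payload is just an assumed example, not taken from this PR:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A value row with a single long field: 8-byte null bitset + 8-byte field = 16 bytes.
val vlen = 16
val buffer = new Array[Byte](vlen + 4)    // trailing 4 bytes are per-record bookkeeping, not row data
val valueRow = new UnsafeRow(1)
valueRow.pointTo(buffer, vlen)            // point to the value bytes only, as this change does
assert(valueRow.getSizeInBytes % 8 == 0)  // the old "vlen + 4" would have made this 20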

valueRow.pointTo(base, recordOffset + klen, vlen);
numRows++;
return valueRow;
}
@@ -95,7 +95,7 @@ protected UnsafeRow getValueFromKey(int rowId) {
getKeyRow(rowId);
}
assert(rowId >= 0);
valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
return valueRow;
}

@@ -131,7 +131,7 @@ public boolean next() {
}

key.pointTo(base, offsetInPage, klen);
value.pointTo(base, offsetInPage + klen, vlen + 4);
value.pointTo(base, offsetInPage + klen, vlen);

offsetInPage += recordLength;
recordsInPage -= 1;
@@ -167,6 +167,7 @@ public UnsafeRow() {}
*/
public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
Contributor:
I think we only need the assertion here in pointTo and in setTotalSize. The other places are just checking the length of existing unsafe rows, which is unnecessary.

Member Author:
Yes, done.

this.baseObject = baseObject;
this.baseOffset = baseOffset;
this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public void pointTo(byte[] buf, int sizeInBytes) {
}

public void setTotalSize(int sizeInBytes) {
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.sizeInBytes = sizeInBytes;
}

@@ -538,6 +540,7 @@ public void copyFrom(UnsafeRow row) {
row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
// update the sizeInBytes.
this.sizeInBytes = row.sizeInBytes;
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
}

/**
@@ -664,6 +667,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.numFields = in.readInt();
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = new byte[sizeInBytes];
@@ -682,6 +686,7 @@ public void write(Kryo kryo, Output out) {
public void read(Kryo kryo, Input in) {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
this.numFields = in.readInt();
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = new byte[sizeInBytes];
@@ -65,7 +65,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,

keyRowId = numRows;
keyRow.pointTo(base, recordOffset + 8, klen);
valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
numRows++;
return valueRow;
}
@@ -102,7 +102,7 @@ public UnsafeRow getValueFromKey(int rowId) {
long offset = keyRow.getBaseOffset();
int klen = keyRow.getSizeInBytes();
int vlen = Platform.getInt(base, offset - 8) - klen - 4;
valueRow.pointTo(base, offset + klen, vlen + 4);
valueRow.pointTo(base, offset + klen, vlen);
return valueRow;
}

@@ -146,7 +146,7 @@ public boolean next() {
currentvlen = totalLength - currentklen;

key.pointTo(base, offsetInPage + 8, currentklen);
value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);

offsetInPage += 8 + totalLength + 8;
recordsInPage -= 1;
@@ -208,9 +208,10 @@ private static final class RowComparator extends RecordComparator {

@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
// TODO: Why are the sizes -1?
row1.pointTo(baseObj1, baseOff1, -1);
row2.pointTo(baseObj2, baseOff2, -1);
// Note that since ordering doesn't need the total length of the record, we just pass 0
// into the row.
row1.pointTo(baseObj1, baseOff1, 0);
row2.pointTo(baseObj2, baseOff2, 0);
return ordering.compare(row1, row2);
}
}
@@ -238,10 +238,10 @@ private static final class KVComparator extends RecordComparator {

@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
// Note that since ordering doesn't need the total length of the record, we just pass -1
// Note that since ordering doesn't need the total length of the record, we just pass 0
// into the row.
row1.pointTo(baseObj1, baseOff1 + 4, -1);
row2.pointTo(baseObj2, baseOff2 + 4, -1);
row1.pointTo(baseObj1, baseOff1 + 4, 0);
row2.pointTo(baseObj2, baseOff2 + 4, 0);
return ordering.compare(row1, row2);
}
}
@@ -350,20 +350,24 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
throw new IOException(
s"Error reading delta file $fileToRead of $this: key size cannot be $keySize")
} else {
val keyRowBuffer = new Array[Byte](keySize)
// If the key size in an existing file is not a multiple of 8, round it up to a multiple of 8
Contributor:
I don't think we can round. Assume the actual length of an unsafe row is 8: previously we would append 4 bytes, producing a 12-byte unsafe row, and save it to the checkpoint. So here, when reading an old checkpoint, we need to read 12 bytes and set the length to 8.

Contributor:
BTW, we only need to do this for value, not key.
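A rough Scala sketch of the approach described above (hedged: input and valueSchema refer to the surrounding state-store code, the subtraction assumes every old value record carries exactly 4 extra bytes, and this is not what the PR currently does):

import com.google.common.io.ByteStreams
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// The value size recorded by an old checkpoint may include the extra 4 bytes.
val recordedValueSize = input.readInt()
val valueRowBuffer = new Array[Byte](recordedValueSize)
ByteStreams.readFully(input, valueRowBuffer, 0, recordedValueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
// Read all recorded bytes, but expose only the true row length (e.g. read 12 bytes, set the length to 8).
valueRow.pointTo(valueRowBuffer, recordedValueSize - 4)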

val keyAllocationSize = ((keySize + 7) / 8) * 8
val keyRowBuffer = new Array[Byte](keyAllocationSize)
Contributor:
So RowBasedKeyValueBatch is the format used for the state store? cc @zsxwing

Member Author:
I think RowBasedKeyValueBatch is not used for the state store in HDFSBackedStateStoreProvider for now.

Contributor:
Why do we have this logic? What do we write into the state store?

Member Author:
Here is why I added this logic.
I believe this failure occurs because the checkpoint file contains a value record whose size is not a multiple of 8 (i.e. 28). Thus, I always round the size up to a multiple of 8.
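For concreteness, the rounding used in this change is the usual round-up-to-8 arithmetic; a minimal Scala restatement using the 28-byte example above:

// Round a byte count up to the next multiple of 8.
def roundUpTo8(size: Int): Int = ((size + 7) / 8) * 8

assert(roundUpTo8(28) == 32)  // a 28-byte record gets a 32-byte buffer
assert(roundUpTo8(32) == 32)  // already-aligned sizes are unchanged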

Contributor:
I think all unsafe rows have a size that is a multiple of 8, except those from RowBasedKeyValueBatch in the previous code. So I'm wondering how the state store can have unsafe rows with the wrong size; does the state store use RowBasedKeyValueBatch?
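A small Scala illustration of why fixed-length unsafe rows are naturally 8-byte multiples (a hedged sketch; it ignores variable-length fields, which UnsafeRow also pads to whole words):

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// For n fields: an 8-byte-aligned null bitset plus 8 bytes per fixed-length field.
def fixedRowSize(numFields: Int): Int =
  UnsafeRow.calculateBitSetWidthInBytes(numFields) + 8 * numFields

assert(Seq(1, 5, 64, 65).forall(n => fixedRowSize(n) % 8 == 0))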

Contributor:
I agree. If we have to regenerate, then we are breaking something, and we must not, since we have been guaranteeing backward compatibility. If, as @cloud-fan says, all unsafe rows except those from RowBasedKeyValueBatch should have a size that is a multiple of 8, then we need to understand what is going on: why does reading the checkpoint files fail?

Contributor:
OK, we need to figure out what's going on; it seems there are other places where we may have the wrong size in an UnsafeRow.

Member Author (@kiszk, Jul 7, 2017):
We need to know exactly how these checkpoint files were generated. Were they generated by the method @kunalkhamar mentioned, or by another tool?

Member Author:

Should we add the program that @kunalkhamar pointed out here as a new test case, to check whether all of the sizes in UnsafeRow are correct?

Member Author:
@cloud-fan @zsxwing @tdas @kunalkhamar
I misunderstood: HDFSBackedStateStoreProvider is used to store state.

I added a new test suite to check HDFSBackedStateStoreProvider for storing and restoring state, as @kunalkhamar suggested here.

Do you think it makes sense?

ByteStreams.readFully(input, keyRowBuffer, 0, keySize)

val keyRow = new UnsafeRow(keySchema.fields.length)
keyRow.pointTo(keyRowBuffer, keySize)
keyRow.pointTo(keyRowBuffer, keyAllocationSize)

val valueSize = input.readInt()
if (valueSize < 0) {
map.remove(keyRow)
} else {
val valueRowBuffer = new Array[Byte](valueSize)
// If the value size in an existing file is not a multiple of 8, round it up to a multiple of 8
val valueAllocationSize = ((valueSize + 7) / 8) * 8
val valueRowBuffer = new Array[Byte](valueAllocationSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
valueRow.pointTo(valueRowBuffer, valueSize)
valueRow.pointTo(valueRowBuffer, valueAllocationSize)
map.put(keyRow, valueRow)
}
}
@@ -413,21 +417,25 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
throw new IOException(
s"Error reading snapshot file $fileToRead of $this: key size cannot be $keySize")
} else {
val keyRowBuffer = new Array[Byte](keySize)
// If the key size in an existing file is not a multiple of 8, round it up to a multiple of 8
val keyAllocationSize = ((keySize + 7) / 8) * 8
val keyRowBuffer = new Array[Byte](keyAllocationSize)
ByteStreams.readFully(input, keyRowBuffer, 0, keySize)

val keyRow = new UnsafeRow(keySchema.fields.length)
keyRow.pointTo(keyRowBuffer, keySize)
keyRow.pointTo(keyRowBuffer, keyAllocationSize)

val valueSize = input.readInt()
if (valueSize < 0) {
throw new IOException(
s"Error reading snapshot file $fileToRead of $this: value size cannot be $valueSize")
} else {
val valueRowBuffer = new Array[Byte](valueSize)
// If the value size in an existing file is not a multiple of 8, round it up to a multiple of 8
val valueAllocationSize = ((valueSize + 7) / 8) * 8
Contributor:
Can this be made into a utility function inside UnsafeRow? This sort of adjustment should not be a concern of external users of UnsafeRow.

Contributor (@tdas, Jul 18, 2017):
For example, how about something like this

class UnsafeRow { 
  def readFromStream(byteStream: InputStream, bytes: Int): UnsafeRow = ???
}

Member Author:
@cloud-fan what do you think?
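A minimal sketch of what such a helper might look like (the name, signature, and rounding behaviour are assumptions, not part of this PR):

import java.io.{DataInputStream, InputStream}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Read sizeInBytes bytes of row data, padding the backing buffer so its
// length is a multiple of 8 and the pointTo assertion holds.
def readUnsafeRowFromStream(in: InputStream, numFields: Int, sizeInBytes: Int): UnsafeRow = {
  val allocationSize = ((sizeInBytes + 7) / 8) * 8
  val buffer = new Array[Byte](allocationSize)
  new DataInputStream(in).readFully(buffer, 0, sizeInBytes)
  val row = new UnsafeRow(numFields)
  row.pointTo(buffer, allocationSize)
  row
}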

val valueRowBuffer = new Array[Byte](valueAllocationSize)
ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
valueRow.pointTo(valueRowBuffer, valueSize)
valueRow.pointTo(valueRowBuffer, valueAllocationSize)
map.put(keyRow, valueRow)
}
}
@@ -479,6 +479,61 @@ class StreamSuite extends StreamTest {
CheckAnswer((1, 2), (2, 2), (3, 2)))
}

testQuietly("store to and recover from a checkpoint") {
Contributor:
I don't think this test is needed. There are existing tests that already cover reading from checkpoints, etc. The critical test was reading 2.1 checkpoint files, which seems to be passing.

Member Author:
This test also checks whether the length of the checkpoints is a multiple of 8. Does that make sense? Or is there another test suite that checks the length?

Contributor:
It does not really check it explicitly, does it? It tests it implicitly by creating checkpoints and then restarting. There are other tests that already do the same thing; e.g. this test is effectively the same as
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L88

Member Author:
Ah, you are right. This test currently relies on the internal assert in UnsafeRow.pointTo to check for a multiple of 8.

Contributor (@cloud-fan, Jul 20, 2017):
shall we remove this test?

Member Author:
Yes, I will remove this, since the test that @tdas pointed out triggers the same assertion failure that my test case expected.

val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

def query(data: MemoryStream[Int], checkpointDir: String, queryName: String):
DataStreamWriter[Row] = {
data.toDF
.groupBy($"value")
.agg(count("*"))
.writeStream
.outputMode("complete")
.option("checkpointLocation", checkpointDir)
.format("memory")
.queryName(queryName)
}

withSQLConf(
SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
var writeQuery: StreamingQuery = null
try {
val data = MemoryStream[Int]
writeQuery = query(data, checkpointDir, "write").start()

data.addData(1, 2, 3, 4)
writeQuery.processAllAvailable()
data.addData(3, 4, 5, 6)
writeQuery.processAllAvailable()
data.addData(5, 6, 7, 8)
writeQuery.processAllAvailable()
} finally {
assert(writeQuery != null)
writeQuery.stop()
}

var restartQuery: StreamingQuery = null
try {
val data = MemoryStream[Int]
data.addData(1, 2, 3, 4)
data.addData(3, 4, 5, 6)
data.addData(5, 6, 7, 8)

restartQuery = query(data, checkpointDir, "counts").start()
restartQuery.processAllAvailable()
data.addData(9)
restartQuery.processAllAvailable()

QueryTest.checkAnswer(spark.table("counts").toDF,
Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
} finally {
assert(restartQuery != null)
restartQuery.stop()
}
}
}

testQuietly("recover from a Spark v2.1 checkpoint") {
var inputData: MemoryStream[Int] = null
var query: DataStreamWriter[Row] = null