|
21 | 21 | import java.io.DataInput;
|
22 | 22 | import java.io.DataInputStream;
|
23 | 23 | import java.io.DataOutput;
|
| 24 | +import java.io.DataOutputStream; |
24 | 25 | import java.io.IOException;
|
25 | 26 | import java.lang.reflect.InvocationTargetException;
|
26 | 27 | import java.lang.reflect.Method;
|
@@ -202,9 +203,12 @@ public CellWritableComparable(Cell kv) {
|
202 | 203 |
|
203 | 204 | @Override
|
204 | 205 | public void write(DataOutput out) throws IOException {
|
205 |
| - out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv)); |
206 |
| - out.writeInt(0); |
207 |
| - PrivateCellUtil.writeFlatKey(kv, out); |
| 206 | + int keyLen = CellUtil.estimatedSerializedSizeOfKey(kv); |
| 207 | + int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value. |
| 208 | + out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE); |
| 209 | + out.writeInt(keyLen); |
| 210 | + out.writeInt(valueLen); |
| 211 | + CellUtil.writeFlatKey(kv, (DataOutputStream) out); |
208 | 212 | }
|
209 | 213 |
|
210 | 214 | @Override
|
@@ -413,7 +417,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
|
413 | 417 | // skip if we filtered it out
|
414 | 418 | if (kv == null) continue;
|
415 | 419 | Cell ret = convertKv(kv, cfRenameMap);
|
416 |
| - context.write(new CellWritableComparable(ret), ret); |
| 420 | + context.write(new CellWritableComparable(ret), new MapReduceExtendedCell(ret)); |
417 | 421 | }
|
418 | 422 | }
|
419 | 423 | } catch (InterruptedException e) {
|
|
0 commit comments