Skip to content

Commit 5908276

Browse files
HBASE-25839 Bulk Import fails with java.io.IOException: Type mismatch in value from map
1 parent e5b04a3 commit 5908276

File tree

2 files changed

+47
-3
lines changed

2 files changed

+47
-3
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,11 @@ public CellWritableComparable(Cell kv) {
202202

203203
@Override
204204
public void write(DataOutput out) throws IOException {
205-
out.writeInt(PrivateCellUtil.estimatedSerializedSizeOfKey(kv));
206-
out.writeInt(0);
205+
int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(kv);
206+
int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
207+
out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
208+
out.writeInt(keyLen);
209+
out.writeInt(valueLen);
207210
PrivateCellUtil.writeFlatKey(kv, out);
208211
}
209212

@@ -413,7 +416,7 @@ public void map(ImmutableBytesWritable row, Result value, Context context) throw
413416
// skip if we filtered it out
414417
if (kv == null) continue;
415418
Cell ret = convertKv(kv, cfRenameMap);
416-
context.write(new CellWritableComparable(ret), ret);
419+
context.write(new CellWritableComparable(ret), new MapReduceExtendedCell(ret));
417420
}
418421
}
419422
} catch (InterruptedException e) {

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java

+41
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,47 @@ public void testWithFilter() throws Throwable {
508508
importTable.close();
509509
}
510510

511+
/**
512+
* Create a simple table, run an Export Job on it, Import with bulk output and enable largeResult
513+
*/
514+
@Test
515+
public void testBulkImportAndLargeResult() throws Throwable {
516+
// Create simple table to export
517+
TableDescriptor desc = TableDescriptorBuilder
518+
.newBuilder(TableName.valueOf(name.getMethodName()))
519+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
520+
.build();
521+
UTIL.getAdmin().createTable(desc);
522+
Table exportTable = UTIL.getConnection().getTable(desc.getTableName());
523+
524+
Put p1 = new Put(ROW1);
525+
p1.addColumn(FAMILYA, QUAL, now, QUAL);
526+
527+
// Having another row would actually test the filter.
528+
Put p2 = new Put(ROW2);
529+
p2.addColumn(FAMILYA, QUAL, now, QUAL);
530+
531+
exportTable.put(Arrays.asList(p1, p2));
532+
533+
// Export the simple table
534+
String[] args = new String[] { name.getMethodName(), FQ_OUTPUT_DIR, "1000" };
535+
assertTrue(runExport(args));
536+
537+
// Import to a new table
538+
final String IMPORT_TABLE = name.getMethodName() + "import";
539+
desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(IMPORT_TABLE))
540+
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILYA).setMaxVersions(5).build())
541+
.build();
542+
UTIL.getAdmin().createTable(desc);
543+
544+
String O_OUTPUT_DIR =
545+
new Path(OUTPUT_DIR + 1).makeQualified(FileSystem.get(UTIL.getConfiguration())).toString();
546+
547+
args = new String[] { "-D" + Import.BULK_OUTPUT_CONF_KEY + "=" + O_OUTPUT_DIR,
548+
"-D" + Import.HAS_LARGE_RESULT + "=" + true, IMPORT_TABLE, FQ_OUTPUT_DIR, "1000" };
549+
assertTrue(runImport(args));
550+
}
551+
511552
/**
512553
* Count the number of keyvalues in the specified table with the given filter
513554
* @param table the table to scan

0 commit comments

Comments (0)