fixes slow bulk import with many tablets and files (#5044)

* fixes slow bulk import with many tablets and files

The bulk import code was reading all tablets in the bulk import range
for each range being bulk imported. This resulted in O(N^2) metadata
table scans, which made very large bulk imports extremely slow.
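
To illustrate the difference, here is a minimal, runnable sketch (everything in it is hypothetical and simplified; plain integers stand in for the real load-mapping and tablet metadata types, and counting `next()` calls stands in for metadata table reads): re-creating the tablet iterator per range costs O(ranges × tablets) reads, while advancing one shared iterator over the same sorted order costs O(ranges + tablets).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of the load step; not the real Accumulo classes.
public class SharedIteratorSketch {
  public static void main(String[] args) {
    int numTablets = 5_000;
    List<Integer> tabletEndRows = new ArrayList<>();
    for (int i = 1; i <= numTablets; i++) {
      tabletEndRows.add(i);
    }
    // one load range per tablet, sorted the same way as the tablets
    List<Integer> loadRanges = new ArrayList<>(tabletEndRows);

    // Old behavior: a fresh tablet iterator for every range, so each range re-reads
    // the tablets from the beginning -> O(ranges * tablets) reads overall.
    long perRangeReads = 0;
    for (int range : loadRanges) {
      Iterator<Integer> tablets = tabletEndRows.iterator(); // re-created per range
      while (tablets.hasNext()) {
        perRangeReads++;
        if (tablets.next() >= range) {
          break; // found the tablet this range falls in
        }
      }
    }

    // Fixed behavior: one iterator shared across all ranges; because both sides are
    // sorted the same way it only ever moves forward -> O(ranges + tablets) reads.
    long sharedReads = 0;
    Iterator<Integer> sharedTablets = tabletEndRows.iterator();
    for (int range : loadRanges) {
      while (sharedTablets.hasNext()) {
        sharedReads++;
        if (sharedTablets.next() >= range) {
          break;
        }
      }
    }

    System.out.println("per-range iterator reads: " + perRangeReads); // ~12.5 million
    System.out.println("shared iterator reads:    " + sharedReads); // 5,000
  }
}
```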

Added a new test that bulk imports thousands of files into thousands of
tablets. Running this test w/o the fixes in this PR, the following time
is seen for the fate step:

```
DEBUG: Running LoadFiles.isReady() FATE:USER:6320e73d-e661-4c66-bf25-c0c27a0a79d5 took 289521 ms and returned 0
```

With the fix in this PR, the following time is seen for the new test,
so the fate step goes from ~290s to ~1.2s:

```
DEBUG: Running LoadFiles.isReady() FATE:USER:18e52fc2-5876-4b01-ba7b-3b3c099a82be took 1225 ms and returned 0
```

This bug does not seem to exist in 2.1 or 3.1. The test was not run
against those versions though, so it may be worthwhile to backport the test.

keith-turner authored Nov 8, 2024
1 parent 34dcdd9 commit cfae5e9
Showing 2 changed files with 62 additions and 2 deletions.

@@ -338,11 +338,15 @@ private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMa
         TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow, null)
             .checkConsistency().fetch(PREV_ROW, LOCATION, LOADED, TIME).build()) {

+      // The tablet iterator and load mapping iterator are both iterating over data that is sorted
+      // in the same way. The two iterators are each independently advanced to find common points in
+      // the sorted data.
+      var tabletIter = tabletsMetadata.iterator();
+
       t1 = System.currentTimeMillis();
       while (lmi.hasNext()) {
         loadMapEntry = lmi.next();
-        List<TabletMetadata> tablets =
-            findOverlappingTablets(loadMapEntry.getKey(), tabletsMetadata.iterator());
+        List<TabletMetadata> tablets = findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
         loader.load(tablets, loadMapEntry.getValue());
       }
     }

@@ -35,7 +35,9 @@
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -49,6 +51,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -829,6 +832,59 @@ public void testAvailability() throws Exception {
     }
   }

+  @Test
+  public void testManyTabletAndFiles() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      String dir = getDir("/testBulkFile-");
+      FileSystem fs = getCluster().getFileSystem();
+      fs.mkdirs(new Path(dir));
+
+      TreeSet<Text> splits = IntStream.range(1, 9000).mapToObj(BulkNewIT::row).map(Text::new)
+          .collect(Collectors.toCollection(TreeSet::new));
+      c.tableOperations().addSplits(tableName, splits);
+
+      var executor = Executors.newFixedThreadPool(16);
+      var futures = new ArrayList<Future<?>>();
+
+      var loadPlanBuilder = LoadPlan.builder();
+      var rowsExpected = new HashSet<>();
+      var imports = IntStream.range(2, 8999).boxed().collect(Collectors.toList());
+      // The order in which imports are added to the load plan should not matter so test that.
+      Collections.shuffle(imports);
+      for (var data : imports) {
+        String filename = "f" + data + ".";
+        loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION, RangeType.TABLE, row(data - 1),
+            row(data));
+        var future = executor.submit(() -> {
+          writeData(dir + "/" + filename, aconf, data, data);
+          return null;
+        });
+        futures.add(future);
+        rowsExpected.add(row(data));
+      }
+
+      for (var future : futures) {
+        future.get();
+      }
+
+      executor.shutdown();
+
+      var loadPlan = loadPlanBuilder.build();
+
+      c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+      // using a batch scanner can read from lots of tablets w/ less RPCs
+      try (var scanner = c.createBatchScanner(tableName)) {
+        // use a scan server so that tablets do not need to be hosted
+        scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+        scanner.setRanges(List.of(new Range()));
+        var rowsSeen = scanner.stream().map(e -> e.getKey().getRowData().toString())
+            .collect(Collectors.toSet());
+        assertEquals(rowsExpected, rowsSeen);
+      }
+    }
+  }
+
   /**
    * @return Map w/ keys that are end rows of tablets and the value is a true when the tablet has a
    *         current location.