Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve BulkSplitOptimizationIT #5179

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,85 +18,133 @@
*/
package org.apache.accumulo.test.functional;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This test verifies that when many files are bulk imported into a table with a single tablet and
 * the table subsequently splits, not all of the data files are assigned to the child tablets.
 */
public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class);

Path testDir;

@Override
protected Duration defaultTimeout() {
  // Server restarts plus rfile generation in setup can exceed the old 2-minute budget.
  return Duration.ofMinutes(5);
}

@BeforeEach
public void alterConfig() throws Exception {
  try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
    // Remember how many tservers are running so we can confirm the restart brought them all back.
    final int initialTserverCount =
        client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size();
    log.info("Tserver count: {}", initialTserverCount);

    // Bounce all tablet servers so the test starts from a clean server state.
    Timer timer = Timer.startNew();
    getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
    Wait.waitFor(
        () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(),
        120_000);
    log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS));

    timer.restart();
    getClusterControl().startAllServers(ServerType.TABLET_SERVER);
    // Wait until the server count is back to what it was before the restart. The previous
    // condition (size() < initialTserverCount) was satisfied immediately after the stop and
    // did not actually verify that the tservers had come back up.
    Wait.waitFor(() -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()
        == initialTserverCount, 120_000);
    log.info("Took {} ms to start all tservers", timer.elapsed(MILLISECONDS));

    // Generate the rfiles used by the bulk import in the test method.
    FileSystem fs = cluster.getFileSystem();
    testDir = new Path(cluster.getTemporaryPath(), "testmf");
    fs.deleteOnExit(testDir);

    timer.restart();
    FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, SPLITS, 8);
    long elapsed = timer.elapsed(MILLISECONDS);
    FileStatus[] stats = fs.listStatus(testDir);
    log.info("Generated {} files in {} ms", stats.length, elapsed);
  }
}

@AfterEach
public void resetConfig() throws Exception {
  // Restart the tablet servers once to discard any per-test state. The earlier version of this
  // method also opened an AccumuloClient it never used and bounced the servers a second time;
  // that diff residue is removed here.
  getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
  getClusterControl().startAllServers(ServerType.TABLET_SERVER);
}

// Number of rows written into the generated rfiles during setup.
private static final int ROWS = 100000;
// Number of split points used when generating the rfiles (yields SPLITS + 1 files' worth of ranges).
private static final int SPLITS = 99;

@Test
public void testBulkSplitOptimization() throws Exception {
log.info("Starting BulkSplitOptimizationIT test");
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
FileSystem fs = cluster.getFileSystem();
Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
fs.deleteOnExit(testDir);
FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
FileStatus[] stats = fs.listStatus(testDir);

System.out.println("Number of generated files: " + stats.length);
final String tableName = getUniqueNames(1)[0];
Map<String,String> tableProps = new HashMap<>();
tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000");
tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000");
tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");

log.info("Creating table {}", tableName);
Timer timer = Timer.startNew();
c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps)
.withInitialTabletAvailability(TabletAvailability.HOSTED));
log.info("Created table in {} ms. Starting bulk import", timer.elapsed(MILLISECONDS));

timer.restart();
c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();
log.info("Imported into table {} in {} ms", tableName, timer.elapsed(MILLISECONDS));

timer.restart();
FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
log.info("Checked splits and rfiles in {} ms", timer.elapsed(MILLISECONDS));

// initiate splits
log.info("Lowering split threshold to 100K to initiate splits");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");

Thread.sleep(SECONDS.toMillis(2));
timer.restart();

// wait until over split threshold -- should be 78 splits
while (c.tableOperations().listSplits(tableName).size() < 50) {
Thread.sleep(500);
}
Wait.waitFor(() -> {
try {
FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
} catch (Exception e) {
if (e.getMessage().contains("splits points out of range")) {
return false;
} else {
throw e;
}
}
return true;
});

log.info("Took {} ms for split count to reach expected range", timer.elapsed(MILLISECONDS));

FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS);
params.timestamp = 1;
params.dataSize = 50;
Expand Down
Loading