Improve BulkSplitOptimizationIT #5179

Open · wants to merge 2 commits into main

Changes from 1 commit
@@ -18,49 +18,63 @@
*/
package org.apache.accumulo.test.functional;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This test verifies that when many files are bulk imported into a table with a single tablet,
* and that tablet subsequently splits, not all of the data files are assigned to the children
* tablets.
*/
public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
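// The optimization under test, roughly: when a tablet containing bulk-imported files
// splits, each file should be assigned only to the children whose key ranges it actually
// overlaps, rather than every data file being propagated to every child tablet. The
// split-count and file-count assertions below exercise that behavior.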
private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class);

Path testDir;

@Override
protected Duration defaultTimeout() {
- return Duration.ofMinutes(2);
+ return Duration.ofMinutes(5);
}

@BeforeEach
public void alterConfig() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
getClusterControl().startAllServers(ServerType.TABLET_SERVER);
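// Bouncing the tablet servers here presumably gives each test a clean slate of tablet
// server state before the files are generated and imported.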

FileSystem fs = cluster.getFileSystem();
testDir = new Path(cluster.getTemporaryPath(), "testmf");
fs.deleteOnExit(testDir);
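// createRFiles is assumed here to write ROWS rows partitioned at SPLITS row boundaries
// using 8 threads, producing roughly SPLITS + 1 small RFiles under testDir for the bulk
// import performed in the test below.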
FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, SPLITS, 8);
FileStatus[] stats = fs.listStatus(testDir);

log.info("Number of generated files: {}", stats.length);
}
}

@AfterEach
public void resetConfig() throws Exception {
- try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
-   getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
-   getClusterControl().startAllServers(ServerType.TABLET_SERVER);
- }
+ getClusterControl().stopAllServers(ServerType.TABLET_SERVER);
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
}

private static final int ROWS = 100000;
@@ -70,33 +84,36 @@ public void resetConfig() throws Exception {
public void testBulkSplitOptimization() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
- c.tableOperations().create(tableName);
- c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000");
- c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
- c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
- FileSystem fs = cluster.getFileSystem();
- Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
- fs.deleteOnExit(testDir);
- FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
- FileStatus[] stats = fs.listStatus(testDir);
-
- System.out.println("Number of generated files: " + stats.length);
+ Map<String,String> tableProps = new HashMap<>();
+ tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000");
+ tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000");
+ tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
+ c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps)
+     .withInitialTabletAvailability(TabletAvailability.HOSTED));
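// Creating the table with all of its properties and an initial HOSTED tablet
// availability in a single call, rather than create-then-setProperty, presumably ensures
// the tablet is hosted and fully configured before the bulk import starts.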

log.info("Starting bulk import");
c.tableOperations().importDirectory(testDir.toString()).to(tableName).load();

FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);
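// At this point the optimization's precondition holds: the two checks above are assumed
// to assert that the table still has zero splits and that its single tablet references
// all 100 imported files.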

// initiate splits
log.info("Lowering split threshold to 100K to initiate splits");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K");

- Thread.sleep(SECONDS.toMillis(2));
-
- // wait until over split threshold -- should be 78 splits
- while (c.tableOperations().listSplits(tableName).size() < 50) {
-   Thread.sleep(500);
- }
+ Wait.waitFor(() -> {
+   try {
+     FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
+   } catch (Exception e) {
+     if (e.getMessage().contains("splits points out of range")) {
+       return false;
+     } else {
+       throw e;
+     }
+   }
+   return true;
+ });
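// Retrying on "splits points out of range" tolerates the window where splitting is still
// in progress (checkSplits is assumed to verify the split count lies between 50 and 100,
// and to throw with that message while it does not).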

FunctionalTestUtils.checkSplits(c, tableName, 50, 100);
VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS);
params.timestamp = 1;
params.dataSize = 50;
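// VerifyParams configures VerifyIngest to re-read the bulk-imported rows and confirm the
// data survived the splits; the timestamp and data size set here are assumed to match
// what createRFiles wrote above.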
…