Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 6 additions & 76 deletions presto-accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,6 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down Expand Up @@ -271,75 +265,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<version>${dep.accumulo.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-monitor</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven.scm</groupId>
<artifactId>maven-scm-provider-svnexe</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand All @@ -349,7 +274,12 @@
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>13.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,45 +22,26 @@
import io.prestosql.plugin.accumulo.conf.AccumuloConfig;
import io.prestosql.plugin.accumulo.serializers.LexicoderRowSerializer;
import io.prestosql.plugin.tpch.TpchPlugin;
import io.prestosql.spi.PrestoException;
import io.prestosql.testing.DistributedQueryRunner;
import io.prestosql.testing.QueryRunner;
import io.prestosql.tpch.TpchTable;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.intellij.lang.annotations.Language;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;

import static io.airlift.units.Duration.nanosSince;
import static io.prestosql.plugin.accumulo.AccumuloErrorCode.MINI_ACCUMULO;
import static io.prestosql.plugin.accumulo.AccumuloErrorCode.UNEXPECTED_ACCUMULO_ERROR;
import static io.prestosql.plugin.accumulo.MiniAccumuloConfigUtil.setConfigClassPath;
import static io.prestosql.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.minicluster.MemoryUnit.MEGABYTE;

public final class AccumuloQueryRunner
{
private static final Logger LOG = Logger.get(AccumuloQueryRunner.class);
private static final String MAC_PASSWORD = "secret";
private static final String MAC_USER = "root";

private static boolean tpchLoaded;
private static Connector connector = getAccumuloConnector();

private AccumuloQueryRunner() {}

Expand All @@ -74,21 +55,22 @@ public static synchronized DistributedQueryRunner createAccumuloQueryRunner(Map<
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

TestingAccumuloServer server = TestingAccumuloServer.getInstance();
queryRunner.installPlugin(new AccumuloPlugin());
Map<String, String> accumuloProperties =
ImmutableMap.<String, String>builder()
.put(AccumuloConfig.INSTANCE, connector.getInstance().getInstanceName())
.put(AccumuloConfig.ZOOKEEPERS, connector.getInstance().getZooKeepers())
.put(AccumuloConfig.USERNAME, MAC_USER)
.put(AccumuloConfig.PASSWORD, MAC_PASSWORD)
.put(AccumuloConfig.INSTANCE, server.getInstanceName())
.put(AccumuloConfig.ZOOKEEPERS, server.getZooKeepers())
.put(AccumuloConfig.USERNAME, server.getUser())
.put(AccumuloConfig.PASSWORD, server.getPassword())
.put(AccumuloConfig.ZOOKEEPER_METADATA_ROOT, "/presto-accumulo-test")
.build();

queryRunner.createCatalog("accumulo", "accumulo", accumuloProperties);

if (!tpchLoaded) {
copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(), TpchTable.getTables());
connector.tableOperations().addSplits("tpch.orders", ImmutableSortedSet.of(new Text(new LexicoderRowSerializer().encode(BIGINT, 7500L))));
server.getConnector().tableOperations().addSplits("tpch.orders", ImmutableSortedSet.of(new Text(new LexicoderRowSerializer().encode(BIGINT, 7500L))));
tpchLoaded = true;
}

Expand Down Expand Up @@ -158,71 +140,6 @@ public static Session createSession()
return testSessionBuilder().setCatalog("accumulo").setSchema("tpch").build();
}

/**
* Gets the AccumuloConnector singleton, starting the MiniAccumuloCluster on initialization.
* This singleton instance is required so all test cases access the same MiniAccumuloCluster.
*
* @return Accumulo connector
*/
public static Connector getAccumuloConnector()
{
if (connector != null) {
return connector;
}

try {
MiniAccumuloCluster accumulo = createMiniAccumuloCluster();
Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
connector = instance.getConnector(MAC_USER, new PasswordToken(MAC_PASSWORD));
LOG.info("Connection to MAC instance %s at %s established, user %s password %s", accumulo.getInstanceName(), accumulo.getZooKeepers(), MAC_USER, MAC_PASSWORD);
return connector;
}
catch (AccumuloException | AccumuloSecurityException | InterruptedException | IOException e) {
throw new PrestoException(UNEXPECTED_ACCUMULO_ERROR, "Failed to get connector to Accumulo", e);
}
}

/**
* Creates and starts an instance of MiniAccumuloCluster, returning the new instance.
*
* @return New MiniAccumuloCluster
*/
private static MiniAccumuloCluster createMiniAccumuloCluster()
throws IOException, InterruptedException
{
// Create MAC directory
File macDir = Files.createTempDirectory("mac-").toFile();
LOG.info("MAC is enabled, starting MiniAccumuloCluster at %s", macDir);

// Start MAC and connect to it
MiniAccumuloCluster accumulo = new MiniAccumuloCluster(macDir, MAC_PASSWORD);
accumulo.getConfig().setDefaultMemory(512, MEGABYTE);
setConfigClassPath(accumulo.getConfig());
accumulo.start();

// Add shutdown hook to stop MAC and cleanup temporary files
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
LOG.info("Shutting down MAC");
accumulo.stop();
}
catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
throw new PrestoException(MINI_ACCUMULO, "Failed to shut down MAC instance", e);
}

try {
LOG.info("Cleaning up MAC directory");
FileUtils.forceDelete(macDir);
}
catch (IOException e) {
throw new PrestoException(MINI_ACCUMULO, "Failed to clean up MAC directory", e);
}
}));

return accumulo;
}

public static void main(String[] args)
throws Exception
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public TestAccumuloClient()
.setUsername("root")
.setPassword("secret");

Connector connector = AccumuloQueryRunner.getAccumuloConnector();
Connector connector = TestingAccumuloServer.getInstance().getConnector();
config.setZooKeepers(connector.getInstance().getZooKeepers());
zooKeeperMetadataManager = new ZooKeeperMetadataManager(config, new InternalTypeManager(createTestMetadataManager(), new TypeOperators()));
client = new AccumuloClient(connector, config, zooKeeperMetadataManager, new AccumuloTableManager(connector), new IndexLookup(connector, new ColumnCardinalityCache(connector, config)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.accumulo;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.time.Duration;

import static java.lang.String.format;

public class TestingAccumuloServer
{
private static final int ACCUMULO_MASTER_PORT = 9999;
private static final int ACCUMULO_TSERVER_PORT = 9997;
private static final int ZOOKEEPER_PORT = 2181;

private static final TestingAccumuloServer instance = new TestingAccumuloServer();

private final FixedHostPortGenericContainer<?> accumuloContainer;

public static TestingAccumuloServer getInstance()
{
return instance;
}

private TestingAccumuloServer()
{
accumuloContainer = new FixedHostPortGenericContainer<>("prestodev/accumulo:35");
accumuloContainer.withFixedExposedPort(ACCUMULO_MASTER_PORT, ACCUMULO_MASTER_PORT);
accumuloContainer.withFixedExposedPort(ACCUMULO_TSERVER_PORT, ACCUMULO_TSERVER_PORT);
accumuloContainer.withExposedPorts(ZOOKEEPER_PORT);
accumuloContainer.waitingFor(Wait.forHealthcheck().withStartupTimeout(Duration.ofMinutes(10)));

// No need for an explicit stop since this server is a singleton
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a singleton for now is fine as a transition from the old way, but we should move to a model where the Accumulo cluster is managed by AccumuloQueryRunner, the same way we do for other testing services. Shutting down resources immediately is useful since the CI machines can be resource constrained, and using a non-singleton allows things like testing multiple versions, testing with different config, etc.

Add a TODO here to make this not a singleton.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we should move to a model where the Accumulo cluster is managed by AccumuloQueryRunner,

Let's create a github issue for this and add TODO comment (with link to the issue to the code).

// and the container will be stopped by TestContainers on shutdown
// TODO Change this class to not be a singleton
// https://github.com/prestosql/presto/issues/5842
accumuloContainer.start();
}

public String getInstanceName()
{
return "default";
}

public String getZooKeepers()
{
return format("%s:%s", accumuloContainer.getHost(), accumuloContainer.getMappedPort(ZOOKEEPER_PORT));
}

public String getUser()
{
return "root";
}

public String getPassword()
{
return "secret";
}

public Connector getConnector()
{
try {
ZooKeeperInstance instance = new ZooKeeperInstance(getInstanceName(), getZooKeepers());
return instance.getConnector(getUser(), new PasswordToken(getPassword()));
}
catch (AccumuloException | AccumuloSecurityException e) {
throw new RuntimeException(e);
}
}
}