Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions plugins/repository-hdfs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,6 @@ tasks.register("javaRestTestHdfs2", RestIntegTestTask) {
classpath = sourceSets.javaRestTest.runtimeClasspath + configurations.hdfsFixture2
}

tasks.named('yamlRestTest').configure {
classpath = sourceSets.yamlRestTest.runtimeClasspath + configurations.hdfsFixture3
}

tasks.register("yamlRestTestHdfs2", RestIntegTestTask) {
description = "Runs yaml rest tests against an elasticsearch cluster with HDFS version 2"
testClassesDirs = sourceSets.yamlRestTest.output.classesDirs
classpath = sourceSets.yamlRestTest.runtimeClasspath + configurations.hdfsFixture2
}

tasks.named("test").configure {
onlyIf("Not running on windows") {
OS.current().equals(OS.WINDOWS) == false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;

@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
abstract class AbstractRepositoryHdfsRestIT extends ESRestTestCase {

abstract HdfsFixture hdfsFixture();

/** Registering a repository with a non-HDFS URL must be rejected; the server answers with a 500. */
public void testBadUrl() throws IOException {
    final var exception = assertThrows(
        ResponseException.class,
        () -> registerHdfsRepository("file://does-not-matter", randomRepoName(), randomPath(), randomBoolean(), randomBoolean())
    );
    final var statusCode = exception.getResponse().getStatusLine().getStatusCode();
    assertEquals("expect bad URL response, got: " + exception.getResponse(), 500, statusCode);
}

/**
 * Registers, inspects, and deletes the same repository several times to verify that
 * the create/get/delete round-trip is repeatable and leaves no repository behind.
 */
public void testCreateGetDeleteRepository() throws IOException {
    final var repoName = randomRepoName();
    final var path = randomPath();
    // pick the iteration count once: evaluating between(2, 5) inside the loop
    // condition re-rolled the bound on every pass, skewing the iteration-count
    // distribution and consuming extra randomness
    final var iterations = between(2, 5);
    for (int i = 0; i < iterations; i++) {
        registerHdfsRepository(hdfsUri0(), repoName, path, randomBoolean(), randomBoolean());
        final var repo = getRepository(repoName);
        assertEquals("hdfs", repo.get(repoName + ".type"));
        assertEquals(hdfsUri0(), repo.get(repoName + ".settings.uri"));
        assertEquals(path, repo.get(repoName + ".settings.path"));
        deleteRepository(repoName);
        assertRepositoryNotFound(repoName);
    }
}

// Registering with verify=true makes the cluster run repository verification as part of creation.
public void testCreateAndVerifyRepository() throws IOException {
    registerHdfsRepository(hdfsUri0(), randomRepoName(), randomPath(), true, false);
}

/**
 * Creates a randomized number of snapshots of a freshly created index, optionally
 * re-registers the repository to prove the snapshots persist on HDFS, then deletes
 * the snapshots one by one while verifying the listing shrinks accordingly.
 */
public void testCreateListDeleteSnapshots() throws IOException {
    final var repoName = randomRepoName();
    final var path = randomPath();
    registerHdfsRepository(hdfsUri0(), repoName, path, false, false);

    final var indexName = randomIndexName();
    final var primaries = between(1, 3);
    createIndex(indexName, indexSettings(primaries, between(0, 2)).build());

    // NOTE(review): randomList does not obviously guarantee unique names — a duplicate
    // snapshot name would make createSnapshot fail; confirm randomIdentifier uniqueness
    final var snapshotNames = randomList(1, 10, ESRestTestCase::randomIdentifier);
    for (var snapshotName : snapshotNames) {
        final var createSnapshot = createSnapshot(repoName, snapshotName, true);
        // every snapshot must succeed completely: all primary shards, zero failures
        assertEquals(snapshotName, createSnapshot.get("snapshot.snapshot"));
        assertEquals("SUCCESS", createSnapshot.get("snapshot.state"));
        assertEquals(Integer.valueOf(primaries), createSnapshot.get("snapshot.shards.successful"));
        assertEquals(Integer.valueOf(0), createSnapshot.get("snapshot.shards.failed"));
    }

    // unregister and re-register the repo to confirm the snapshots persist
    final var maybeReregisterRepo = randomBoolean();
    if (maybeReregisterRepo) {
        deleteRepository(repoName);
        registerHdfsRepository(hdfsUri0(), repoName, path, false, false);
    }

    assertEquals(
        "list API must return all snapshots",
        snapshotNames.stream().sorted().toList(),
        listAllSnapshotNames(repoName).stream().sorted().toList()
    );

    // delete snapshots one at a time, checking the listing after each deletion
    final var remainingSnapshots = new ArrayList<>(snapshotNames);
    Collections.sort(remainingSnapshots);
    while (remainingSnapshots.isEmpty() == false) {
        deleteSnapshot(repoName, remainingSnapshots.removeLast(), false);
        assertEquals(remainingSnapshots, listAllSnapshotNames(repoName).stream().sorted().toList());
    }

    deleteRepository(repoName);
}

/** Fetches every snapshot in the given repository and returns just the snapshot names. */
private List<String> listAllSnapshotNames(String repoName) throws IOException {
    final List<Map<String, ?>> snapshotInfos = listAllSnapshots(repoName).<List<Map<String, ?>>>get("snapshots");
    final var names = new ArrayList<String>(snapshotInfos.size());
    for (var snapshotInfo : snapshotInfos) {
        names.add((String) snapshotInfo.get("snapshot"));
    }
    return List.copyOf(names);
}

public void testCreateReadOnlyRepo() throws IOException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we confirm in this case that creating or deleting a snapshot fails with an appropriate error?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stumbled over the error codes: I was expecting 400 but got 500. Fixing in separate PR #140200; once that is merged, I will proceed with the current one.

final var repoName = randomRepoName();
final var path = hdfsFixture().getExistingReadonlyRepoPath();
registerHdfsRepository(hdfsUri0(), repoName, path, false, true);
final var snapshots = listAllSnapshotNames(repoName);
assertEquals("repository must contain exactly 1 snapshot", 1, snapshots.size());

final var createSnapshotError = assertThrows(ResponseException.class, () -> createSnapshot(repoName, randomIdentifier(), true));
assertEquals(400, createSnapshotError.getResponse().getStatusLine().getStatusCode());
final var deleteSnapshotError = assertThrows(ResponseException.class, () -> deleteSnapshot(repoName, snapshots.getFirst(), true));
assertEquals(400, deleteSnapshotError.getResponse().getStatusLine().getStatusCode());

deleteRepository(repoName);
}

public void testRestore() throws IOException {
final var repoName = randomRepoName();
final var path = randomPath();
registerHdfsRepository(hdfsUri0(), repoName, path, false, false);

final var indexName = randomIndexName();
createIndex(indexName, indexSettings(1, 0).build());

ensureGreen(indexName);

final var snapshotName = randomSnapshotName();
createSnapshot(repoName, snapshotName, true);

closeIndex(indexName);

restoreSnapshot(repoName, snapshotName, true);

final var indexRecovery = getIndexRecovery(indexName);
final var shard0 = indexName + ".shards.0.";
assertEquals("SNAPSHOT", indexRecovery.get(shard0 + "type"));
assertEquals("DONE", indexRecovery.get(shard0 + "stage"));
assertEquals(Integer.valueOf(1), indexRecovery.get(shard0 + "index.files.recovered"));
assertTrue((int) indexRecovery.get(shard0 + "index.size.recovered_in_bytes") >= 0);
assertEquals(Integer.valueOf(0), indexRecovery.get(shard0 + "index.files.reused"));
assertEquals(Integer.valueOf(0), indexRecovery.get(shard0 + "index.size.reused_in_bytes"));
Comment on lines +141 to +148
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd kinda like us to verify that a document indexed before creating the snapshot is visible to searches at this point.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, which document? My understanding is that the original test created only an index, without any documents.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think this was an unnecessary omission, ideally we'd confirm that the snapshotted data was actually restored and not just an empty index. Not essential, just a nice-to-have.


deleteSnapshot(repoName, snapshotName, false);
deleteRepository(repoName);
}

// Convenience accessor: URI of HDFS node 0 in the fixture.
protected String hdfsUri0() {
    return hdfsUri(0);
}

/** Builds the HDFS URI for the fixture node with the given id. */
protected String hdfsUri(int nodeId) {
    final var port = hdfsFixture().getPort(nodeId);
    return "hdfs://localhost:" + port;
}

/**
 * Registers an "hdfs" snapshot repository with the given settings, adding Kerberos
 * settings when {@link #securityPrincipal()} supplies a principal.
 */
protected void registerHdfsRepository(String uri, String repoName, String path, boolean verify, boolean readOnly) throws IOException {
    final var settingsBuilder = Settings.builder();
    settingsBuilder.put("uri", uri);
    settingsBuilder.put("path", path);
    settingsBuilder.put(READONLY_SETTING_KEY, readOnly);
    final var principal = securityPrincipal();
    if (principal != null) {
        // secure clusters additionally need the principal and data-transfer protection
        settingsBuilder.put("security.principal", principal);
        settingsBuilder.put("conf.dfs.data.transfer.protection", "authentication");
    }
    registerRepository(repoName, "hdfs", verify, settingsBuilder.build());
}

/**
 * Kerberos principal to connect as, or {@code null} when the cluster is unsecured.
 * Secure test subclasses override this to supply a principal.
 */
@Nullable
String securityPrincipal() {
    return null;
}

/**
 * Base directory under which random repository paths are created.
 * Subclasses may override to pin a specific HDFS directory.
 */
String basePath() {
    // meaningless but different bases
    return randomFrom("", randomIdentifier(), "/test", "/user/elasticsearch/test");
}

// A fresh repository path: a random base plus a random leaf directory.
private String randomPath() {
    return basePath() + "/" + randomIdentifier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,16 @@

package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.util.Map;

@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class RepositoryHdfsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public class RepositoryHdfsRestIT extends AbstractRepositoryHdfsRestIT {

public static HdfsFixture hdfsFixture = new HdfsFixture();
private static final HdfsFixture hdfsFixture = new HdfsFixture();

public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
Expand All @@ -41,17 +30,13 @@ public class RepositoryHdfsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCa
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);

public RepositoryHdfsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters(Map.of("hdfs_port", hdfsFixture.getPort()), "hdfs_repository");
// Supplies the unsecured HDFS fixture to the shared test methods.
@Override
HdfsFixture hdfsFixture() {
    return hdfsFixture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,51 @@

package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.util.Map;
public class SecureRepositoryHdfsRestIT extends AbstractRepositoryHdfsRestIT {

@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class SecureRepositoryHdfsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();
static final Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();

public static HdfsFixture hdfsFixture = new HdfsFixture().withKerberos(() -> krb5Fixture.getPrincipal(), () -> krb5Fixture.getKeytab());
static final HdfsFixture hdfsFixture = new HdfsFixture().withKerberos(krb5Fixture::getPrincipal, krb5Fixture::getKeytab);

public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.configFile("repository-hdfs/krb5.conf", Resource.fromString(() -> krb5Fixture.getConf()))
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.configFile("repository-hdfs/krb5.conf", Resource.fromString(krb5Fixture::getConf))
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(krb5Fixture::getEsKeytab))
.build();

@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(hdfsFixture).around(cluster);

public SecureRepositoryHdfsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters(Map.of("secure_hdfs_port", hdfsFixture.getPort()), "secure_hdfs_repository");
// Supplies the kerberized HDFS fixture to the shared test methods.
@Override
HdfsFixture hdfsFixture() {
    return hdfsFixture;
}

// Connect as the Elasticsearch Kerberos principal provided by the KDC fixture.
@Override
String securityPrincipal() {
    return krb5Fixture.getEsPrincipal();
}

// Fixed base path for the secure fixture — presumably matches the HDFS
// permissions granted to the ES principal; TODO confirm against the fixture setup.
@Override
String basePath() {
    return "/user/elasticsearch";
}
}

This file was deleted.

This file was deleted.

Loading