12 changes: 12 additions & 0 deletions ambari-infra/ambari-infra-manager-it/pom.xml
@@ -89,6 +89,18 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>

<build>
InfraClient.java
@@ -18,6 +18,10 @@
*/
package org.apache.ambari.infra;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,6 +40,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import static org.apache.commons.lang.StringUtils.isBlank;

@@ -77,13 +83,28 @@ private String execute(HttpRequestBase post) {
}
}

// TODO: return job data
public void startJob(String jobName, String parameters) {
public String startJob(String jobName, String parameters) {
URIBuilder uriBuilder = new URIBuilder(baseUrl);
uriBuilder.setScheme("http");
uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName);
if (!isBlank(parameters))
uriBuilder.addParameter("params", parameters);
try {
String responseText = execute(new HttpPost(uriBuilder.build()));
Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
return responseContent.get("jobId").toString();
} catch (URISyntaxException | JsonParseException | JsonMappingException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public void restartJob(String jobName, String jobId) {
URIBuilder uriBuilder = new URIBuilder(baseUrl);
uriBuilder.setScheme("http");
uriBuilder.setPath(String.format("%s/%s/%s/executions", uriBuilder.getPath(), jobName, jobId));
uriBuilder.addParameter("operation", "RESTART");
try {
execute(new HttpPost(uriBuilder.build()));
} catch (URISyntaxException e) {
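For orientation, here is a minimal sketch of how a test might drive these two calls. The job name and parameters are taken from the stories below and getInfraClient() comes from AbstractInfraSteps; the method wrapper itself is illustrative and not part of this diff.

// Sketch only: start an archiving job, then resume the same execution by its jobId.
public void archiveThenResume() throws Exception {
    try (InfraClient client = getInfraClient()) {
        String jobId = client.startJob("archive_audit_logs",
                "start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z");
        // If the export fails part-way, restartJob posts to /{jobName}/{jobId}/executions?operation=RESTART.
        client.restartJob("archive_audit_logs", jobId);
    }
}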
AbstractInfraSteps.java
@@ -23,8 +23,14 @@
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import org.apache.ambari.infra.InfraClient;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +62,7 @@ public abstract class AbstractInfraSteps {
private static final int SOLR_PORT = 8983;
private static final int INFRA_MANAGER_PORT = 61890;
private static final int FAKE_S3_PORT = 4569;
private static final int HDFS_PORT = 9000;
private static final String AUDIT_LOGS_COLLECTION = "audit_logs";
protected static final String S3_BUCKET_NAME = "testbucket";
private String ambariFolder;
@@ -77,13 +84,22 @@ public AmazonS3Client getS3client() {
return s3client;
}

public String getLocalDataFolder() {
return ambariFolder + "/ambari-infra/ambari-infra-manager/docker/test-out";
}

@BeforeStories
public void initDockerContainer() throws Exception {
LOG.info("Create new docker container for testing Ambari Infra Manager ...");
System.setProperty("HADOOP_USER_NAME", "root");

URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation();
ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent();
shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";

LOG.info("Clean local data folder {}", getLocalDataFolder());
FileUtils.cleanDirectory(new File(getLocalDataFolder()));

shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";
LOG.info("Create new docker container for testing Ambari Infra Manager ...");
runCommand(new String[]{shellScriptLocation, "start"});

dockerHost = System.getProperty("docker.host") != null ? System.getProperty("docker.host") : "localhost";
@@ -106,7 +122,7 @@ public void initDockerContainer() throws Exception {
checkInfraManagerReachable();
}

protected void runCommand(String[] command) {
private void runCommand(String[] command) {
try {
LOG.info("Exec command: {}", StringUtils.join(command, " "));
Process process = Runtime.getRuntime().exec(command);
@@ -130,7 +146,7 @@ protected void doWithin(int sec, String actionName, BooleanSupplier predicate) {
});
}

protected void doWithin(int sec, String actionName, Runnable runnable) {
private void doWithin(int sec, String actionName, Runnable runnable) {
long start = currentTimeMillis();
Exception exception;
while (true) {
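The doWithin bodies are collapsed in this hunk; only their visibility changes. As a rough illustration of the polling pattern they implement (the retry interval and timeout handling below are assumptions, not the actual code):

// Sketch only: re-check the predicate until it holds or the timeout elapses.
protected void doWithin(int sec, String actionName, BooleanSupplier predicate) {
    doWithin(sec, actionName, () -> {
        if (!predicate.getAsBoolean())
            throw new IllegalStateException(actionName + " not satisfied yet");
    });
}

// Sketch only: retry the runnable once per second, rethrowing the last failure on timeout.
private void doWithin(int sec, String actionName, Runnable runnable) {
    long start = currentTimeMillis();
    while (true) {
        try {
            runnable.run();
            return;
        } catch (Exception e) {
            if (currentTimeMillis() - start > sec * 1000L)
                throw new IllegalStateException(actionName + " did not succeed within " + sec + " seconds", e);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(ie);
            }
        }
    }
}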
@@ -215,9 +231,26 @@ public void shutdownContainers() throws Exception {
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
ObjectListing objectListing = getS3client().listObjects(listObjectsRequest);
LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size());
objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file in s3 with key {}", s3ObjectSummary.getKey()));
objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file on s3 with key {}", s3ObjectSummary.getKey()));

LOG.info("Listing files on hdfs.");
try (FileSystem fileSystem = getHdfs()) {
int count = 0;
RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path("/test_audit_logs"), true);
while (it.hasNext()) {
LOG.info("Found file on hdfs with name {}", it.next().getPath().getName());
++count;
}
LOG.info("{} files found on hfds", count);
}

LOG.info("shutdown containers");
runCommand(new String[]{shellScriptLocation, "stop"});
}

protected FileSystem getHdfs() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", String.format("hdfs://%s:%d/", dockerHost, HDFS_PORT));
return FileSystem.get(conf);
}
}
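A short smoke-test sketch for the new getHdfs() helper; the probe path and file contents are illustrative only, and FSDataOutputStream would need to be imported.

// Sketch only: write, verify and delete a probe file on the dockerized HDFS.
try (FileSystem fs = getHdfs()) {
    Path probe = new Path("/tmp/infra-it-probe.txt");
    try (FSDataOutputStream out = fs.create(probe, true)) {
        out.writeBytes("hello hdfs");
    }
    LOG.info("Probe file exists on hdfs: {}", fs.exists(probe));
    fs.delete(probe, false);
}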
ExportJobsSteps.java
@@ -23,21 +23,29 @@
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.ambari.infra.InfraClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.jbehave.core.annotations.Alias;
import org.jbehave.core.annotations.Given;
import org.jbehave.core.annotations.Then;
import org.jbehave.core.annotations.When;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
@@ -48,6 +56,8 @@
public class ExportJobsSteps extends AbstractInfraSteps {
private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);

private Map<String, String> launchedJobs = new HashMap<>();

@Given("$count documents in solr")
public void addDocuments(int count) throws Exception {
OffsetDateTime intervalEnd = OffsetDateTime.now();
@@ -79,10 +89,18 @@ public void startJob(String jobName) throws Exception {
}

@When("start $jobName job with parameters $parameters")
@Alias("restart $jobName job with parameters $parameters")
public void startJob(String jobName, String parameters) throws Exception {
try (InfraClient httpClient = getInfraClient()) {
httpClient.startJob(jobName, parameters);
String jobId = httpClient.startJob(jobName, parameters);
LOG.info("Job {} started jobId: {}", jobName, jobId);
launchedJobs.put(jobName, jobId);
}
}

@When("restart $jobName job")
public void restartJob(String jobName) throws Exception {
try (InfraClient httpClient = getInfraClient()) {
httpClient.restartJob(jobName, launchedJobs.get(jobName));
}
}

@@ -103,7 +121,7 @@ public void checkS3After(String text, int waitSec) {
}

@Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
public void checkNumberOfFilesOnS3(int count, String text, int waitSec) {
public void checkNumberOfFilesOnS3(long count, String text, int waitSec) {
AmazonS3Client s3Client = getS3client();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
@@ -145,4 +163,37 @@ private boolean isSolrEmpty(SolrQuery query) {
throw new UncheckedIOException(e);
}
}

@Then("Check $count files exists on hdfs with filenames containing the text $text in the folder $path after $waitSec seconds")
public void checkNumberOfFilesOnHdfs(int count, String text, String path, int waitSec) throws Exception {
try (FileSystem fileSystem = getHdfs()) {
doWithin(waitSec, "check uploaded files to hdfs", () -> {
try {
int fileCount = 0;
RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path(path), true);
while (it.hasNext()) {
if (it.next().getPath().getName().contains(text))
++fileCount;
}
return fileCount == count;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}

@Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path")
public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path) {
File destinationDirectory = new File(getLocalDataFolder(), path);
LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath());
doWithin(5, "Destination directory exists", destinationDirectory::exists);

File[] files = requireNonNull(destinationDirectory.listFiles(),
String.format("Path %s is not a directory or an I/O error occurred!", destinationDirectory.getAbsolutePath()));
assertThat(Arrays.stream(files)
.filter(file -> file.getName().contains(text))
.count(), is(count));
}
}
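The Solr-side assertions ("solr does not contain documents between ... and ...") are collapsed in this diff. A sketch of the kind of range query they rely on, assuming the documents carry a logtime field and reusing the statically imported SOLR_DATETIME_FORMATTER (the method name and exact query syntax are assumptions):

// Sketch only: builds a logtime range query for the audit_logs collection.
private SolrQuery logtimeRangeQuery(OffsetDateTime from, OffsetDateTime to) {
    return new SolrQuery(String.format("logtime:[\"%s\" TO \"%s\"]",
            SOLR_DATETIME_FORMATTER.format(from),
            SOLR_DATETIME_FORMATTER.format(to)));
}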
@@ -12,13 +12,15 @@ When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10
Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds


Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents

Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01-03T05:00:00.000Z,end=2010-01-02T05:00:00.000Z
Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z


Scenario: Archiving job fails when part of the data is exported. After resolving the issue and restarting, the job exports the rest of the data.

Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
@@ -27,12 +29,29 @@ When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-1
Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
And restart archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
And restart archive_audit_logs job
Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds


Scenario: After the deleting job removes documents from solr, no document is found in the specified interval

Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds


Scenario: Archiving documents to hdfs

Given 1000 documents in solr with logtime from 2014-01-04T05:00:00.000Z to 2014-01-06T20:00:00.000Z
When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS
Then Check 7 files exists on hdfs with filenames containing the text audit_logs_-_2014-01-0 in the folder /test_audit_logs after 10 seconds
And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01-06T20:00:00.000Z after 10 seconds


Scenario: Archiving documents to local filesystem

Given 200 documents in solr with logtime from 2014-02-04T05:00:00.000Z to 2014-02-06T20:00:00.000Z
When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive
Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_8_2014-02-06T20-00-00.000Z
And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02-06T20:00:00.000Z after 10 seconds
5 changes: 4 additions & 1 deletion ambari-infra/ambari-infra-manager/.gitignore
@@ -1,2 +1,5 @@
out/*
*.pid
*.pid
Profile
.env
test-out
25 changes: 23 additions & 2 deletions ambari-infra/ambari-infra-manager/docker/docker-compose.yml
@@ -45,19 +45,39 @@ services:
- ${ZOOKEEPER_CONNECTION_STRING}
volumes:
- $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets
localstack-s3:
fakes3:
image: localstack/localstack
hostname: fakes3
ports:
- "4569:4569"
environment:
- SERVICES=s3:4569
hostname: fakes3
networks:
infra-network:
aliases:
- testbucket.fakes3
env_file:
- Profile
namenode:
image: flokkr/hadoop-hdfs-namenode:${HADOOP_VERSION:-3.0.0}
hostname: namenode
ports:
- 9870:9870
- 9000:9000
env_file:
- Profile
environment:
ENSURE_NAMENODE_DIR: "/tmp/hadoop-hdfs/dfs/name"
networks:
- infra-network
datanode:
image: flokkr/hadoop-hdfs-datanode:${HADOOP_VERSION:-3.0.0}
links:
- namenode
env_file:
- Profile
networks:
- infra-network
inframanager:
image: ambari-infra-manager:v1.0
restart: always
@@ -76,6 +96,7 @@ services:
DISPLAY: $DOCKERIP:0
volumes:
- $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager
- $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/docker/test-out:/root/archive
networks:
infra-network:
driver: bridge
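The S3 client setup behind getS3client() is not part of this diff; a minimal sketch of pointing the AWS SDK v1 client at the localstack "fakes3" service (the credentials and endpoint below are assumptions):

// Sketch only: localstack typically accepts dummy credentials; the endpoint uses the mapped 4569 port.
AmazonS3Client s3client = new AmazonS3Client(new BasicAWSCredentials("accessKey", "secretKey"));
s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT));
if (!s3client.doesBucketExist(S3_BUCKET_NAME)) {
    s3client.createBucket(S3_BUCKET_NAME);
}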