diff --git a/ambari-infra/ambari-infra-manager-it/pom.xml b/ambari-infra/ambari-infra-manager-it/pom.xml
index 97e8ea0f44f..6dcb4b67c91 100644
--- a/ambari-infra/ambari-infra-manager-it/pom.xml
+++ b/ambari-infra/ambari-infra-manager-it/pom.xml
@@ -89,6 +89,18 @@
       <version>1.3</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index 0e391a3e6e6..b798ce1c016 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -18,6 +18,10 @@
  */
 package org.apache.ambari.infra;
 
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,6 +40,8 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.commons.lang.StringUtils.isBlank;
 
@@ -77,13 +83,28 @@ private String execute(HttpRequestBase post) {
     }
   }
 
-  // TODO: return job data
-  public void startJob(String jobName, String parameters) {
+  public String startJob(String jobName, String parameters) {
     URIBuilder uriBuilder = new URIBuilder(baseUrl);
     uriBuilder.setScheme("http");
     uriBuilder.setPath(uriBuilder.getPath() + "/" + jobName);
     if (!isBlank(parameters))
       uriBuilder.addParameter("params", parameters);
+    try {
+      String responseText = execute(new HttpPost(uriBuilder.build()));
+      Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String, Object>>() {});
+      return responseContent.get("jobId").toString();
+    } catch (URISyntaxException | JsonParseException | JsonMappingException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public void restartJob(String jobName, String jobId) {
+    URIBuilder uriBuilder = new URIBuilder(baseUrl);
+    uriBuilder.setScheme("http");
+    uriBuilder.setPath(String.format("%s/%s/%s/executions", uriBuilder.getPath(), jobName, jobId));
+    uriBuilder.addParameter("operation", "RESTART");
     try {
       execute(new HttpPost(uriBuilder.build()));
     } catch (URISyntaxException e) {
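Usage sketch (illustrative, not part of the patch): the extended InfraClient API returns the jobId of a started job so that a failed execution can later be resumed. A minimal example of the intended flow as the integration-test steps use it — getInfraClient() is the helper the steps already rely on, and the job name and parameters are taken from the test story:

try (InfraClient client = getInfraClient()) {
  // startJob posts to /<jobName>?params=... and returns the "jobId" field of the JSON response
  String jobId = client.startJob("archive_audit_logs",
      "start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS");
  // after resolving whatever interrupted the execution, resume it via the executions endpoint
  client.restartJob("archive_audit_logs", jobId);
}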
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
index f48d4c2b491..ece1c59a223 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java
@@ -23,8 +23,14 @@
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import org.apache.ambari.infra.InfraClient;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +62,7 @@ public abstract class AbstractInfraSteps {
   private static final int SOLR_PORT = 8983;
   private static final int INFRA_MANAGER_PORT = 61890;
   private static final int FAKE_S3_PORT = 4569;
+  private static final int HDFS_PORT = 9000;
   private static final String AUDIT_LOGS_COLLECTION = "audit_logs";
   protected static final String S3_BUCKET_NAME = "testbucket";
   private String ambariFolder;
@@ -77,13 +84,22 @@ public AmazonS3Client getS3client() {
     return s3client;
   }
 
+  public String getLocalDataFolder() {
+    return ambariFolder + "/ambari-infra/ambari-infra-manager/docker/test-out";
+  }
+
   @BeforeStories
   public void initDockerContainer() throws Exception {
-    LOG.info("Create new docker container for testing Ambari Infra Manager ...");
+    System.setProperty("HADOOP_USER_NAME", "root");
+
     URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation();
     ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent();
-    shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";
+    LOG.info("Clean local data folder {}", getLocalDataFolder());
+    FileUtils.cleanDirectory(new File(getLocalDataFolder()));
+
+    shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh";
+    LOG.info("Create new docker container for testing Ambari Infra Manager ...");
     runCommand(new String[]{shellScriptLocation, "start"});
     dockerHost = System.getProperty("docker.host") != null ? System.getProperty("docker.host") : "localhost";
@@ -106,7 +122,7 @@ public void initDockerContainer() throws Exception {
     checkInfraManagerReachable();
   }
 
-  protected void runCommand(String[] command) {
+  private void runCommand(String[] command) {
     try {
       LOG.info("Exec command: {}", StringUtils.join(command, " "));
       Process process = Runtime.getRuntime().exec(command);
@@ -130,7 +146,7 @@ protected void doWithin(int sec, String actionName, BooleanSupplier predicate) {
     });
   }
 
-  protected void doWithin(int sec, String actionName, Runnable runnable) {
+  private void doWithin(int sec, String actionName, Runnable runnable) {
     long start = currentTimeMillis();
     Exception exception;
     while (true) {
@@ -215,9 +231,26 @@ public void shutdownContainers() throws Exception {
     ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
     ObjectListing objectListing = getS3client().listObjects(listObjectsRequest);
     LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size());
-    objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file in s3 with key {}", s3ObjectSummary.getKey()));
+    objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file on s3 with key {}", s3ObjectSummary.getKey()));
+
+    LOG.info("Listing files on hdfs.");
+    try (FileSystem fileSystem = getHdfs()) {
+      int count = 0;
+      RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path("/test_audit_logs"), true);
+      while (it.hasNext()) {
+        LOG.info("Found file on hdfs with name {}", it.next().getPath().getName());
+        ++count;
+      }
+      LOG.info("{} files found on hdfs", count);
+    }
 
     LOG.info("shutdown containers");
     runCommand(new String[]{shellScriptLocation, "stop"});
   }
+
+  protected FileSystem getHdfs() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", String.format("hdfs://%s:%d/", dockerHost, HDFS_PORT));
+    return FileSystem.get(conf);
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
index 22826a01fcb..7e54a3198ae 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
+++ b/ambari-infra/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java
@@ -23,9 +23,12 @@
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.ambari.infra.InfraClient;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.jbehave.core.annotations.Alias;
 import org.jbehave.core.annotations.Given;
 import org.jbehave.core.annotations.Then;
 import org.jbehave.core.annotations.When;
@@ -33,11 +36,16 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.time.OffsetDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsString;
@@ -48,6 +56,8 @@ public class ExportJobsSteps extends AbstractInfraSteps {
   private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);
 
+  private Map<String, String> launchedJobs = new HashMap<>();
+
   @Given("$count documents in solr")
   public void addDocuments(int count) throws Exception {
     OffsetDateTime intervalEnd = OffsetDateTime.now();
@@ -79,10 +89,18 @@ public void startJob(String jobName) throws Exception {
   }
 
   @When("start $jobName job with parameters $parameters")
-  @Alias("restart $jobName job with parameters $parameters")
   public void startJob(String jobName, String parameters) throws Exception {
     try (InfraClient httpClient = getInfraClient()) {
-      httpClient.startJob(jobName, parameters);
+      String jobId = httpClient.startJob(jobName, parameters);
+      LOG.info("Job {} started jobId: {}", jobName, jobId);
+      launchedJobs.put(jobName, jobId);
+    }
+  }
+
+  @When("restart $jobName job")
+  public void restartJob(String jobName) throws Exception {
+    try (InfraClient httpClient = getInfraClient()) {
+      httpClient.restartJob(jobName, launchedJobs.get(jobName));
     }
   }
 
@@ -103,7 +121,7 @@ public void checkS3After(String text, int waitSec) {
   }
 
   @Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
-  public void checkNumberOfFilesOnS3(int count, String text, int waitSec) {
+  public void checkNumberOfFilesOnS3(long count, String text, int waitSec) {
     AmazonS3Client s3Client = getS3client();
     ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME);
     doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME)
@@ -145,4 +163,37 @@ private boolean isSolrEmpty(SolrQuery query) {
       throw new UncheckedIOException(e);
     }
   }
+
+  @Then("Check $count files exists on hdfs with filenames containing the text $text in the folder $path after $waitSec seconds")
+  public void checkNumberOfFilesOnHdfs(int count, String text, String path, int waitSec) throws Exception {
+    try (FileSystem fileSystem = getHdfs()) {
+      doWithin(waitSec, "check uploaded files to hdfs", () -> {
+        try {
+          int fileCount = 0;
+          RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path(path), true);
+          while (it.hasNext()) {
+            if (it.next().getPath().getName().contains(text))
+              ++fileCount;
+          }
+          return fileCount == count;
+        }
+        catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      });
+    }
+  }
+
+  @Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path")
+  public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path) {
+    File destinationDirectory = new File(getLocalDataFolder(), path);
+    LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath());
+    doWithin(5, "Destination directory exists", destinationDirectory::exists);
+
+    File[] files = requireNonNull(destinationDirectory.listFiles(),
+        String.format("Path %s is not a directory or an I/O error occurred!", destinationDirectory.getAbsolutePath()));
+    assertThat(Arrays.stream(files)
+        .filter(file -> file.getName().contains(text))
+        .count(), is(count));
+  }
 }
diff --git a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
index 10442588161..d1eb4a43099 100644
--- a/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
+++ b/ambari-infra/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story
@@ -12,6 +12,7 @@ When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-10
 Then Check 4 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-10-09 after 20 seconds
 And solr does not contain documents between 2010-10-09T05:00:00.000Z and 2010-10-09T20:00:00.000Z after 5 seconds
 
+
 Scenario: Running archiving job with a bigger start value than end value exports and deletes 0 documents
 
 Given 10 documents in solr with logtime from 2010-01-01T05:00:00.000Z to 2010-01-04T05:00:00.000Z
@@ -19,6 +20,7 @@ When start archive_audit_logs job with parameters writeBlockSize=3,start=2010-01
 Then No file exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2010-01-0
 And solr contains 10 documents between 2010-01-01T05:00:00.000Z and 2010-01-04T05:00:00.000Z
 
+
 Scenario: Archiving job fails when part of the data is exported. After resolving the issue and restarting the job exports the rest of the data.
 
 Given 200 documents in solr with logtime from 2011-10-09T05:00:00.000Z to 2011-10-09T20:00:00.000Z
@@ -27,12 +29,29 @@ When start archive_audit_logs job with parameters writeBlockSize=20,start=2010-1
 Then Check 3 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
 And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T07:59:59.999Z after 5 seconds
 When delete file with key solr_archive_audit_logs_-_2011-10-09T08-00-00.000Z.json.tar.gz from s3
-And restart archive_audit_logs job with parameters writeBlockSize=20,start=2010-11-09T00:00:00.000Z,end=2011-10-11T00:00:00.000Z
+And restart archive_audit_logs job
 Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2011-10-09 after 20 seconds
 And solr does not contain documents between 2011-10-09T05:00:00.000Z and 2011-10-09T20:00:00.000Z after 5 seconds
 
+
 Scenario: After Deleting job deletes documents from solr no document found in the specified interval
 
 Given 10 documents in solr with logtime from 2012-10-09T05:00:00.000Z to 2012-10-09T20:00:00.000Z
 When start delete_audit_logs job with parameters start=2012-10-09T05:00:00.000Z,end=2012-10-09T20:00:00.000Z
-Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
\ No newline at end of file
+Then solr does not contain documents between 2012-10-09T05:00:00.000Z and 2012-10-09T20:00:00.000Z after 5 seconds
+
+
+Scenario: Archiving documents to hdfs
+
+Given 1000 documents in solr with logtime from 2014-01-04T05:00:00.000Z to 2014-01-06T20:00:00.000Z
+When start archive_audit_logs job with parameters start=2014-01-04T05:00:00.000Z,end=2014-01-06T20:00:00.000Z,destination=HDFS
+Then Check 7 files exists on hdfs with filenames containing the text audit_logs_-_2014-01-0 in the folder /test_audit_logs after 10 seconds
+And solr does not contain documents between 2014-01-04T05:00:00.000Z and 2014-01-06T20:00:00.000Z after 10 seconds
+
+
+Scenario: Archiving documents to local filesystem
+
+Given 200 documents in solr with logtime from 2014-02-04T05:00:00.000Z to 2014-02-06T20:00:00.000Z
+When start archive_audit_logs job with parameters start=2014-02-04T05:00:00.000Z,end=2014-02-06T20:00:00.000Z,destination=LOCAL,localDestinationDirectory=/root/archive
+Then Check 2 files exists on local filesystem with filenames containing the text audit_logs_-_2014-02-0 in the folder audit_logs_8_2014-02-06T20-00-00.000Z
+And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02-06T20:00:00.000Z after 10 seconds
diff --git a/ambari-infra/ambari-infra-manager/.gitignore b/ambari-infra/ambari-infra-manager/.gitignore
index ba4e51d69be..94b38299dda 100644
--- a/ambari-infra/ambari-infra-manager/.gitignore
+++ b/ambari-infra/ambari-infra-manager/.gitignore
@@ -1,2 +1,5 @@
 out/*
-*.pid
\ No newline at end of file
+*.pid
+Profile
+.env
+test-out
\ No newline at end of file
diff --git a/ambari-infra/ambari-infra-manager/docker/docker-compose.yml b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
index 1172631917c..4f1febd43f4 100644
--- a/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
+++ b/ambari-infra/ambari-infra-manager/docker/docker-compose.yml
@@ -45,19 +45,39 @@ services:
       - ${ZOOKEEPER_CONNECTION_STRING}
     volumes:
       - $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets
-  localstack-s3:
+  fakes3:
     image: localstack/localstack
+    hostname: fakes3
     ports:
       - "4569:4569"
     environment:
       - SERVICES=s3:4569
-    hostname: fakes3
     networks:
       infra-network:
        aliases:
          - testbucket.fakes3
     env_file:
       - Profile
+  namenode:
+    image: flokkr/hadoop-hdfs-namenode:${HADOOP_VERSION:-3.0.0}
+    hostname: namenode
+    ports:
+      - 9870:9870
+      - 9000:9000
+    env_file:
+      - Profile
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-hdfs/dfs/name"
+    networks:
+      - infra-network
+  datanode:
+    image: flokkr/hadoop-hdfs-datanode:${HADOOP_VERSION:-3.0.0}
+    links:
+      - namenode
+    env_file:
+      - Profile
+    networks:
+      - infra-network
   inframanager:
     image: ambari-infra-manager:v1.0
     restart: always
@@ -76,6 +96,7 @@ services:
       DISPLAY: $DOCKERIP:0
     volumes:
       - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager
+      - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/docker/test-out:/root/archive
 networks:
   infra-network:
     driver: bridge
diff --git a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
index 6ddb7c292af..e5df48cc559 100755
--- a/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
+++ b/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh
@@ -20,12 +20,12 @@ command="$1"
 
 function start_containers() {
   check_env_files
-  echo "Start containers ..."
+  kill_containers
   pushd $sdir/../
   local AMBARI_INFRA_MANAGER_LOCATION=$(pwd)
   echo $AMBARI_INFRA_MANAGER_LOCATION
-  kill_containers
   cd $AMBARI_INFRA_MANAGER_LOCATION/docker
+  echo "Start containers ..."
   docker-compose up -d
   popd
   echo "Containers started"
@@ -73,25 +85,37 @@
 ZOOKEEPER_VERSION=3.4.10
 ZOOKEEPER_CONNECTION_STRING=zookeeper:2181
 SOLR_VERSION=6.6.2
+
+HADOOP_VERSION=3.0.0
 EOF
 }
 
 function setup_profile() {
-  pushd $sdir/../../
-  local AMBARI_LOCATION=$(pwd)
-  popd
   cat << EOF > $sdir/Profile
 AWS_ACCESS_KEY_ID=test
 AWS_SECRET_ACCESS_KEY=test
+HADOOP_USER_NAME=root
+
+CORE-SITE.XML_fs.default.name=hdfs://namenode:9000
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode:9000
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:9000
+HDFS-SITE.XML_dfs.replication=1
 EOF
 }
 
 function kill_containers() {
+  pushd $sdir/../
+  local AMBARI_INFRA_MANAGER_LOCATION=$(pwd)
   echo "Try to remove containers if exists ..."
-  docker rm -f docker_inframanager_1
-  docker rm -f docker_solr_1
-  docker rm -f docker_zookeeper_1
-  docker rm -f docker_localstack-s3_1
+  echo $AMBARI_INFRA_MANAGER_LOCATION
+  cd $AMBARI_INFRA_MANAGER_LOCATION/docker
+  docker-compose rm -f -s inframanager
+  docker-compose rm -f -s solr
+  docker-compose rm -f -s zookeeper
+  docker-compose rm -f -s fakes3
+  docker-compose rm -f -s namenode
+  docker-compose rm -f -s datanode
+  popd
 }
 
 case $command in
diff --git a/ambari-infra/ambari-infra-manager/pom.xml b/ambari-infra/ambari-infra-manager/pom.xml
index de131b03f6b..75adb8dc43d 100644
--- a/ambari-infra/ambari-infra-manager/pom.xml
+++ b/ambari-infra/ambari-infra-manager/pom.xml
@@ -277,7 +277,7 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>2.7.0</version>
+      <version>${hadoop.version}</version>
       <exclusions>
         <exclusion>
           <groupId>javax.servlet</groupId>
@@ -311,6 +311,33 @@
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-servlet</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-security</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
index 0841dd7fd9c..292e15e280a 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java
@@ -18,7 +18,7 @@
  */
 package org.apache.ambari.infra.job;
 
-import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.springframework.batch.core.JobParameters;
 
 import java.io.IOException;
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 7588b9992a3..4fdc5dab408 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -18,8 +18,10 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import org.apache.ambari.infra.conf.InfraManagerDataConfig;
 import org.apache.ambari.infra.job.JobPropertyMap;
 import org.apache.ambari.infra.job.ObjectSource;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.batch.core.Job;
@@ -37,11 +39,13 @@
 import javax.annotation.PostConstruct;
 import javax.inject.Inject;
 import java.io.File;
-import java.nio.file.Paths;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
 
 @Configuration
 public class DocumentArchivingConfiguration {
   private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class);
+  private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { };
 
   @Inject
   private DocumentExportPropertyMap propertyMap;
@@ -62,6 +66,9 @@ public class DocumentArchivingConfiguration {
 
   @PostConstruct
   public void createJobs() {
+    if (propertyMap == null || propertyMap.getSolrDataExport() == null)
+      return;
+
     propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate);
     propertyMap.getSolrDataExport().keySet().forEach(jobName -> {
@@ -88,30 +95,57 @@ public Step exportStep(DocumentExporter documentExporter) {
   public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
                                            @Value("#{stepExecution.jobExecution.id}") String jobId,
                                            @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties,
-                                           SolrDAO solrDAO) {
-    File path = Paths.get(
-            properties.getDestinationDirectoryPath(),
-            // TODO: jobId should remain the same after continuing job
-            String.format("%s_%s", properties.getSolr().getCollection(), jobId)).toFile(); // TODO: add end date
-    LOG.info("Destination directory path={}", path);
-    if (!path.exists()) {
-      if (!path.mkdirs()) {
-        LOG.warn("Unable to create directory {}", path);
-      }
-    }
+                                           InfraManagerDataConfig infraManagerDataConfig,
+                                           @Value("#{jobParameters[end]}") String intervalEnd,
+                                           DocumentWiper documentWiper) {
+    File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
     CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
-    properties.s3Properties().ifPresent(s3Properties -> fileAction.add(new S3Uploader(s3Properties)));
+    switch (properties.getDestination()) {
+      case S3:
+        fileAction.add(new S3Uploader(properties.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!"))));
+        break;
+      case HDFS:
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        conf.set("fs.defaultFS", properties.getHdfsEndpoint());
+        fileAction.add(new HdfsUploader(conf, new Path(properties.getHdfsDestinationDirectory())));
+        break;
+      case LOCAL:
+        baseDir = new File(properties.getLocalDestinationDirectory());
+        break;
+    }
+
     FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(properties);
-    LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, solrDAO);
+    LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, documentWiper);
+    File destinationDirectory = new File(
+            baseDir,
+            String.format("%s_%s_%s",
+                    properties.getSolr().getCollection(),
+                    jobId,
+                    isBlank(intervalEnd) ? "" : fileNameSuffixFormatter.format(intervalEnd)));
+    LOG.info("Destination directory path={}", destinationDirectory);
+    if (!destinationDirectory.exists()) {
+      if (!destinationDirectory.mkdirs()) {
+        LOG.warn("Unable to create directory {}", destinationDirectory);
+      }
+    }
     return new DocumentExporter(
             documentItemReader,
             firstDocument -> new LocalDocumentItemWriter(
-                    outFile(properties.getSolr().getCollection(), path, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
+                    outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
             properties.getWriteBlockSize());
   }
 
+  @Bean
+  @StepScope
+  public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties,
+                                     SolrDAO solrDAO) {
+    if (isBlank(properties.getSolr().getDeleteQueryText()))
+      return NOT_DELETE;
+    return solrDAO;
+  }
+
   @Bean
   @StepScope
   public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties) {
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
index 1484eedfe35..d37edf4cc8d 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExportProperties.java
@@ -31,13 +31,18 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.ambari.infra.job.archive.ExportDestination.HDFS;
+import static org.apache.ambari.infra.job.archive.ExportDestination.LOCAL;
+import static org.apache.ambari.infra.job.archive.ExportDestination.S3;
 import static org.apache.commons.csv.CSVFormat.DEFAULT;
 import static org.apache.commons.lang.StringUtils.isBlank;
 
 public class DocumentExportProperties extends JobProperties<DocumentExportProperties> {
   private int readBlockSize;
   private int writeBlockSize;
-  private String destinationDirectoryPath;
+  private ExportDestination destination;
+  private String localDestinationDirectory;
   private String fileNameSuffixColumn;
   private String fileNameSuffixDateFormat;
   private SolrProperties solr;
@@ -47,6 +52,9 @@ public class DocumentExportProperties extends JobProperties<DocumentExportProperties> {
   private final Supplier<Optional<S3Properties>> s3Properties;
 
+  private String hdfsEndpoint;
+  private String hdfsDestinationDirectory;
+
   public DocumentExportProperties() {
     super(DocumentExportProperties.class);
     s3Properties = this::loadS3Properties;
@@ -101,12 +109,20 @@ public void setWriteBlockSize(int writeBlockSize) {
     this.writeBlockSize = writeBlockSize;
   }
 
-  public String getDestinationDirectoryPath() {
-    return destinationDirectoryPath;
+  public ExportDestination getDestination() {
+    return destination;
+  }
+
+  public void setDestination(ExportDestination destination) {
+    this.destination = destination;
+  }
+
+  public String getLocalDestinationDirectory() {
+    return localDestinationDirectory;
   }
 
-  public void setDestinationDirectoryPath(String destinationDirectoryPath) {
-    this.destinationDirectoryPath = destinationDirectoryPath;
+  public void setLocalDestinationDirectory(String localDestinationDirectory) {
+    this.localDestinationDirectory = localDestinationDirectory;
   }
 
   public String getFileNameSuffixColumn() {
@@ -170,11 +186,34 @@ public Optional<S3Properties> s3Properties() {
     return s3Properties.get();
   }
 
+  public String getHdfsEndpoint() {
+    return hdfsEndpoint;
+  }
+
+  public void setHdfsEndpoint(String hdfsEndpoint) {
+    this.hdfsEndpoint = hdfsEndpoint;
+  }
+
+  public String getHdfsDestinationDirectory() {
+    return hdfsDestinationDirectory;
+  }
+
+  public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) {
+    this.hdfsDestinationDirectory = hdfsDestinationDirectory;
+  }
+
   @Override
   public void apply(JobParameters jobParameters) {
     readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize);
     writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize);
-    destinationDirectoryPath = jobParameters.getString("destinationDirectoryPath", destinationDirectoryPath);
+    destination = ExportDestination.valueOf(jobParameters.getString("destination", destination.name()));
+    localDestinationDirectory = jobParameters.getString("localDestinationDirectory", localDestinationDirectory);
+    s3AccessFile = jobParameters.getString("s3AccessFile", s3AccessFile);
+    s3BucketName = jobParameters.getString("s3BucketName", s3BucketName);
+    s3KeyPrefix = jobParameters.getString("s3KeyPrefix", s3KeyPrefix);
+    s3Endpoint = jobParameters.getString("s3Endpoint", s3Endpoint);
+    hdfsEndpoint = jobParameters.getString("hdfsEndpoint", hdfsEndpoint);
+    hdfsDestinationDirectory = jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory);
     solr.apply(jobParameters);
   }
 
@@ -193,13 +232,33 @@ public void validate() {
     if (writeBlockSize == 0)
       throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!");
 
-    if (isBlank(destinationDirectoryPath))
-      throw new IllegalArgumentException("The property destinationDirectoryPath can not be null or empty string!");
-
-    if (isBlank(fileNameSuffixColumn))
+    if (isBlank(fileNameSuffixColumn)) {
       throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!");
+    }
+
+    requireNonNull(destination, "The property destination can not be null!");
+    switch (destination) {
+      case LOCAL:
+        if (isBlank(localDestinationDirectory))
+          throw new IllegalArgumentException(String.format(
+                  "The property localDestinationDirectory can not be null or empty string when destination is set to %s!", LOCAL.name()));
+        break;
+
+      case S3:
+        s3Properties()
+                .orElseThrow(() -> new IllegalArgumentException("S3 related properties must be set if the destination is " + S3.name()))
+                .validate();
+        break;
+
+      case HDFS:
+        if (isBlank(hdfsEndpoint))
+          throw new IllegalArgumentException(String.format(
+                  "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name()));
+        if (isBlank(hdfsDestinationDirectory))
+          throw new IllegalArgumentException(String.format(
+                  "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
+    }
 
     solr.validate();
-    s3Properties().ifPresent(S3Properties::validate);
   }
 }
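A quick sketch (illustrative, not part of the patch) of how the reworked DocumentExportProperties is populated for an HDFS export; the endpoint and directory values are the ones used by the docker/integration-test configuration in this patch, and a complete job configuration still needs the SolrProperties that validate() checks after the destination-specific part:

DocumentExportProperties properties = new DocumentExportProperties();
properties.setDestination(ExportDestination.HDFS);
properties.setHdfsEndpoint("hdfs://namenode:9000/");         // endpoint used by the docker test setup
properties.setHdfsDestinationDirectory("/test_audit_logs");  // target folder checked by the IT story
// validate() now fails fast when destination is null, or when the destination-specific
// settings (localDestinationDirectory, the S3 properties, or the two HDFS properties) are missing.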
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ExportDestination.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ExportDestination.java
new file mode 100644
index 00000000000..a143e4c546a
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ExportDestination.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+public enum ExportDestination {
+  LOCAL,
+  HDFS,
+  S3
+}
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
index af48ab9aaae..9b4e662e100 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java
@@ -21,6 +21,7 @@
 import java.time.OffsetDateTime;
 import java.time.format.DateTimeFormatter;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE_FORMAT_TEXT;
 import static org.apache.commons.lang.StringUtils.isBlank;
 
@@ -42,15 +43,21 @@ public FileNameSuffixFormatter(String columnName, String dateTimeFormat) {
   }
 
   public String format(Document document) {
-    if (document == null)
-      throw new NullPointerException("Can not format file name suffix: input document is null!");
+    requireNonNull(document, "Can not format file name suffix: input document is null!");
     if (isBlank(document.get(columnName)))
       throw new IllegalArgumentException("The specified document does not have a column " + columnName + " or it's value is blank!");
+    return format(document.get(columnName));
+  }
+
+  public String format(String value) {
+    if (isBlank(value))
+      throw new IllegalArgumentException("The specified value is blank!");
+
     if (dateFormat == null)
-      return document.get(columnName);
-    OffsetDateTime date = OffsetDateTime.parse(document.get(columnName), SOLR_DATETIME_FORMATTER);
+      return value;
+    OffsetDateTime date = OffsetDateTime.parse(value, SOLR_DATETIME_FORMATTER);
     return date.format(dateFormat);
   }
 }
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
new file mode 100644
index 00000000000..0f7b99fcc24
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public class HdfsUploader extends AbstractFileAction {
+
+  private final Configuration configuration;
+  private final Path destinationDirectory;
+
+  public HdfsUploader(Configuration configuration, Path destinationDirectory) {
+    this.destinationDirectory = destinationDirectory;
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected File onPerform(File inputFile) {
+    try (FileSystem fileSystem = FileSystem.get(configuration)) {
+      Path destination = new Path(destinationDirectory, inputFile.getName());
+      if (fileSystem.exists(destination)) {
+        throw new UnsupportedOperationException(String.format("File '%s' already exists!", destination));
+      }
+
+      fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()), destination);
+
+      return inputFile;
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}
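For reference, a sketch (illustrative, not part of the patch) of how the new uploader is composed into the archiving file-action chain — it mirrors what documentExporter() in DocumentArchivingConfiguration does for destination=HDFS, with endpoint and directory values taken from this patch's test configuration:

org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:9000/");
CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor());
fileAction.add(new HdfsUploader(conf, new Path("/test_audit_logs")));
// each exported and tar.gz-compressed chunk is then copied to HDFS by onPerform(),
// which refuses to overwrite a destination file that already exists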
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
index 4fce4b677f6..ce8970d87e6 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java
@@ -58,6 +58,9 @@ public class DocumentDeletingConfiguration {
 
   @PostConstruct
   public void createJobs() {
+    if (propertyMap == null || propertyMap.getSolrDataDeleting() == null)
+      return;
+
     propertyMap.getSolrDataDeleting().values().forEach(DocumentDeletingProperties::validate);
     propertyMap.getSolrDataDeleting().keySet().forEach(jobName -> {
diff --git a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/model/JobExecutionRestartRequest.java b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/model/JobExecutionRestartRequest.java
index 88687e72600..3eab25f4e42 100644
--- a/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/model/JobExecutionRestartRequest.java
+++ b/ambari-infra/ambari-infra-manager/src/main/java/org/apache/ambari/infra/model/JobExecutionRestartRequest.java
@@ -18,12 +18,22 @@
  */
 package org.apache.ambari.infra.model;
 
+import javax.validation.constraints.NotNull;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.QueryParam;
+
 public class JobExecutionRestartRequest {
+  @PathParam("jobName")
+  @NotNull
   private String jobName;
+  @PathParam("jobInstanceId")
+  @NotNull
   private Long jobInstanceId;
+  @QueryParam("operation")
+  @NotNull
   private JobOperationParams.JobRestartOperationParam operation;
 
   public String getJobName() {
diff --git a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
index 9103d09e7e6..61c08e5631d 100644
--- a/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
+++ b/ambari-infra/ambari-infra-manager/src/main/resources/infra-manager.properties
@@ -18,7 +18,7 @@ infra-manager.batch.db.username=admin
 infra-manager.batch.db.password=admin
 management.security.enabled=false
 management.health.solr.enabled=false
-infra-manager.server.data.folder=/tmp
+infra-manager.server.data.folder=/tmp/ambariInfraManager
 
 infra-manager.jobs.solr_data_export.export_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181
 infra-manager.jobs.solr_data_export.export_service_logs.solr.collection=hadoop_logs
@@ -28,9 +28,10 @@ infra-manager.jobs.solr_data_export.export_service_logs.solr.sort_column[0]=logt
 infra-manager.jobs.solr_data_export.export_service_logs.solr.sort_column[1]=id
 infra-manager.jobs.solr_data_export.export_service_logs.read_block_size=100
 infra-manager.jobs.solr_data_export.export_service_logs.write_block_size=150
+infra-manager.jobs.solr_data_export.export_service_logs.destination=LOCAL
+infra-manager.jobs.solr_data_export.export_service_logs.local_destination_directory=/tmp/ambariInfraManager
 infra-manager.jobs.solr_data_export.export_service_logs.file_name_suffix_column=logtime
 infra-manager.jobs.solr_data_export.export_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
-infra-manager.jobs.solr_data_export.export_service_logs.destination_directory_path=/tmp/ambariInfraManager
 infra-manager.jobs.solr_data_export.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181
 infra-manager.jobs.solr_data_export.archive_audit_logs.solr.collection=audit_logs
 infra-manager.jobs.solr_data_export.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}]
@@ -40,10 +41,12 @@ infra-manager.jobs.solr_data_export.archive_audit_logs.solr.sort_column[1]=id
 infra-manager.jobs.solr_data_export.archive_audit_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
 infra-manager.jobs.solr_data_export.archive_audit_logs.read_block_size=100
 infra-manager.jobs.solr_data_export.archive_audit_logs.write_block_size=150
+infra-manager.jobs.solr_data_export.archive_audit_logs.destination=S3
 # TODO: logtime may not be enough: The same filename can be generated when more than write_block_size count docs has the same logtime value
 infra-manager.jobs.solr_data_export.archive_audit_logs.file_name_suffix_column=logtime
 infra-manager.jobs.solr_data_export.archive_audit_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
-infra-manager.jobs.solr_data_export.archive_audit_logs.destination_directory_path=/tmp/ambariInfraManager
+infra-manager.jobs.solr_data_export.archive_audit_logs.hdfs_endpoint=hdfs://namenode:9000/
+infra-manager.jobs.solr_data_export.archive_audit_logs.hdfs_destination_directory=/test_audit_logs
 #infra-manager.jobs.solr_data_export.archive_audit_logs.s3_access_file=.csv
 infra-manager.jobs.solr_data_export.archive_audit_logs.s3_key_prefix=solr_archive_
 infra-manager.jobs.solr_data_export.archive_audit_logs.s3_bucket_name=testbucket
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java
index 6a7622954f5..21bcdb798e0 100644
--- a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java
@@ -29,7 +29,7 @@ public class JobPropertiesTest {
   @Test
   public void testDeepCopy() throws Exception {
     DocumentExportProperties documentExportProperties = new DocumentExportProperties();
-    documentExportProperties.setDestinationDirectoryPath("/tmp");
+    documentExportProperties.setLocalDestinationDirectory("/tmp");
     documentExportProperties.setFileNameSuffixColumn(".json");
     documentExportProperties.setReadBlockSize(10);
     documentExportProperties.setWriteBlockSize(20);
@@ -43,7 +43,7 @@ public void testDeepCopy() throws Exception {
 
     DocumentExportProperties parsed = documentExportProperties.deepCopy();
 
-    assertThat(parsed.getDestinationDirectoryPath(), is(documentExportProperties.getDestinationDirectoryPath()));
+    assertThat(parsed.getLocalDestinationDirectory(), is(documentExportProperties.getLocalDestinationDirectory()));
     assertThat(parsed.getFileNameSuffixColumn(), is(documentExportProperties.getFileNameSuffixColumn()));
     assertThat(parsed.getReadBlockSize(), is(documentExportProperties.getReadBlockSize()));
     assertThat(parsed.getWriteBlockSize(), is(documentExportProperties.getWriteBlockSize()));
diff --git a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatterTest.java b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatterTest.java
index 34e679f298c..cca2c1a503e 100644
--- a/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatterTest.java
+++ b/ambari-infra/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatterTest.java
@@ -32,7 +32,7 @@ public class FileNameSuffixFormatterTest {
 
   @Test(expected = NullPointerException.class)
   public void testFormatWhenDocumentIsNullThrowingException() throws Exception {
-    formatter.format(null);
+    formatter.format((Document) null);
   }
 
   @Test(expected = IllegalArgumentException.class)
diff --git a/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
new file mode 100644
index 00000000000..f008a5319de
--- /dev/null
+++ b/ambari-infra/ambari-infra-manager/src/test/resoruces/vagrant-infra-manager.properties.sample
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+infra-manager.batch.db.file=job-repository.db
+infra-manager.batch.db.init=true
+infra-manager.batch.db.username=admin
+infra-manager.batch.db.password=admin
+management.security.enabled=false
+management.health.solr.enabled=false
+infra-manager.server.data.folder=/tmp/ambariInfraManager
+
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.zoo_keeper_connection_string=c6401.ambari.apache.org:2181/infra-solr
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.collection=hadoop_logs
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[0]=logtime
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.sort_column[1]=id
+infra-manager.jobs.solr_data_export.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
+infra-manager.jobs.solr_data_export.archive_service_logs.read_block_size=2000
+infra-manager.jobs.solr_data_export.archive_service_logs.write_block_size=1000
+infra-manager.jobs.solr_data_export.archive_service_logs.destination=HDFS
+infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_column=logtime
+infra-manager.jobs.solr_data_export.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
+infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_endpoint=hdfs://c6401.ambari.apache.org:8020
+infra-manager.jobs.solr_data_export.archive_service_logs.hdfs_destination_directory=/archived_service_logs
+# Note: set the hdfs user via the HADOOP_USER_NAME environment variable. Value: hdfs
\ No newline at end of file
diff --git a/ambari-infra/pom.xml b/ambari-infra/pom.xml
index 9e7a71bfa7b..240049336ce 100644
--- a/ambari-infra/pom.xml
+++ b/ambari-infra/pom.xml
@@ -36,6 +36,7 @@
     python (>= 2.6)
    amd64
    ${deb.python.ver}
+    <hadoop.version>3.0.0</hadoop.version>