2 changes: 1 addition & 1 deletion ambari-infra/ambari-infra-manager/docker/bin/start.sh
@@ -16,6 +16,6 @@

export INFRA_MANAGER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=5007,server=y,suspend=n"
touch /root/infra-manager.log
/root/ambari-infra-manager/infraManager.sh > /root/infra-manager.log
/root/ambari-infra-manager/bin/infraManager.sh start > /root/infra-manager.log
tail -f /root/infra-manager.log

@@ -78,6 +78,11 @@ HADOOP_VERSION=3.0.0
EOF
}

function get_docker_ip() {
local ip=$(ifconfig en0 | grep inet | awk '$1=="inet" {print $2}')
echo $ip
}

function setup_profile() {
cat << EOF > $sdir/Profile
AWS_ACCESS_KEY_ID=test
DocumentArchivingConfiguration.java
@@ -48,7 +48,7 @@ public class DocumentArchivingConfiguration {
private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { };

@Inject
private DocumentExportPropertyMap propertyMap;
private DocumentArchivingPropertyMap propertyMap;

@Inject
private StepBuilderFactory steps;
@@ -66,12 +66,12 @@ public class DocumentArchivingConfiguration {

@PostConstruct
public void createJobs() {
if (propertyMap == null || propertyMap.getSolrDataExport() == null)
if (propertyMap == null || propertyMap.getSolrDataArchiving() == null)
return;

propertyMap.getSolrDataExport().values().forEach(DocumentExportProperties::validate);
propertyMap.getSolrDataArchiving().values().forEach(DocumentArchivingProperties::validate);

propertyMap.getSolrDataExport().keySet().forEach(jobName -> {
propertyMap.getSolrDataArchiving().keySet().forEach(jobName -> {
LOG.info("Registering data archiving job {}", jobName);
Job job = logExportJob(jobName, exportStep);
jobRegistryBeanPostProcessor.postProcessAfterInitialization(job, jobName);
@@ -94,7 +94,7 @@ public Step exportStep(DocumentExporter documentExporter) {
@StepScope
public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
@Value("#{stepExecution.jobExecution.id}") String jobId,
@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties,
@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties,
InfraManagerDataConfig infraManagerDataConfig,
@Value("#{jobParameters[end]}") String intervalEnd,
DocumentWiper documentWiper) {
@@ -139,7 +139,7 @@ public DocumentExporter documentExporter(DocumentItemReader documentItemReader,

@Bean
@StepScope
public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties,
public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties,
SolrDAO solrDAO) {
if (isBlank(properties.getSolr().getDeleteQueryText()))
return NOT_DELETE;
@@ -148,7 +148,7 @@ public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executio

@Bean
@StepScope
public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties) {
public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties) {
return new SolrDAO(properties.getSolr());
}

@@ -161,7 +161,7 @@ private File outFile(String collection, File directoryPath, String suffix) {
@Bean
@StepScope
public DocumentItemReader reader(ObjectSource<Document> documentSource,
@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentExportProperties properties) {
@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties) {
return new DocumentItemReader(documentSource, properties.getReadBlockSize());
}
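The @StepScope beans above all resolve their DocumentArchivingProperties through the SpEL expression #{stepExecution.jobExecution.executionContext.get('jobProperties')}. The following is a minimal sketch of how such an entry could be seeded into the job's execution context before the step runs; the listener class name and wiring are assumptions for illustration, not necessarily how this project does it.

    import org.apache.ambari.infra.job.archive.DocumentArchivingProperties;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobExecutionListener;

    // Hypothetical listener, for illustration only; the project may seed the
    // execution context through a different component.
    public class JobPropertiesSeedingListener implements JobExecutionListener {
      private final DocumentArchivingProperties properties;

      public JobPropertiesSeedingListener(DocumentArchivingProperties properties) {
        this.properties = properties;
      }

      @Override
      public void beforeJob(JobExecution jobExecution) {
        // Same key that the @Value("#{...executionContext.get('jobProperties')}") injections read
        jobExecution.getExecutionContext().put("jobProperties", properties);
      }

      @Override
      public void afterJob(JobExecution jobExecution) {
        // nothing to clean up in this sketch
      }
    }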

DocumentExportProperties.java → DocumentArchivingProperties.java
@@ -38,7 +38,7 @@
import static org.apache.commons.csv.CSVFormat.DEFAULT;
import static org.apache.commons.lang.StringUtils.isBlank;

public class DocumentExportProperties extends JobProperties<DocumentExportProperties> {
public class DocumentArchivingProperties extends JobProperties<DocumentArchivingProperties> {
private int readBlockSize;
private int writeBlockSize;
private ExportDestination destination;
@@ -55,8 +55,8 @@ public class DocumentExportProperties extends JobProperties<DocumentExportProper
private String hdfsEndpoint;
private String hdfsDestinationDirectory;

public DocumentExportProperties() {
super(DocumentExportProperties.class);
public DocumentArchivingProperties() {
super(DocumentArchivingProperties.class);
s3Properties = this::loadS3Properties;
}

DocumentExportPropertyMap.java → DocumentArchivingPropertyMap.java
@@ -26,19 +26,19 @@

@Configuration
@ConfigurationProperties(prefix = "infra-manager.jobs")
public class DocumentExportPropertyMap implements PropertyMap<DocumentExportProperties> {
private Map<String, DocumentExportProperties> solrDataExport;
public class DocumentArchivingPropertyMap implements PropertyMap<DocumentArchivingProperties> {
private Map<String, DocumentArchivingProperties> solrDataArchiving;

public Map<String, DocumentExportProperties> getSolrDataExport() {
return solrDataExport;
public Map<String, DocumentArchivingProperties> getSolrDataArchiving() {
return solrDataArchiving;
}

public void setSolrDataExport(Map<String, DocumentExportProperties> solrDataExport) {
this.solrDataExport = solrDataExport;
public void setSolrDataArchiving(Map<String, DocumentArchivingProperties> solrDataArchiving) {
this.solrDataArchiving = solrDataArchiving;
}

@Override
public Map<String, DocumentExportProperties> getPropertyMap() {
return getSolrDataExport();
public Map<String, DocumentArchivingProperties> getPropertyMap() {
return getSolrDataArchiving();
}
}
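For orientation, here is a self-contained sketch of the shape this map takes after the rename: one DocumentArchivingProperties entry per job name configured under infra-manager.jobs.solr_data_archiving.*, which DocumentArchivingConfiguration#createJobs then iterates. It constructs the objects directly rather than going through Spring Boot's property binding, and the values mirror the archive_audit_logs settings from the sample properties further down.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.ambari.infra.job.archive.DocumentArchivingProperties;

    public class ArchivingPropertyMapSketch {
      public static void main(String[] args) {
        DocumentArchivingProperties audit = new DocumentArchivingProperties();
        audit.setReadBlockSize(100);   // ...solr_data_archiving.archive_audit_logs.read_block_size=100
        audit.setWriteBlockSize(150);  // ...solr_data_archiving.archive_audit_logs.write_block_size=150

        // Equivalent in shape to DocumentArchivingPropertyMap#getSolrDataArchiving()
        Map<String, DocumentArchivingProperties> solrDataArchiving = new HashMap<>();
        solrDataArchiving.put("archive_audit_logs", audit);

        // createJobs() registers one archiving job per key of this map
        solrDataArchiving.keySet().forEach(jobName ->
            System.out.println("Registering data archiving job " + jobName));
      }
    }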
FileNameSuffixFormatter.java
@@ -28,7 +28,7 @@
public class FileNameSuffixFormatter {
private static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT);

public static FileNameSuffixFormatter from(DocumentExportProperties properties) {
public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) {
return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat());
}
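The suffix formatter combines the configured file_name_suffix_column with file_name_suffix_date_format. As a rough illustration of what the pattern used in the sample configuration below produces, assuming the column value parses as an ISO timestamp (the sample value is invented):

    import java.time.OffsetDateTime;
    import java.time.format.DateTimeFormatter;

    public class SuffixFormatDemo {
      public static void main(String[] args) {
        // Pattern taken from file_name_suffix_date_format in the sample properties
        DateTimeFormatter suffixFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss.SSSX");
        OffsetDateTime logtime = OffsetDateTime.parse("2017-10-05T12:34:56.789Z");
        System.out.println(suffixFormat.format(logtime)); // prints 2017-10-05T12-34-56.789Z
      }
    }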

@@ -19,7 +19,7 @@ export INFRA_MANAGER_OPTS=""

# Log Search debug options
# export INFRA_MANAGER_DEBUG=true
# exoprt INFRA_MANAGER_DEBUG_SUSPEND=n
# export INFRA_MANAGER_DEBUG_SUSPEND=n
export INFRA_MANAGER_DEBUG_PORT=5005

# Log Search memory
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

infra-manager.batch.db.file=job-repository.db
infra-manager.batch.db.init=true
infra-manager.batch.db.username=admin
@@ -20,48 +21,48 @@ management.security.enabled=false
management.health.solr.enabled=false
infra-manager.server.data.folder=/tmp/ambariInfraManager

infra-manager.jobs.solr_data_export.export_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_export.export_service_logs.solr.collection=hadoop_logs
infra-manager.jobs.solr_data_export.export_service_logs.solr.query_text=logtime:[${start} TO ${end}]
infra-manager.jobs.solr_data_export.export_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
infra-manager.jobs.solr_data_export.export_service_logs.solr.sort_column[0]=logtime
infra-manager.jobs.solr_data_export.export_service_logs.solr.sort_column[1]=id
infra-manager.jobs.solr_data_export.export_service_logs.read_block_size=100
infra-manager.jobs.solr_data_export.export_service_logs.write_block_size=150
infra-manager.jobs.solr_data_export.export_service_logs.destination=LOCAL
infra-manager.jobs.solr_data_export.export_service_logs.local_destination_directory=/tmp/ambariInfraManager
infra-manager.jobs.solr_data_export.export_service_logs.file_name_suffix_column=logtime
infra-manager.jobs.solr_data_export.export_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.collection=audit_logs
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}]
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.sort_column[0]=logtime
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.sort_column[1]=id
infra-manager.jobs.solr_data_export.archive_audit_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
infra-manager.jobs.solr_data_export.archive_audit_logs.read_block_size=100
infra-manager.jobs.solr_data_export.archive_audit_logs.write_block_size=150
infra-manager.jobs.solr_data_export.archive_audit_logs.destination=HDFS
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.collection=hadoop_logs
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logtime:[${start} TO ${end}]
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime
infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id
infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=100
infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=150
infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL
infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager
infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime
infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.query_text=logtime:[${start} TO ${end}]
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}]
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.sort_column[0]=logtime
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.sort_column[1]=id
infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}])
infra-manager.jobs.solr_data_archiving.archive_audit_logs.read_block_size=100
infra-manager.jobs.solr_data_archiving.archive_audit_logs.write_block_size=150
infra-manager.jobs.solr_data_archiving.archive_audit_logs.destination=S3
# TODO: logtime may not be enough: The same filename can be generated when more than write_block_size count docs has the same logtime value
infra-manager.jobs.solr_data_export.archive_audit_logs.file_name_suffix_column=logtime
infra-manager.jobs.solr_data_export.archive_audit_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
infra-manager.jobs.solr_data_export.archive_audit_logs.hdfs_endpoint=hdfs://namenode:9000/
infra-manager.jobs.solr_data_export.archive_audit_logs.hdfs_destination_directory=/test_audit_logs
#infra-manager.jobs.solr_data_export.archive_audit_logs.s3_access_file=<any>.csv
infra-manager.jobs.solr_data_export.archive_audit_logs.s3_key_prefix=solr_archive_
infra-manager.jobs.solr_data_export.archive_audit_logs.s3_bucket_name=testbucket
infra-manager.jobs.solr_data_export.archive_audit_logs.s3_endpoint=http://fakes3:4569
infra-manager.jobs.solr_data_archiving.archive_audit_logs.file_name_suffix_column=logtime
infra-manager.jobs.solr_data_archiving.archive_audit_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX
infra-manager.jobs.solr_data_archiving.archive_audit_logs.hdfs_endpoint=hdfs://namenode:9000/
infra-manager.jobs.solr_data_archiving.archive_audit_logs.hdfs_destination_directory=/test_audit_logs
#infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_access_file=<any>.csv
infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_key_prefix=solr_archive_
infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_bucket_name=testbucket
infra-manager.jobs.solr_data_archiving.archive_audit_logs.s3_endpoint=http://fakes3:4569
# TODO: configure ranger audit logs
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.zoo_keeper_connection_string=zookeeper:2181
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.read_block_size=100
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.write_block_size=150
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.file_name_suffix_column=logtime
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.destination_directory_path=/tmp/ambariInfraManager
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.query.collection=hadoop_logs
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.query.query_text=logtime:[* TO "${end}"]
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.query.sort_column[0]=logtime
#infra-manager.jobs.solr_data_export.export_ranger_audit_logs.query.sort_column[1]=id
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.zoo_keeper_connection_string=zookeeper:2181
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.read_block_size=100
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.write_block_size=150
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.file_name_suffix_column=logtime
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.destination_directory_path=/tmp/ambariInfraManager
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.collection=hadoop_logs
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.query_text=logtime:[* TO "${end}"]
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.filter_query_text=(logtime:"${logtime}" AND id:{"${id}" TO *]) OR logtime:{"${logtime}" TO "${end}"]
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[0]=logtime
#infra-manager.jobs.solr_data_archiving.export_ranger_audit_logs.query.sort_column[1]=id
infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181
infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs
infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime
infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime
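The TODO above on archive_audit_logs file names can be made concrete: when the suffix is derived from logtime alone, two consecutive write blocks whose last documents share the same logtime end up with identical suffixes. A minimal sketch (timestamps invented):

    import java.time.OffsetDateTime;
    import java.time.format.DateTimeFormatter;

    public class SuffixCollisionDemo {
      public static void main(String[] args) {
        DateTimeFormatter suffix = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss.SSSX");
        // Different documents (distinct ids) can share a logtime value
        OffsetDateTime lastDocOfBlockOne = OffsetDateTime.parse("2017-10-05T12:34:56.789Z");
        OffsetDateTime lastDocOfBlockTwo = OffsetDateTime.parse("2017-10-05T12:34:56.789Z");
        // Both blocks would be written with the same file name suffix
        System.out.println(suffix.format(lastDocOfBlockOne).equals(suffix.format(lastDocOfBlockTwo))); // true
      }
    }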
JobPropertiesTest.java
@@ -1,6 +1,6 @@
package org.apache.ambari.infra.job;

import org.apache.ambari.infra.job.archive.DocumentExportProperties;
import org.apache.ambari.infra.job.archive.DocumentArchivingProperties;
import org.apache.ambari.infra.job.archive.SolrProperties;
import org.junit.Test;

@@ -28,26 +28,26 @@
public class JobPropertiesTest {
@Test
public void testDeepCopy() throws Exception {
DocumentExportProperties documentExportProperties = new DocumentExportProperties();
documentExportProperties.setLocalDestinationDirectory("/tmp");
documentExportProperties.setFileNameSuffixColumn(".json");
documentExportProperties.setReadBlockSize(10);
documentExportProperties.setWriteBlockSize(20);
DocumentArchivingProperties documentArchivingProperties = new DocumentArchivingProperties();
documentArchivingProperties.setLocalDestinationDirectory("/tmp");
documentArchivingProperties.setFileNameSuffixColumn(".json");
documentArchivingProperties.setReadBlockSize(10);
documentArchivingProperties.setWriteBlockSize(20);
SolrProperties solr = new SolrProperties();
solr.setZooKeeperConnectionString("localhost:2181");
solr.setFilterQueryText("id:1167");
solr.setQueryText("name:'Joe'");
solr.setCollection("Users");
solr.setSortColumn(new String[] {"name"});
documentExportProperties.setSolr(solr);
documentArchivingProperties.setSolr(solr);

DocumentExportProperties parsed = documentExportProperties.deepCopy();
DocumentArchivingProperties parsed = documentArchivingProperties.deepCopy();

assertThat(parsed.getLocalDestinationDirectory(), is(documentExportProperties.getLocalDestinationDirectory()));
assertThat(parsed.getFileNameSuffixColumn(), is(documentExportProperties.getFileNameSuffixColumn()));
assertThat(parsed.getReadBlockSize(), is(documentExportProperties.getReadBlockSize()));
assertThat(parsed.getWriteBlockSize(), is(documentExportProperties.getWriteBlockSize()));
assertThat(parsed.getSolr().getZooKeeperConnectionString(), is(documentExportProperties.getSolr().getZooKeeperConnectionString()));
assertThat(parsed.getLocalDestinationDirectory(), is(documentArchivingProperties.getLocalDestinationDirectory()));
assertThat(parsed.getFileNameSuffixColumn(), is(documentArchivingProperties.getFileNameSuffixColumn()));
assertThat(parsed.getReadBlockSize(), is(documentArchivingProperties.getReadBlockSize()));
assertThat(parsed.getWriteBlockSize(), is(documentArchivingProperties.getWriteBlockSize()));
assertThat(parsed.getSolr().getZooKeeperConnectionString(), is(documentArchivingProperties.getSolr().getZooKeeperConnectionString()));
assertThat(parsed.getSolr().getQueryText(), is(solr.getQueryText()));
assertThat(parsed.getSolr().getFilterQueryText(), is(solr.getFilterQueryText()));
assertThat(parsed.getSolr().getCollection(), is(solr.getCollection()));