Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract the native component without copying it into memory #198

Merged
merged 4 commits into from
Apr 10, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.13</version>
<optional>true</optional>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand All @@ -109,13 +102,29 @@
<artifactId>confluex-mock-http</artifactId>
<version>0.4.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ private void deletePipes() {
}

private void startChildProcess() throws IOException, InterruptedException {
log.info("Asking for trace");
List<String> args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k",
protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileLock;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashedFileCopier {
private static final Logger log = LoggerFactory.getLogger(HashedFileCopier.class);

static final String MESSAGE_DIGEST_ALGORITHM = "SHA-1";

public static File copyFileFrom(InputStream sourceData, File destinationDirectory, String fileNameFormat)
throws Exception {
File tempFile = null;
try {
tempFile = File.createTempFile("kpl", ".tmp", destinationDirectory);
log.debug("Extracting file with format {}", fileNameFormat);
FileOutputStream fileOutputStream = new FileOutputStream(tempFile);

DigestOutputStream digestOutputStream = new DigestOutputStream(fileOutputStream,
MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM));
IOUtils.copy(sourceData, digestOutputStream);
digestOutputStream.close();
byte[] digest = digestOutputStream.getMessageDigest().digest();
log.debug("Calculated digest of new file: {}", Arrays.toString(digest));
String digestHex = DatatypeConverter.printHexBinary(digest);
File finalFile = new File(destinationDirectory, String.format(fileNameFormat, digestHex));
File lockFile = new File(destinationDirectory, String.format(fileNameFormat + ".lock", digestHex));
log.debug("Preparing to check and copy {} to {}", tempFile.getAbsolutePath(), finalFile.getAbsolutePath());
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {
if (finalFile.exists() && finalFile.length() == tempFile.length()) {
byte[] existingFileDigest = null;
try (DigestInputStream digestInputStream = new DigestInputStream(new FileInputStream(finalFile),
MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM))) {
byte[] discardedBytes = new byte[8192];
while (digestInputStream.read(discardedBytes) != -1) {
//
// This is just used for the side affect of the digest input stream
//
}
existingFileDigest = digestInputStream.getMessageDigest().digest();
}
if (Arrays.equals(digest, existingFileDigest)) {
//
// The existing file matches the expected file, it's ok to just drop out now
//
log.info("'{}' already exists, and matches. Not overwriting.", finalFile.getAbsolutePath());
return finalFile;
}
log.warn(
"Detected a mismatch between the existing file, and the new file. "
+ "Will overwrite the existing file. " + "Existing: {} -- New File: {}",
Arrays.toString(existingFileDigest), Arrays.toString(digest));
}

if (!tempFile.renameTo(finalFile)) {
log.error("Failed to rename '{}' to '{}'", tempFile.getAbsolutePath(), finalFile.getAbsolutePath());
throw new IOException("Failed to rename extracted file");
}
}
return finalFile;
} finally {
if (tempFile != null && tempFile.exists()) {
if (!tempFile.delete()) {
log.warn("Unable to delete temp file: {}", tempFile.getAbsolutePath());
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,21 @@ private String extractBinaries() {
if (binPath != null && !binPath.trim().isEmpty()) {
pathToExecutable = binPath.trim();
log.warn("Using non-default native binary at " + pathToExecutable);
pathToLibDir = "";
return "";

File parent = new File(binPath).getParentFile();
pathToLibDir = parent.getAbsolutePath();
CertificateExtractor certificateExtractor = new CertificateExtractor();

try {
String caDirectory = certificateExtractor
.extractCertificates(parent.getAbsoluteFile());
watchFiles.addAll(certificateExtractor.getExtractedCertificates());
FileAgeManager.instance().registerFiles(watchFiles);
return caDirectory;
} catch (IOException ioex) {
log.error("Exception while extracting certificates. Returning no CA directory", ioex);
return "";
}
} else {
log.info("Extracting binaries to " + tmpDir);
try {
Expand All @@ -873,39 +886,14 @@ private String extractBinaries() {

String extension = os.equals("windows") ? ".exe" : "";
String executableName = "kinesis_producer" + extension;
byte[] bin = IOUtils.toByteArray(
this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName));
MessageDigest md = MessageDigest.getInstance("SHA1");
String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase();

pathToExecutable = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString();
File extracted = new File(pathToExecutable);
watchFiles.add(extracted);

// use dedicated lock-file to limit access to executable by a single process
final String pathToLock = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").toString();
final File lockFile = new File(pathToLock);
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {
if (extracted.exists()) {
boolean contentEqual = false;
if (extracted.length() == bin.length) {
try (InputStream executableIS = new FileInputStream(extracted)) {
byte[] existingBin = IOUtils.toByteArray(executableIS);
contentEqual = Arrays.equals(bin, existingBin);
}
}
if (!contentEqual) {
throw new SecurityException("The contents of the binary " + extracted.getAbsolutePath()
+ " is not what it's expected to be.");
}
} else {
try (OutputStream fos = new FileOutputStream(extracted)) {
IOUtils.write(bin, fos);
}
extracted.setExecutable(true);
}
}
InputStream is = this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName);
String resultFileFormat = "kinesis_producer_%s" + extension;

File extracted = HashedFileCopier.copyFileFrom(is, tmpDirFile, resultFileFormat);
watchFiles.add(extracted);
extracted.setExecutable(true);
pathToExecutable = extracted.getAbsolutePath();

CertificateExtractor certificateExtractor = new CertificateExtractor();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.security.DigestInputStream;
import java.security.MessageDigest;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Test;

public class HashedFileCopierTest {

private File tempDir;

@Before
public void before() throws Exception {
tempDir = File.createTempFile("kpl-unit-tests", "");
tempDir.delete();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the deletion belong to @after?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File.createTempFile actually creates the file, but I really want a directory.

Thinking about it I looked for a better solution and found it on Files.createTempDirectory.

tempDir.mkdirs();
}

@Test
public void normalFileCopyTest() throws Exception {

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, "res-file.%s.txt");
File expectedFile = new File(tempDir, "res-file." + hexDigestForTestData() + ".txt");

assertThat(resultFile, equalTo(expectedFile));
assertThat(expectedFile.exists(), equalTo(true));

byte[] writtenBytes = Files.readAllBytes(resultFile.toPath());
byte[] expectedBytes = IOUtils.toByteArray(testDataInputStream());

assertThat(writtenBytes, equalTo(expectedBytes));

}

@Test
public void fileExistsTest() throws Exception {
String executableFormat = "res-file.%s.txt";
File expectedFile = new File(tempDir, String.format(executableFormat, hexDigestForTestData()));
try (FileOutputStream fso = new FileOutputStream(expectedFile)) {
IOUtils.copy(testDataInputStream(), fso);
}
File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, executableFormat);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

@Test
public void lengthMismatchTest() throws Exception {
String executableFormat = "res-file.%s.txt";
File expectedFile = new File(tempDir, String.format(executableFormat, hexDigestForTestData()));
FileOutputStream fso = new FileOutputStream(expectedFile);
IOUtils.copy(testDataInputStream(), fso);
fso.write("This is some extra crap".getBytes(Charset.forName("UTF-8")));
fso.close();

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, executableFormat);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

@Test
public void hashMismatchTest() throws Exception {
String executableFormat = "res-file.%s.txt";
File expectedFile = new File(tempDir, String.format(executableFormat, hexDigestForTestData()));
byte[] testData = testDataBytes();
testData[10] = (byte)~testData[10];

Files.write(expectedFile.toPath(), testData);

File resultFile = HashedFileCopier.copyFileFrom(testDataInputStream(), tempDir, executableFormat);
assertThat(resultFile, equalTo(expectedFile));

byte[] expectedData = testDataBytes();
byte[] actualData = Files.readAllBytes(resultFile.toPath());

assertThat(actualData, equalTo(expectedData));
}

private String hexDigestForTestData() throws Exception {
return DatatypeConverter.printHexBinary(hashForTestData());
}

private byte[] testDataBytes() throws Exception {
return IOUtils.toByteArray(testDataInputStream());
}

private byte[] hashForTestData() throws Exception {
DigestInputStream dis = new DigestInputStream(testDataInputStream(), MessageDigest.getInstance(HashedFileCopier.MESSAGE_DIGEST_ALGORITHM));
IOUtils.toByteArray(dis);
return dis.getMessageDigest().digest();
}

private InputStream testDataInputStream() {
return this.getClass().getClassLoader().getResourceAsStream("test-data/test.txt");
}
}
3 changes: 0 additions & 3 deletions java/amazon-kinesis-producer/src/test/java/log4j.properties

This file was deleted.

26 changes: 26 additions & 0 deletions java/amazon-kinesis-producer/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
~ Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
~
~ Licensed under the Amazon Software License (the "License").
~ You may not use this file except in compliance with the License.
~ A copy of the License is located at
~
~ http://aws.amazon.com/asl/
~
~ or in the "license" file accompanying this file. This file is distributed
~ on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
~ express or implied. See the License for the specific language governing
~ permissions and limitations under the License.
-->
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d [%thread] %-5level %logger{36} [%mdc{ShardId:-NONE}] - %msg %n</pattern>
</encoder>
</appender>

<root level="DEBUG">
<appender-ref ref="CONSOLE" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is a test