Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
Expand Down Expand Up @@ -130,12 +132,12 @@ private boolean hasPermission(UserGroupInformation user) {
}
}

private static void logSstFileList(List<String>sstList, String msg, int sampleSize) {
private static void logSstFileList(Collection<String> sstList, String msg, int sampleSize) {
int count = sstList.size();
if (LOG.isDebugEnabled()) {
LOG.debug(msg, count, "", sstList);
} else if (count > sampleSize) {
List<String> sample = sstList.subList(0, sampleSize);
List<String> sample = sstList.stream().limit(sampleSize).collect(Collectors.toList());
LOG.info(msg, count, ", sample", sample);
} else {
LOG.info(msg, count, "", sstList);
Expand Down Expand Up @@ -187,30 +189,24 @@ private void generateSnapshotCheckpoint(HttpServletRequest request,
}
}

DBCheckpoint checkpoint = null;

boolean flush = false;
String flushParam =
request.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH);
if (StringUtils.isNotEmpty(flushParam)) {
flush = Boolean.parseBoolean(flushParam);
}

List<String> receivedSstList = new ArrayList<>();
processMetadataSnapshotRequest(request, response, isFormData, flush);
}

private void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response,
boolean isFormData, boolean flush) {
List<String> excludedSstList = new ArrayList<>();
String[] sstParam = isFormData ?
parseFormDataParameters(request) : request.getParameterValues(
OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST);
if (sstParam != null) {
receivedSstList.addAll(
Arrays.stream(sstParam)
.filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX))
.distinct()
.collect(Collectors.toList()));
logSstFileList(receivedSstList,
"Received list of {} SST files to be excluded{}: {}", 5);
}

Set<String> receivedSstFiles = extractSstFilesToExclude(sstParam);
DBCheckpoint checkpoint = null;
Path tmpdir = null;
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
tmpdir = Files.createTempDirectory(bootstrapTempData.toPath(),
Expand All @@ -235,8 +231,8 @@ private void generateSnapshotCheckpoint(HttpServletRequest request,
file + ".tar\"");

Instant start = Instant.now();
writeDbDataToStream(checkpoint, request,
response.getOutputStream(), receivedSstList, excludedSstList, tmpdir);
writeDbDataToStream(checkpoint, request, response.getOutputStream(),
receivedSstFiles, tmpdir);
Instant end = Instant.now();

long duration = Duration.between(start, end).toMillis();
Expand Down Expand Up @@ -276,6 +272,16 @@ private void generateSnapshotCheckpoint(HttpServletRequest request,
}
}

protected static Set<String> extractSstFilesToExclude(String[] sstParam) {
Set<String> receivedSstFiles = new HashSet<>();
if (sstParam != null) {
receivedSstFiles.addAll(
Arrays.stream(sstParam).filter(s -> s.endsWith(ROCKSDB_SST_SUFFIX)).distinct().collect(Collectors.toList()));
logSstFileList(receivedSstFiles, "Received list of {} SST files to be excluded{}: {}", 5);
}
return receivedSstFiles;
}

public DBCheckpoint getCheckpoint(Path ignoredTmpdir, boolean flush)
throws IOException {
return dbStore.getCheckpoint(flush);
Expand Down Expand Up @@ -346,20 +352,16 @@ public void doPost(HttpServletRequest request, HttpServletResponse response) {
* (Parameter is ignored in this class but used in child classes).
* @param destination The stream to write to.
* @param toExcludeList the files to be excluded
* @param excludedList the files excluded
*
*/
public void writeDbDataToStream(DBCheckpoint checkpoint,
HttpServletRequest ignoredRequest,
OutputStream destination,
List<String> toExcludeList,
List<String> excludedList, Path tmpdir)
Set<String> toExcludeList,
Path tmpdir)
throws IOException, InterruptedException {
Objects.requireNonNull(toExcludeList);
Objects.requireNonNull(excludedList);

writeDBCheckpointToStream(checkpoint, destination,
toExcludeList, excludedList);
writeDBCheckpointToStream(checkpoint, destination, toExcludeList);
}

public DBStore getDbStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -617,14 +617,12 @@ public static MetricsSystem initializeMetrics(
* @param checkpoint checkpoint file
* @param destination destination output stream.
* @param toExcludeList the files to be excluded
* @param excludedList the files excluded
* @throws IOException
*/
public static void writeDBCheckpointToStream(
DBCheckpoint checkpoint,
OutputStream destination,
List<String> toExcludeList,
List<String> excludedList)
Set<String> toExcludeList)
throws IOException {
try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination);
Stream<Path> files =
Expand All @@ -636,8 +634,6 @@ public static void writeDBCheckpointToStream(
String fileName = fileNamePath.toString();
if (!toExcludeList.contains(fileName)) {
includeFile(path.toFile(), fileName, archiveOutputStream);
} else {
excludedList.add(fileName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -118,8 +117,8 @@ public void downloadSnapshot(String leaderNodeID, File targetFile)
.collect(Collectors.toList()));
try (OutputStream outputStream = Files.newOutputStream(targetFile.toPath())) {
writeDBCheckpointToStream(dbCheckpoint, outputStream,
HAUtils.getExistingSstFiles(
rdbSnapshotProvider.getCandidateDir()), new ArrayList<>());
new HashSet<>(HAUtils.getExistingSstFiles(
rdbSnapshotProvider.getCandidateDir())));
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
Expand Down Expand Up @@ -66,8 +66,7 @@ public void writeDBCheckPointToSream(OutputStream stream, boolean flush)
}

Instant start = Instant.now();
HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream,
new ArrayList<>(), new ArrayList<>());
HddsServerUtil.writeDBCheckpointToStream(checkpoint, stream, new HashSet<>());
Instant end = Instant.now();

long duration = Duration.between(start, end).toMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
Expand Down Expand Up @@ -114,7 +115,7 @@ public void init() throws Exception {
Collections.emptyList(),
false);
doCallRealMethod().when(scmDbCheckpointServletMock)
.writeDbDataToStream(any(), any(), any(), any(), any(), any());
.writeDbDataToStream(any(), any(), any(), any(), any());
doCallRealMethod().when(scmDbCheckpointServletMock).doPost(requestMock,
responseMock);
doCallRealMethod().when(scmDbCheckpointServletMock).doGet(requestMock,
Expand Down Expand Up @@ -145,7 +146,7 @@ void testEndpoint(String httpMethod, @TempDir Path tempDir)
throws ServletException, IOException, InterruptedException {
this.method = httpMethod;

List<String> toExcludeList = new ArrayList<>();
Set<String> toExcludeList = new HashSet<>();
toExcludeList.add("sstFile1.sst");
toExcludeList.add("sstFile2.sst");

Expand Down Expand Up @@ -197,7 +198,7 @@ public void write(int b) throws IOException {
.isGreaterThan(initialCheckpointCount);

verify(scmDbCheckpointServletMock).writeDbDataToStream(any(),
any(), any(), eq(toExcludeList), any(), any());
any(), any(), eq(toExcludeList), any());
}

@Test
Expand Down Expand Up @@ -235,7 +236,7 @@ private static Stream<Arguments> getHttpMethods() {
* @param toExcludeList SST file names to be excluded.
* @throws IOException
*/
private void setupHttpMethod(List<String> toExcludeList) throws IOException {
private void setupHttpMethod(Collection<String> toExcludeList) throws IOException {
if (method.equals("POST")) {
setupPostMethod(toExcludeList);
} else {
Expand All @@ -248,7 +249,7 @@ private void setupHttpMethod(List<String> toExcludeList) throws IOException {
* @param toExcludeList SST file names to be excluded.
* @throws IOException
*/
private void setupPostMethod(List<String> toExcludeList)
private void setupPostMethod(Collection<String> toExcludeList)
throws IOException {
when(requestMock.getMethod()).thenReturn("POST");
when(requestMock.getContentType()).thenReturn("multipart/form-data; " +
Expand Down Expand Up @@ -286,7 +287,7 @@ private void setupPostMethod(List<String> toExcludeList)
* Setups details for HTTP GET request.
* @param toExcludeList SST file names to be excluded.
*/
private void setupGetMethod(List<String> toExcludeList) {
private void setupGetMethod(Collection<String> toExcludeList) {
when(requestMock.getMethod()).thenReturn("GET");
when(requestMock
.getParameterValues(OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void write(int b) throws IOException {
responseMock);

doCallRealMethod().when(omDbCheckpointServletMock)
.writeDbDataToStream(any(), any(), any(), any(), any(), any());
.writeDbDataToStream(any(), any(), any(), any(), any());

when(omDbCheckpointServletMock.getBootstrapStateLock())
.thenReturn(lock);
Expand Down Expand Up @@ -265,7 +265,7 @@ private void testEndpoint(String method) throws Exception {
doNothing().when(responseMock).setContentType("application/x-tar");
doNothing().when(responseMock).setHeader(anyString(), anyString());

List<String> toExcludeList = new ArrayList<>();
Set<String> toExcludeList = new HashSet<>();
toExcludeList.add("sstFile1.sst");
toExcludeList.add("sstFile2.sst");

Expand All @@ -288,7 +288,7 @@ private void testEndpoint(String method) throws Exception {
.isGreaterThan(initialCheckpointCount);

verify(omDbCheckpointServletMock).writeDbDataToStream(any(),
any(), any(), eq(toExcludeList), any(), any());
any(), any(), eq(toExcludeList), any());
}

private void testDoPostWithInvalidContentType() throws Exception {
Expand Down Expand Up @@ -528,7 +528,7 @@ private void testWriteDbDataWithoutOmSnapshot()
Path tmpdir = folder.resolve("bootstrapData");
try (OutputStream fileOutputStream = Files.newOutputStream(tempFile.toPath())) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
fileOutputStream, new ArrayList<>(), new ArrayList<>(), tmpdir);
fileOutputStream, new HashSet<>(), tmpdir);
}

// Untar the file into a temp folder to be examined.
Expand Down Expand Up @@ -562,8 +562,7 @@ private void testWriteDbDataWithToExcludeFileList()
writer.write("Dummy data.");
}
assertTrue(dummyFile.exists());
List<String> toExcludeList = new ArrayList<>();
List<String> excludedList = new ArrayList<>();
Set<String> toExcludeList = new HashSet<>();
toExcludeList.add(dummyFile.getName());

// Set http param to exclude snapshot data.
Expand All @@ -574,7 +573,7 @@ private void testWriteDbDataWithToExcludeFileList()
Path tmpdir = folder.resolve("bootstrapData");
try (OutputStream fileOutputStream = Files.newOutputStream(tempFile.toPath())) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
fileOutputStream, toExcludeList, excludedList, tmpdir);
fileOutputStream, toExcludeList, tmpdir);
}

// Untar the file into a temp folder to be examined.
Expand Down Expand Up @@ -611,7 +610,7 @@ private void doEndpoint(String method) {
* @param toExcludeList SST file names to be excluded.
* @throws IOException
*/
private void setupHttpMethod(String method, List<String> toExcludeList) throws IOException {
private void setupHttpMethod(String method, Collection <String> toExcludeList) throws IOException {
if (method.equals("POST")) {
setupPostMethod(toExcludeList);
} else {
Expand All @@ -624,7 +623,7 @@ private void setupHttpMethod(String method, List<String> toExcludeList) throws I
* @param toExcludeList SST file names to be excluded.
* @throws IOException
*/
private void setupPostMethod(List<String> toExcludeList)
private void setupPostMethod(Collection<String> toExcludeList)
throws IOException {
when(requestMock.getMethod()).thenReturn("POST");
when(requestMock.getContentType()).thenReturn("multipart/form-data; " +
Expand Down Expand Up @@ -662,7 +661,7 @@ private void setupPostMethod(List<String> toExcludeList)
* Setups details for HTTP GET request.
* @param toExcludeList SST file names to be excluded.
*/
private void setupGetMethod(List<String> toExcludeList) {
private void setupGetMethod(Collection<String> toExcludeList) {
when(requestMock.getMethod()).thenReturn("GET");
when(requestMock
.getParameterValues(OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST))
Expand Down
Loading
Loading