Changes from 13 commits
Commits
56 commits
97a5a2c
Adding log statement + removing eviction on exception for debugging p…
zarna1parekh Jul 30, 2025
7443840
Formatting
zarna1parekh Jul 30, 2025
30ef18c
adding cache eviction back
zarna1parekh Jul 30, 2025
11de483
printing only relevant fields
zarna1parekh Jul 30, 2025
4294bc1
printing specific fields
zarna1parekh Jul 30, 2025
ac66b41
Checking Lucene Index status before upload
zarna1parekh Aug 12, 2025
f909840
post download check on cache nodes
zarna1parekh Aug 6, 2025
5c4b07a
Addressing directory locking issue
zarna1parekh Aug 12, 2025
c975fe3
Fixing test cases
zarna1parekh Aug 13, 2025
99ae402
Updating logging statement
zarna1parekh Aug 13, 2025
b09f9e0
lucene check after download on cache nodes
zarna1parekh Aug 13, 2025
4809591
log clean up
zarna1parekh Aug 13, 2025
808160a
Code Refactor
zarna1parekh Aug 13, 2025
fd4744c
S3 Exception logging
zarna1parekh Aug 16, 2025
31a054c
exposing the s3 exception on failure
zarna1parekh Aug 16, 2025
dbfdfe0
Check if download is failing from S3
zarna1parekh Aug 17, 2025
2c2ccec
adding more log statements
zarna1parekh Aug 17, 2025
5d41271
log lines
zarna1parekh Aug 17, 2025
6dc0c76
log line
zarna1parekh Aug 17, 2025
8acd25f
Increasing retries for S3
zarna1parekh Aug 17, 2025
e5abb7b
Checksum validation
zarna1parekh Aug 17, 2025
98dceab
Checksum only when required
zarna1parekh Aug 17, 2025
48c83b2
logging cache node id
zarna1parekh Aug 18, 2025
e171c05
logs
zarna1parekh Aug 18, 2025
f6cb74b
crc32 checksum validation
zarna1parekh Aug 19, 2025
52f394e
changing log level to info
zarna1parekh Aug 19, 2025
d2d5093
Enriching log statement
zarna1parekh Sep 10, 2025
e3e3b56
fmt
zarna1parekh Sep 10, 2025
cfadbdc
Directory cleanup
zarna1parekh Sep 11, 2025
17b0dfb
Do not delete the error dir
zarna1parekh Sep 12, 2025
4540e97
fmt
zarna1parekh Sep 12, 2025
6b72251
reverting unwanted changes
zarna1parekh Sep 12, 2025
961c8d5
Updating async client to have more Native Memory
zarna1parekh Sep 13, 2025
dd68d77
fmt
zarna1parekh Sep 13, 2025
f041267
Increasing Native memory for multipart upload
zarna1parekh Sep 14, 2025
bcd0970
fmt
zarna1parekh Sep 14, 2025
175de27
Conservative on Native memory
zarna1parekh Sep 15, 2025
b17a16c
Upload sequentially instead of dir upload
zarna1parekh Sep 16, 2025
2787a34
build failure
zarna1parekh Sep 16, 2025
11691ed
handling wildcard imports
zarna1parekh Sep 17, 2025
67d541d
fixing build issue
zarna1parekh Sep 17, 2025
7575911
Cleanup Chunk Manager
zarna1parekh Sep 17, 2025
b75d892
cleanup debug code
zarna1parekh Sep 19, 2025
5b879f8
Fixing test cases
zarna1parekh Sep 22, 2025
2db12ce
Verbose log line
zarna1parekh Sep 22, 2025
15e76f4
upload time taken
zarna1parekh Sep 26, 2025
d497e6f
Closing searcher and writer before upload begins
zarna1parekh Oct 2, 2025
414bd33
Close scheduled jobs
zarna1parekh Oct 3, 2025
6c14cbe
rollback searcher changes
zarna1parekh Oct 5, 2025
ca6fb0e
index writer close + lucene check after upload + local download
zarna1parekh Oct 11, 2025
e45a2de
closing searcher before snapshot upload
zarna1parekh Oct 14, 2025
4e658fd
Updating AWS SDK version
zarna1parekh Oct 14, 2025
3f0b897
Updating aws crt version
zarna1parekh Oct 14, 2025
4920c58
Reverting javac to 21
zarna1parekh Oct 14, 2025
f5270f4
Fixing test cases
zarna1parekh Oct 15, 2025
76809e2
Java version 21
zarna1parekh Oct 15, 2025
24 changes: 24 additions & 0 deletions astra/src/main/java/com/slack/astra/blobfs/BlobStore.java
Expand Up @@ -8,7 +8,9 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -195,6 +197,28 @@ public List<String> listFiles(String prefix) {
return filesList;
}

public Map<String, Long> listFilesWithSize(String prefix) {
assert prefix != null && !prefix.isEmpty();

ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
ListObjectsV2Publisher asyncPaginatedListResponse =
s3AsyncClient.listObjectsV2Paginator(listRequest);

Map<String, Long> filesListWithSize = new HashMap<>();
try {
asyncPaginatedListResponse
.subscribe(
listResponse ->
listResponse
.contents()
.forEach(s3Object -> filesListWithSize.put(s3Object.key(), s3Object.size())))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return filesListWithSize;
Comment on lines +273 to +291

you could extract a helper method that would be used by both listFiles... methods that takes a prefix and a Consumer. Then this could look like the following.

Also, the block passed to subscribe could be called in multiple threads, so this should use a storage class that is safe wrt concurrent modifications.

Suggested change
assert prefix != null && !prefix.isEmpty();
ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
ListObjectsV2Publisher asyncPaginatedListResponse =
s3AsyncClient.listObjectsV2Paginator(listRequest);
Map<String, Long> filesListWithSize = new HashMap<>();
try {
asyncPaginatedListResponse
.subscribe(
listResponse ->
listResponse
.contents()
.forEach(s3Object -> filesListWithSize.put(s3Object.key(), s3Object.size())))
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return filesListWithSize;
Map<String, Long> filesWithSize = new ConcurrentHashMap<>();
listFilesAndDo(prefix, s3Object -> filesWithSize.put(s3Object.key(), s3Object.size()));
return filesWithSize;

}
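
A minimal sketch of that refactor, assuming a listFilesAndDo(String, Consumer<S3Object>) shape as described in this comment. The helper name, its Consumer signature, and the extra imports (java.util.function.Consumer, software.amazon.awssdk.services.s3.model.S3Object, java.util.concurrent.ConcurrentHashMap) are assumptions from the comment, not existing API:

// Hypothetical shared helper; name and signature are taken from the review comment above.
private void listFilesAndDo(String prefix, Consumer<S3Object> action) {
  assert prefix != null && !prefix.isEmpty();

  ListObjectsV2Request listRequest = builder().bucket(bucketName).prefix(prefix).build();
  ListObjectsV2Publisher asyncPaginatedListResponse =
      s3AsyncClient.listObjectsV2Paginator(listRequest);

  try {
    // The subscribe callback may run on multiple threads, so callers must pass a
    // thread-safe sink (e.g. ConcurrentHashMap, CopyOnWriteArrayList).
    asyncPaginatedListResponse
        .subscribe(listResponse -> listResponse.contents().forEach(action))
        .get();
  } catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
  }
}

public Map<String, Long> listFilesWithSize(String prefix) {
  Map<String, Long> filesWithSize = new ConcurrentHashMap<>();
  listFilesAndDo(prefix, s3Object -> filesWithSize.put(s3Object.key(), s3Object.size()));
  return filesWithSize;
}

The existing listFiles(String prefix) could then delegate to the same helper with the CopyOnWriteArrayList sink it already uses.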

/**
* Deletes a chunk off of object storage by chunk id. If object was not found returns false.
*
17 changes: 17 additions & 0 deletions astra/src/main/java/com/slack/astra/chunk/ChunkValidationUtils.java
@@ -0,0 +1,17 @@
package com.slack.astra.chunk;

import java.nio.file.Path;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NoLockFactory;

public class ChunkValidationUtils {

public static boolean isChunkClean(Path path) throws Exception {
FSDirectory existingDir = FSDirectory.open(path, NoLockFactory.INSTANCE);
CheckIndex checker = new CheckIndex(existingDir);
CheckIndex.Status status = checker.checkIndex();
checker.close();
return status.clean;
}
}
71 changes: 70 additions & 1 deletion astra/src/main/java/com/slack/astra/chunk/ReadOnlyChunkImpl.java
@@ -1,5 +1,6 @@
package com.slack.astra.chunk;

import static com.slack.astra.chunk.ChunkValidationUtils.isChunkClean;
import static com.slack.astra.chunkManager.CachingChunkManager.ASTRA_NG_DYNAMIC_CHUNK_SIZES_FLAG;
import static com.slack.astra.server.AstraConfig.DEFAULT_ZK_TIMEOUT_SECS;

@@ -28,9 +29,11 @@
import com.slack.astra.proto.metadata.Metadata;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.EnumSet;
import java.util.List;
@@ -228,6 +231,48 @@ public CacheNodeAssignment getCacheNodeAssignment() {
return assignment;
}

private boolean validateS3vsLocalDownLoad() {
// check if the number of files in S3 matches the local directory
Map<String, Long> filesWithSizeInS3 = blobStore.listFilesWithSize(snapshotMetadata.snapshotId);

Map<String, Long> localFiles;
try (Stream<Path> fileList = Files.list(dataDirectory)) {
localFiles =
fileList
.filter(Files::isRegularFile)
.collect(
Collectors.toMap(
path ->
dataDirectory.relativize(path).toString().replace(File.separator, "/"),

Hm. I think the replace isn't necessary since Files.list() only returns files in the current directory. Although, maybe you should use Path#getFileName().toString() here, which would align with calling Paths.get(s3Path).getFileName().toString() on the s3 entries below.
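
A sketch of that variant, keying the local map by bare file name so it lines up with the Paths.get(s3Path).getFileName().toString() call used on the S3 entries below:

localFiles =
    fileList
        .filter(Files::isRegularFile)
        .collect(
            Collectors.toMap(
                path -> path.getFileName().toString(), // file name only, no separator handling
                path -> path.toFile().length()));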

path -> path.toFile().length()));
} catch (IOException e) {
throw new RuntimeException(
String.format("Error reading local files in directory %s", dataDirectory), e);
}
if (localFiles.size() != filesWithSizeInS3.size()) {
LOG.error(
String.format(
"Mismatch in number of files in S3 (%s) and local directory (%s) for snapshot %s",
filesWithSizeInS3.size(), localFiles.size(), snapshotMetadata.toString()));
return false;
}

for (Map.Entry<String, Long> entry : filesWithSizeInS3.entrySet()) {
String s3Path = entry.getKey();
long s3Size = entry.getValue();
String fileName = Paths.get(s3Path).getFileName().toString();

if (!localFiles.containsKey(fileName) || !localFiles.get(fileName).equals(s3Size)) {
LOG.error(
String.format(
"Mismatch for file %s in S3 and local directory of size %s for snapshot %s",
s3Path, s3Size, snapshotMetadata.toString()));
return false;
}
}
return true;
}

public void downloadChunkData() {
Timer.Sample assignmentTimer = Timer.start(meterRegistry);
// lock
@@ -265,7 +310,24 @@ public void downloadChunkData() {
"No files found on blob storage, released slot for re-assignment");
}
}
// validate if the number of files in S3 matches the local directory
if (!validateS3vsLocalDownLoad()) {
String errorString =
String.format(
"Mismatch in number or size of files in S3 and local directory for snapshot %s",
snapshotMetadata);
throw new IOException(errorString);
}

// check if lucene index is valid and not corrupted
boolean luceneStatus = isChunkClean(dataDirectory);
if (!luceneStatus) {
throw new IOException(
String.format(
"Lucene index is not clean. Found issues for snapshot: %s.", snapshotMetadata));
}

// check if schema file exists
Path schemaPath = Path.of(dataDirectory.toString(), ReadWriteChunk.SCHEMA_FILE_NAME);
if (!Files.exists(schemaPath)) {
throw new RuntimeException("We expect a schema.json file to exist within the index");
Expand Down Expand Up @@ -305,7 +367,14 @@ public void downloadChunkData() {
// disregarding any errors
setAssignmentState(
getCacheNodeAssignment(), Metadata.CacheNodeAssignment.CacheNodeAssignmentState.EVICT);
LOG.error("Error handling chunk assignment", e);
LOG.error(
"Error handling chunk assignment for assignment: {}, snapshot id: {}, snapshot size: {}, replicaId: {}, replicaSet: {}",
assignment.assignmentId,
assignment.snapshotId,
assignment.snapshotSize,
assignment.replicaId,
assignment.replicaSet,
e);
assignmentTimer.stop(chunkAssignmentTimerFailure);
} finally {
chunkAssignmentLock.unlock();
26 changes: 25 additions & 1 deletion astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
@@ -1,6 +1,7 @@
package com.slack.astra.chunk;

import static com.slack.astra.chunk.ChunkInfo.toSnapshotMetadata;
import static com.slack.astra.chunk.ChunkValidationUtils.isChunkClean;
import static com.slack.astra.writer.SpanFormatter.isValidTimestamp;

import com.google.common.annotations.VisibleForTesting;
@@ -249,6 +250,13 @@ public boolean snapshotToS3(BlobStore blobStore) {
totalBytes += sizeOfFile;
logger.debug("File name is {} ({} bytes)", fileName, sizeOfFile);
}
// check if lucene index is valid and not corrupted
boolean luceneStatus = isChunkClean(dirPath);
if (!luceneStatus) {
logger.error("Lucene index is not clean. Found issues for chunk: {}.", chunkInfo);
return false;
}

this.fileUploadAttempts.increment(filesToUpload.size());
Timer.Sample snapshotTimer = Timer.start(meterRegistry);

@@ -258,7 +266,8 @@ public boolean snapshotToS3(BlobStore blobStore) {
snapshotTimer.stop(meterRegistry.timer(SNAPSHOT_TIMER));
chunkInfo.setSizeInBytesOnDisk(totalBytes);

List<String> filesUploaded = blobStore.listFiles(chunkInfo.chunkId);
Map<String, Long> filesWithSizeInS3 = blobStore.listFilesWithSize(chunkInfo.chunkId);
List<String> filesUploaded = new ArrayList<>(filesWithSizeInS3.keySet().stream().toList());
filesUploaded.removeIf(file -> file.endsWith("write.lock"));

// check here that all files are uploaded
Expand All @@ -273,6 +282,21 @@ public boolean snapshotToS3(BlobStore blobStore) {
filesUploaded);
return false;
}

// validate the size of the uploaded files
for (String fileName : filesToUpload) {
String s3Path = String.format("%s/%s", chunkInfo.chunkId, fileName);
long sizeOfFile = Files.size(Path.of(dirPath + "/" + fileName));

maybe use File.separator here instead of "/"?
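
One way that could look (assuming dirPath here is the chunk's local directory, as in the existing line):

long sizeOfFile = Files.size(Path.of(dirPath.toString(), fileName)); // joins using the platform separator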

if (!filesWithSizeInS3.containsKey(s3Path)
|| !filesWithSizeInS3.get(s3Path).equals(sizeOfFile)) {
logger.error(
String.format(
"Mismatch for file %s in S3 and local directory of size %s for chunk %s",
s3Path, sizeOfFile, chunkInfo.chunkId));

It would be good to include the s3 file size here as well.
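
For example, a version of the message that also carries the S3-side size (a null here would mean the key is missing entirely, which is itself useful to see in the log):

logger.error(
    String.format(
        "Mismatch for file %s: local size %s, S3 size %s, for chunk %s",
        s3Path, sizeOfFile, filesWithSizeInS3.get(s3Path), chunkInfo.chunkId));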

return false;
}
}

// and schema file exists in s3
if (!filesUploaded.contains(chunkInfo.chunkId + "/" + SCHEMA_FILE_NAME)) {
logger.error("Schema file was not uploaded to S3: {}", SCHEMA_FILE_NAME);
30 changes: 30 additions & 0 deletions astra/src/test/java/com/slack/astra/blobfs/BlobStoreTest.java
@@ -12,6 +12,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -218,6 +219,35 @@ void testListFilesNonExistingPrefix() {
assertThat(blobStore.listFiles(chunkId).size()).isEqualTo(0);
}

@Test
void testListFilesWithSize() throws IOException {
BlobStore blobStore = new BlobStore(s3Client, TEST_BUCKET);
String chunkId = UUID.randomUUID().toString();

assertThat(blobStore.listFiles(chunkId).size()).isEqualTo(0);

Path directoryUpload = Files.createTempDirectory("");
Path foo = Files.createTempFile(directoryUpload, "", "");
try (FileWriter fileWriter = new FileWriter(foo.toFile())) {
fileWriter.write("Example test 1");
}
Path bar = Files.createTempFile(directoryUpload, "", "");
Comment on lines +225 to +234

If you used a non-random chunkId and file names, you could have the assertion use more literals and it would be easier to follow.

Also, could you have one of the files have a different number of characters in it so it would be clear that they are different?
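
A sketch of that variant with fixed names and contents of different lengths, so the expected map reads as literals (the chunk id, file names, and contents here are made up for illustration):

String chunkId = "list-files-with-size-chunk";
Path directoryUpload = Files.createTempDirectory("");
Path foo = directoryUpload.resolve("foo.txt");
Files.writeString(foo, "Example test 1");    // 14 bytes
Path bar = directoryUpload.resolve("bar.txt");
Files.writeString(bar, "Example test two!"); // 17 bytes
blobStore.upload(chunkId, directoryUpload);

assertThat(blobStore.listFilesWithSize(chunkId))
    .containsExactlyInAnyOrderEntriesOf(
        Map.of(chunkId + "/foo.txt", 14L, chunkId + "/bar.txt", 17L));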

try (FileWriter fileWriter = new FileWriter(bar.toFile())) {
fileWriter.write("Example test 2");
}
blobStore.upload(chunkId, directoryUpload);

Map<String, Long> filesWithSize = blobStore.listFilesWithSize(chunkId);
assertThat(filesWithSize.size()).isEqualTo(2);
assertThat(filesWithSize)
.containsExactlyInAnyOrderEntriesOf(
Map.of(
String.format("%s/%s", chunkId, foo.getFileName().toString()),
Files.size(foo),
String.format("%s/%s", chunkId, bar.getFileName().toString()),
Files.size(bar)));
}

@Test
public void testCompressDecompressJsonData() throws Exception {
// Arrange