Skip to content

Commit

Permalink
Make disk persistence capacity configurable (#2329)
Browse files Browse the repository at this point in the history
* Make disk persistence max size to be configurable

* Fix tests

* Change sender interval to 10 seconds if disk size is bigger than 50MB

* Rename

* Delete a line
  • Loading branch information
heyams authored Jun 20, 2022
1 parent 37f6d4c commit c7783a8
Show file tree
Hide file tree
Showing 12 changed files with 42 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ public static class PreviewConfiguration {
// telemetry (they are much smaller so a larger queue size is ok)
public int metricsExportQueueCapacity = 65536;

// disk persistence has a default capacity of 50MB
public int diskPersistenceMaxSizeMb = 50;

// unfortunately the Java SDK behavior has always been to report the "% Processor Time" number
// as "normalized" (divided by # of CPU cores), even though it should be non-normalized
// we cannot change this existing behavior as it would break existing customers' alerts, but at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
config.internal.statsbeat.endpoint)
.setRoleName(config.role.name)
.setRoleInstance(config.role.instance)
.setDiskPersistenceMaxSizeMb(config.preview.diskPersistenceMaxSizeMb)
.build();

PerformanceCounterInitializer.initialize(telemetryClient, config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class TelemetryClient {
@Nullable private final File tempDir;
private final int generalExportQueueCapacity;
private final int metricsExportQueueCapacity;
private final int diskPersistenceMaxSizeMb;

@Nullable private final Configuration.AadAuthentication aadAuthentication;

Expand Down Expand Up @@ -126,6 +127,7 @@ public TelemetryClient(Builder builder) {
this.statsbeatConnectionString = builder.statsbeatConnectionString;
this.roleName = builder.roleName;
this.roleInstance = builder.roleInstance;
this.diskPersistenceMaxSizeMb = builder.diskPersistenceMaxSizeMb;
}

public static TelemetryClient getActive() {
Expand Down Expand Up @@ -260,6 +262,7 @@ private BatchItemProcessor initBatchItemProcessor(
new DiagnosticTelemetryPipelineListener(
"Sending telemetry to the ingestion service", true),
new LocalStorageTelemetryPipelineListener(
diskPersistenceMaxSizeMb,
TempDirs.getSubDir(tempDir, TELEMETRY_FOLDER_NAME),
telemetryPipeline,
statsbeatModule.getNonessentialStatsbeat(),
Expand Down Expand Up @@ -291,6 +294,7 @@ public BatchItemProcessor getStatsbeatBatchItemProcessor() {
} else {
LocalStorageTelemetryPipelineListener localStorageTelemetryPipelineListener =
new LocalStorageTelemetryPipelineListener(
diskPersistenceMaxSizeMb,
TempDirs.getSubDir(tempDir, STATSBEAT_FOLDER_NAME),
telemetryPipeline,
LocalStorageStats.noop(),
Expand Down Expand Up @@ -448,6 +452,7 @@ public static class Builder {
@Nullable private StatsbeatConnectionString statsbeatConnectionString;
@Nullable private String roleName;
@Nullable private String roleInstance;
private int diskPersistenceMaxSizeMb;

public Builder setCustomDimensions(Map<String, String> customDimensions) {
StringSubstitutor substitutor = new StringSubstitutor(System.getenv());
Expand Down Expand Up @@ -526,6 +531,11 @@ public Builder setRoleInstance(@Nullable String roleInstance) {
return this;
}

public Builder setDiskPersistenceMaxSizeMb(int diskPersistenceMaxSizeMb) {
this.diskPersistenceMaxSizeMb = diskPersistenceMaxSizeMb;
return this;
}

public TelemetryClient build() {
return new TelemetryClient(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ private TelemetryItemExporter initExporterBuilder() {
new TelemetryItemExporter(
pipeline,
new LocalStorageTelemetryPipelineListener(
50, // default to 50MB
TempDirs.getSubDir(tempDir, "telemetry"),
pipeline,
LocalStorageStats.noop(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ class LocalFileSender implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(LocalFileSender.class);

// send persisted telemetries from local disk every 30 seconds.
private static final long INTERVAL_SECONDS = 30;

private final LocalFileLoader localFileLoader;
private final TelemetryPipeline telemetryPipeline;

Expand All @@ -51,6 +48,7 @@ class LocalFileSender implements Runnable {
private final TelemetryPipelineListener diagnosticListener;

LocalFileSender(
long intervalSeconds,
LocalFileLoader localFileLoader,
TelemetryPipeline telemetryPipeline,
boolean suppressWarnings) { // used to suppress warnings from statsbeat
Expand All @@ -64,7 +62,7 @@ class LocalFileSender implements Runnable {
"Sending telemetry to the ingestion service (retry from disk)", false);

scheduledExecutor.scheduleWithFixedDelay(
this, INTERVAL_SECONDS, INTERVAL_SECONDS, TimeUnit.SECONDS);
this, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
}

void shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,25 @@
/** This class manages writing a list of {@link ByteBuffer} to the file system. */
final class LocalFileWriter {

// 50MB per folder for all apps.
private static final long MAX_FILE_SIZE_IN_BYTES = 52428800; // 50MB
private static final String PERMANENT_FILE_EXTENSION = ".trn";

private final long diskPersistenceMaxSizeBytes;
private final LocalFileCache localFileCache;
private final File telemetryFolder;
private final LocalStorageStats stats;

private final OperationLogger operationLogger;

LocalFileWriter(
int diskPersistenceMaxSizeMb,
LocalFileCache localFileCache,
File telemetryFolder,
LocalStorageStats stats,
boolean suppressWarnings) { // used to suppress warnings from statsbeat
this.telemetryFolder = telemetryFolder;
this.localFileCache = localFileCache;
this.stats = stats;
this.diskPersistenceMaxSizeBytes = diskPersistenceMaxSizeMb * 1024L * 1024L;

operationLogger =
suppressWarnings
Expand All @@ -63,7 +64,7 @@ final class LocalFileWriter {

void writeToDisk(String instrumentationKey, List<ByteBuffer> buffers) {
long size = getTotalSizeOfPersistedFiles(telemetryFolder);
if (size >= MAX_FILE_SIZE_IN_BYTES) {
if (size >= diskPersistenceMaxSizeBytes) {
operationLogger.recordFailure(
"Local persistent storage capacity has been reached. It's currently at ("
+ (size / 1024)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class LocalStorageTelemetryPipelineListener implements TelemetryPipelineL

// telemetryFolder must already exist and be writable
public LocalStorageTelemetryPipelineListener(
int diskPersistenceMaxSizeMb,
File telemetryFolder,
TelemetryPipeline pipeline,
LocalStorageStats stats,
Expand All @@ -48,9 +49,14 @@ public LocalStorageTelemetryPipelineListener(
LocalFileCache localFileCache = new LocalFileCache(telemetryFolder);
LocalFileLoader loader =
new LocalFileLoader(localFileCache, telemetryFolder, stats, suppressWarnings);
localFileWriter = new LocalFileWriter(localFileCache, telemetryFolder, stats, suppressWarnings);
localFileWriter =
new LocalFileWriter(
diskPersistenceMaxSizeMb, localFileCache, telemetryFolder, stats, suppressWarnings);

localFileSender = new LocalFileSender(loader, pipeline, suppressWarnings);
// send persisted telemetries from local disk every 30 seconds by default.
// if diskPersistenceMaxSizeMb is greater than 50, it will get changed to 10 seconds.
long intervalSeconds = diskPersistenceMaxSizeMb > 50 ? 10 : 30;
localFileSender = new LocalFileSender(intervalSeconds, loader, pipeline, suppressWarnings);
localFilePurger = new LocalFilePurger(telemetryFolder, suppressWarnings);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void setup() throws Exception {
new TelemetryItemExporter(
telemetryPipeline,
new LocalStorageTelemetryPipelineListener(
tempFolder, telemetryPipeline, LocalStorageStats.noop(), false));
50, tempFolder, telemetryPipeline, LocalStorageStats.noop(), false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testLoadFile() throws IOException {
public void testWriteAndReadRandomText() {
String text = "hello world";
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);
writer.writeToDisk(INSTRUMENTATION_KEY, singletonList(ByteBuffer.wrap(text.getBytes(UTF_8))));

LocalFileLoader loader = new LocalFileLoader(cache, tempFolder, null, false);
Expand Down Expand Up @@ -215,7 +215,7 @@ public void testWriteGzipRawByte() throws IOException {
// write gzipped bytes[] to disk
byte[] result = byteArrayOutputStream.toByteArray();
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);
writer.writeToDisk(INSTRUMENTATION_KEY, singletonList(ByteBuffer.wrap(result)));

// read gzipped byte[] from disk
Expand All @@ -242,7 +242,8 @@ public void testDeleteFilePermanentlyOnSuccess() throws Exception {
HttpClient mockedClient = getMockHttpClientSuccess();
HttpPipelineBuilder pipelineBuilder = new HttpPipelineBuilder().httpClient(mockedClient);
LocalFileCache localFileCache = new LocalFileCache(tempFolder);
LocalFileWriter localFileWriter = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);
LocalFileLoader localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null, false);

TelemetryPipeline telemetryPipeline =
Expand Down Expand Up @@ -294,7 +295,8 @@ public void testDeleteFilePermanentlyOnFailure() throws Exception {
LocalFileCache localFileCache = new LocalFileCache(tempFolder);

LocalFileLoader localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);

TelemetryPipeline telemetryPipeline =
new TelemetryPipeline(pipelineBuilder.build(), new URL("http://foo.bar"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class LocalFilePurgerTests {
public void testPurgedExpiredFiles() throws InterruptedException {
String text = "hello world";
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);

// run purge task every second to delete files that are 5 seconds old
LocalFilePurger purger = new LocalFilePurger(tempFolder, 5L, 1L, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ public void testWriteByteBuffersList() throws IOException {

assertThat(byteBuffers.size()).isEqualTo(10);

LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk("00000000-0000-0000-0000-0FEEDDADBEEF", byteBuffers);
assertThat(localFileCache.getPersistedFilesCache().size()).isEqualTo(1);
}

@Test
public void testWriteRawByteArray() {
LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk("00000000-0000-0000-0000-0FEEDDADBEEF", singletonList(buffer));
assertThat(localFileCache.getPersistedFilesCache().size()).isEqualTo(1);
}
Expand All @@ -110,7 +110,8 @@ public void testWriteUnderMultipleThreadsEnvironment() throws InterruptedExcepti
executorService.execute(
() -> {
for (int j = 0; j < 10; j++) {
LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk(
"00000000-0000-0000-0000-0FEEDDADBEEF",
singletonList(ByteBuffer.wrap(telemetry.getBytes(UTF_8))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private TelemetryItemExporter getExporter() throws MalformedURLException {

return new TelemetryItemExporter(
telemetryPipeline,
new LocalStorageTelemetryPipelineListener(tempFolder, telemetryPipeline, null, false));
new LocalStorageTelemetryPipelineListener(50, tempFolder, telemetryPipeline, null, false));
}

private static String getRequestBodyString(Flux<ByteBuffer> requestBody) {
Expand Down

0 comments on commit c7783a8

Please sign in to comment.