Make disk persistence capacity configurable #2329

Merged: 5 commits, Jun 20, 2022
@@ -304,6 +304,9 @@ public static class PreviewConfiguration {
// telemetry (they are much smaller so a larger queue size is ok)
public int metricsExportQueueCapacity = 65536;

// disk persistence has a default capacity of 50MB
public int diskPersistenceMaxSizeMb = 50;
Member:

The 2.x SDK enforces a limit of 1 GB:

I'm not really sure of the reason. Can you see if the Application Insights .NET SDK has a max capacity that users can configure? If it does, we may want to implement that max too.

Also, I wonder if we should allow 0, meaning disable disk persistence, which users on read-only filesystems could use (and then we wouldn't need to log a warning for those users).

Contributor Author:

.NET defaults to 50 MB as well. They use 30 seconds as the default sending interval, same as us; it's not configurable. We could ignore values less than 50, and document it?
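
Neither idea is implemented in this PR. As a rough, hypothetical sketch (the class and method names below are illustrative, not from this change), treating 0 as "disabled" and clamping smaller values up to the 50 MB default could look like:

final class DiskPersistenceSettings {
  private DiskPersistenceSettings() {}

  // Hypothetical normalization of the configured value; not part of this PR.
  // 0 disables disk persistence entirely (useful on read-only filesystems);
  // values between 1 and 49 are clamped up to the 50 MB default.
  static int normalizeMaxSizeMb(int configured) {
    if (configured == 0) {
      return 0; // caller would skip creating the LocalStorageTelemetryPipelineListener
    }
    return Math.max(configured, 50);
  }
}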

Contributor Author:

dotnet source code

Member:

Re "dotnet is default to 50MB as well": do they allow users to configure this? And if so, do they have a maximum value that the user can set?

Contributor Author:

it's fixed. no configuration.

Contributor Author:

but wait, is it exposed to customers though? is it available in the public interface?

In reply to "Thx, does the .NET SDK impose any limit on how large the user can configure it to be?":

The only limit is the maximum value of long itself.

Contributor Author:

looks like it's public API. based on the code, it can be any number.

Member:

awesome, thx all 👍


// unfortunately the Java SDK behavior has always been to report the "% Processor Time" number
// as "normalized" (divided by # of CPU cores), even though it should be non-normalized
// we cannot change this existing behavior as it would break existing customers' alerts, but at
@@ -150,6 +150,7 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) {
config.internal.statsbeat.endpoint)
.setRoleName(config.role.name)
.setRoleInstance(config.role.instance)
.setDiskPersistenceMaxSizeMb(config.preview.diskPersistenceMaxSizeMb)
.build();

PerformanceCounterInitializer.initialize(telemetryClient, config);
@@ -90,6 +90,7 @@ public class TelemetryClient {
@Nullable private final File tempDir;
private final int generalExportQueueCapacity;
private final int metricsExportQueueCapacity;
private final int diskPersistenceMaxSizeMb;

@Nullable private final Configuration.AadAuthentication aadAuthentication;

@@ -125,6 +126,7 @@ public TelemetryClient(Builder builder) {
this.statsbeatConnectionString = builder.statsbeatConnectionString;
this.roleName = builder.roleName;
this.roleInstance = builder.roleInstance;
this.diskPersistenceMaxSizeMb = builder.diskPersistenceMaxSizeMb;
}

public static TelemetryClient getActive() {
@@ -259,6 +261,7 @@ private BatchItemProcessor initBatchItemProcessor(
new DiagnosticTelemetryPipelineListener(
"Sending telemetry to the ingestion service", true),
new LocalStorageTelemetryPipelineListener(
diskPersistenceMaxSizeMb,
TempDirs.getSubDir(tempDir, TELEMETRY_FOLDER_NAME),
telemetryPipeline,
statsbeatModule.getNonessentialStatsbeat(),
@@ -290,6 +293,7 @@ public BatchItemProcessor getStatsbeatBatchItemProcessor() {
} else {
LocalStorageTelemetryPipelineListener localStorageTelemetryPipelineListener =
new LocalStorageTelemetryPipelineListener(
diskPersistenceMaxSizeMb,
TempDirs.getSubDir(tempDir, STATSBEAT_FOLDER_NAME),
telemetryPipeline,
LocalStorageStats.noop(),
@@ -442,6 +446,7 @@ public static class Builder {
@Nullable private StatsbeatConnectionString statsbeatConnectionString;
@Nullable private String roleName;
@Nullable private String roleInstance;
private int diskPersistenceMaxSizeMb;

public Builder setCustomDimensions(Map<String, String> customDimensions) {
StringSubstitutor substitutor = new StringSubstitutor(System.getenv());
@@ -520,6 +525,11 @@ public Builder setRoleInstance(@Nullable String roleInstance) {
return this;
}

public Builder setDiskPersistenceMaxSizeMb(int diskPersistenceMaxSizeMb) {
this.diskPersistenceMaxSizeMb = diskPersistenceMaxSizeMb;
return this;
}

public TelemetryClient build() {
return new TelemetryClient(this);
}
@@ -333,6 +333,7 @@ private TelemetryItemExporter initExporterBuilder() {
new TelemetryItemExporter(
pipeline,
new LocalStorageTelemetryPipelineListener(
50, // default to 50MB
TempDirs.getSubDir(tempDir, "telemetry"),
pipeline,
LocalStorageStats.noop(),
@@ -38,9 +38,6 @@ class LocalFileSender implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(LocalFileSender.class);

// send persisted telemetries from local disk every 30 seconds.
private static final long INTERVAL_SECONDS = 30;

private final LocalFileLoader localFileLoader;
private final TelemetryPipeline telemetryPipeline;

@@ -51,6 +48,7 @@ class LocalFileSender implements Runnable {
private final TelemetryPipelineListener diagnosticListener;

LocalFileSender(
long intervalSeconds,
LocalFileLoader localFileLoader,
TelemetryPipeline telemetryPipeline,
boolean suppressWarnings) { // used to suppress warnings from statsbeat
@@ -64,7 +62,7 @@ class LocalFileSender implements Runnable {
"Sending telemetry to the ingestion service (retry from disk)", false);

scheduledExecutor.scheduleWithFixedDelay(
this, INTERVAL_SECONDS, INTERVAL_SECONDS, TimeUnit.SECONDS);
this, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
}

void shutdown() {
@@ -34,24 +34,25 @@
/** This class manages writing a list of {@link ByteBuffer} to the file system. */
final class LocalFileWriter {

// 50MB per folder for all apps.
private static final long MAX_FILE_SIZE_IN_BYTES = 52428800; // 50MB
private static final String PERMANENT_FILE_EXTENSION = ".trn";

private final long diskPersistenceMaxSizeBytes;
private final LocalFileCache localFileCache;
private final File telemetryFolder;
private final LocalStorageStats stats;

private final OperationLogger operationLogger;

LocalFileWriter(
int diskPersistenceMaxSizeMb,
LocalFileCache localFileCache,
File telemetryFolder,
LocalStorageStats stats,
boolean suppressWarnings) { // used to suppress warnings from statsbeat
this.telemetryFolder = telemetryFolder;
this.localFileCache = localFileCache;
this.stats = stats;
this.diskPersistenceMaxSizeBytes = diskPersistenceMaxSizeMb * 1024L * 1024L;
Member:

If it would not be an issue in terms of allocation, it may be worth introducing a DiskPersistenceMaxSize object type (a sketch follows below). It would have several advantages:

  • A method could be added to the DiskPersistenceMaxSize object type to encapsulate this kind of code: diskPersistenceMaxSizeMb * 1024L * 1024L
  • This would remove the need to add comments (for example in AzureMonitorExporterBuilder for this PR)
  • This would strengthen the code checks at compilation
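
A minimal sketch of what such a value type could look like, assuming the DiskPersistenceMaxSize name suggested above (hypothetical, not part of this PR):

// Hypothetical value type based on the suggestion above; not part of this PR.
final class DiskPersistenceMaxSize {

  private final int megabytes;

  private DiskPersistenceMaxSize(int megabytes) {
    this.megabytes = megabytes;
  }

  static DiskPersistenceMaxSize ofMb(int megabytes) {
    if (megabytes < 0) {
      throw new IllegalArgumentException("megabytes must be >= 0: " + megabytes);
    }
    return new DiskPersistenceMaxSize(megabytes);
  }

  // encapsulates the MB-to-bytes conversion instead of repeating it at call sites
  long inBytes() {
    return megabytes * 1024L * 1024L;
  }
}

The builder and LocalFileWriter would then accept this type instead of a raw int, which is where the stronger compile-time checking would come from.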

Contributor Author:

The comment is only for the default value. Creating an object for a primitive type with such simple arithmetic isn't worth it, in my opinion.


operationLogger =
suppressWarnings
@@ -63,7 +64,7 @@ final class LocalFileWriter {

void writeToDisk(String instrumentationKey, List<ByteBuffer> buffers) {
long size = getTotalSizeOfPersistedFiles(telemetryFolder);
if (size >= MAX_FILE_SIZE_IN_BYTES) {
if (size >= diskPersistenceMaxSizeBytes) {
operationLogger.recordFailure(
"Local persistent storage capacity has been reached. It's currently at ("
+ (size / 1024)
@@ -40,6 +40,7 @@ public class LocalStorageTelemetryPipelineListener implements TelemetryPipelineListener {

// telemetryFolder must already exist and be writable
public LocalStorageTelemetryPipelineListener(
int diskPersistenceMaxSizeMb,
File telemetryFolder,
TelemetryPipeline pipeline,
LocalStorageStats stats,
@@ -48,9 +49,14 @@ public LocalStorageTelemetryPipelineListener(
LocalFileCache localFileCache = new LocalFileCache(telemetryFolder);
LocalFileLoader loader =
new LocalFileLoader(localFileCache, telemetryFolder, stats, suppressWarnings);
localFileWriter = new LocalFileWriter(localFileCache, telemetryFolder, stats, suppressWarnings);
localFileWriter =
new LocalFileWriter(
diskPersistenceMaxSizeMb, localFileCache, telemetryFolder, stats, suppressWarnings);

localFileSender = new LocalFileSender(loader, pipeline, suppressWarnings);
// send persisted telemetries from local disk every 30 seconds by default.
// if diskPersistenceMaxSizeMb is greater than 50, it will get changed to 10 seconds.
long intervalSeconds = diskPersistenceMaxSizeMb > 50 ? 10 : 30;
Member (on lines +56 to +58):

Estimating how long this will take to send a large number of batches from disk:

  • 600 characters per telemetry × 512 telemetry per batch = ~300 KB per batch
  • 1 GB of telemetry will be ~3,000 batches
  • so, sending 1 batch per 10 seconds, this will take ~8 hours

which is not bad

(we can always revisit this detail when spec'ing it out with other langs)
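
The same back-of-the-envelope estimate written out in code; the 600-character and 512-items-per-batch figures are the assumptions from the comment above, not measured values (the precise arithmetic lands closer to nine or ten hours than the rounded eight):

// Rough drain-time estimate for persisted telemetry, using the figures
// assumed in the review comment above (not measurements).
public final class DrainTimeEstimate {

  public static void main(String[] args) {
    long bytesPerItem = 600;                            // ~600 characters per telemetry item
    long itemsPerBatch = 512;                           // telemetry items per batch
    long bytesPerBatch = bytesPerItem * itemsPerBatch;  // ~300 KB per batch

    long persistedBytes = 1024L * 1024L * 1024L;        // 1 GB of telemetry on disk
    long batches = persistedBytes / bytesPerBatch;      // ~3,500 batches

    long secondsPerBatch = 10;                          // one send every 10 seconds
    double hours = batches * secondsPerBatch / 3600.0;  // ~9.7 hours to drain

    System.out.printf("~%d batches, ~%.1f hours to drain%n", batches, hours);
  }
}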

Contributor Author (@heyams, Jun 16, 2022):

I can add a TODO to revisit this interval.

@trask We allow (in .NET) users to configure how many parallel senders can be used (the default is 10). They can increase it to a large number, heavily increasing throughput, subject to the physical limitations of the network itself.

In scenarios like load testing, by increasing the senders, users could send huge amounts of data.

localFileSender = new LocalFileSender(intervalSeconds, loader, pipeline, suppressWarnings);
localFilePurger = new LocalFilePurger(telemetryFolder, suppressWarnings);
}

@@ -96,7 +96,7 @@ public void setup() throws Exception {
new TelemetryItemExporter(
telemetryPipeline,
new LocalStorageTelemetryPipelineListener(
tempFolder, telemetryPipeline, LocalStorageStats.noop(), false));
50, tempFolder, telemetryPipeline, LocalStorageStats.noop(), false));
}

@Test
@@ -185,7 +185,7 @@ public void testLoadFile() throws IOException {
public void testWriteAndReadRandomText() {
String text = "hello world";
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);
writer.writeToDisk(INSTRUMENTATION_KEY, singletonList(ByteBuffer.wrap(text.getBytes(UTF_8))));

LocalFileLoader loader = new LocalFileLoader(cache, tempFolder, null, false);
@@ -215,7 +215,7 @@ public void testWriteGzipRawByte() throws IOException {
// write gzipped bytes[] to disk
byte[] result = byteArrayOutputStream.toByteArray();
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);
writer.writeToDisk(INSTRUMENTATION_KEY, singletonList(ByteBuffer.wrap(result)));

// read gzipped byte[] from disk
@@ -242,7 +242,8 @@ public void testDeleteFilePermanentlyOnSuccess() throws Exception {
HttpClient mockedClient = getMockHttpClientSuccess();
HttpPipelineBuilder pipelineBuilder = new HttpPipelineBuilder().httpClient(mockedClient);
LocalFileCache localFileCache = new LocalFileCache(tempFolder);
LocalFileWriter localFileWriter = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);
LocalFileLoader localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null, false);

TelemetryPipeline telemetryPipeline =
@@ -294,7 +295,8 @@ public void testDeleteFilePermanentlyOnFailure() throws Exception {
LocalFileCache localFileCache = new LocalFileCache(tempFolder);

LocalFileLoader localFileLoader = new LocalFileLoader(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter localFileWriter =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);

TelemetryPipeline telemetryPipeline =
new TelemetryPipeline(pipelineBuilder.build(), new URL("http://foo.bar"));
@@ -39,7 +39,7 @@ public class LocalFilePurgerTests {
public void testPurgedExpiredFiles() throws InterruptedException {
String text = "hello world";
LocalFileCache cache = new LocalFileCache(tempFolder);
LocalFileWriter writer = new LocalFileWriter(cache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, cache, tempFolder, null, false);

// run purge task every second to delete files that are 5 seconds old
LocalFilePurger purger = new LocalFilePurger(tempFolder, 5L, 1L, false);
@@ -88,14 +88,14 @@ public void testWriteByteBuffersList() throws IOException {

assertThat(byteBuffers.size()).isEqualTo(10);

LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk("00000000-0000-0000-0000-0FEEDDADBEEF", byteBuffers);
assertThat(localFileCache.getPersistedFilesCache().size()).isEqualTo(1);
}

@Test
public void testWriteRawByteArray() {
LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer = new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk("00000000-0000-0000-0000-0FEEDDADBEEF", singletonList(buffer));
assertThat(localFileCache.getPersistedFilesCache().size()).isEqualTo(1);
}
@@ -110,7 +110,8 @@ public void testWriteUnderMultipleThreadsEnvironment() throws InterruptedException {
executorService.execute(
() -> {
for (int j = 0; j < 10; j++) {
LocalFileWriter writer = new LocalFileWriter(localFileCache, tempFolder, null, false);
LocalFileWriter writer =
new LocalFileWriter(50, localFileCache, tempFolder, null, false);
writer.writeToDisk(
"00000000-0000-0000-0000-0FEEDDADBEEF",
singletonList(ByteBuffer.wrap(telemetry.getBytes(UTF_8))));
@@ -71,7 +71,7 @@ private TelemetryItemExporter getExporter() throws MalformedURLException {

return new TelemetryItemExporter(
telemetryPipeline,
new LocalStorageTelemetryPipelineListener(tempFolder, telemetryPipeline, null, false));
new LocalStorageTelemetryPipelineListener(50, tempFolder, telemetryPipeline, null, false));
}

private static String getRequestBodyString(Flux<ByteBuffer> requestBody) {