diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 3714eb131..aee0cad6d 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -134,6 +134,30 @@ public String value() { */ public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory); + /** + * Enable batching of single Point writes with consistency set for an entire batch + * flushDurations is reached first, a batch write is issued. + * Note that batch processing needs to be explicitly stopped before the application is shutdown. + * To do so call disableBatch(). Default consistency is ONE. + * + * @param actions + * the number of actions to collect + * @param flushDuration + * the time to wait at most. + * @param flushDurationTimeUnit + * the TimeUnit for the given flushDuration. + * @param threadFactory + * a ThreadFactory instance to be used. + * @param exceptionHandler + * a consumer function to handle asynchronous errors + * @param consistency + * a consistency setting for batch writes. + * @return the InfluxDB instance to be able to use it in a fluent manner. + */ + + InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit, + ThreadFactory threadFactory, BiConsumer, Throwable> exceptionHandler, + ConsistencyLevel consistency); /** * Enable batching of single Point writes to speed up writes significant. If either actions or diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 28f973dc9..35457a55c 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -1,5 +1,10 @@ package org.influxdb.impl; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDB.ConsistencyLevel; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; + import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -16,10 +21,6 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; -import org.influxdb.dto.Point; - /** * A BatchProcessor can be attached to a InfluxDB Instance to collect single point writes and * aggregates them to BatchPoints to get a better write performance. @@ -27,7 +28,7 @@ * @author stefan.majer [at] gmail.com * */ -public class BatchProcessor { +public final class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); protected final BlockingQueue queue; @@ -37,6 +38,7 @@ public class BatchProcessor { final int actions; private final TimeUnit flushIntervalUnit; private final int flushInterval; + private final ConsistencyLevel consistencyLevel; /** * The Builder to create a BatchProcessor instance. @@ -48,6 +50,7 @@ public static final class Builder { private TimeUnit flushIntervalUnit; private int flushInterval; private BiConsumer, Throwable> exceptionHandler = (entries, throwable) -> { }; + private ConsistencyLevel consistencyLevel; /** * @param threadFactory @@ -107,6 +110,18 @@ public Builder exceptionHandler(final BiConsumer, Throwable> han this.exceptionHandler = handler; return this; } + /** + * Consistency level for batch write. + * + * @param consistencyLevel + * the consistencyLevel + * + * @return this Builder to use it fluent + */ + public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) { + this.consistencyLevel = consistencyLevel; + return this; + } /** * Create the BatchProcessor. @@ -120,8 +135,9 @@ public BatchProcessor build() { Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit"); Objects.requireNonNull(this.threadFactory, "threadFactory"); Objects.requireNonNull(this.exceptionHandler, "exceptionHandler"); - return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit, - this.flushInterval, exceptionHandler); + return new BatchProcessor(this.influxDB, this.threadFactory, + this.actions, this.flushIntervalUnit, + this.flushInterval, exceptionHandler, this.consistencyLevel); } } @@ -180,9 +196,10 @@ public static Builder builder(final InfluxDB influxDB) { return new Builder(influxDB); } - BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, - final TimeUnit flushIntervalUnit, final int flushInterval, - final BiConsumer, Throwable> exceptionHandler) { + private BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions, + final TimeUnit flushIntervalUnit, final int flushInterval, + final BiConsumer, Throwable> exceptionHandler, + final ConsistencyLevel consistencyLevel) { super(); this.influxDB = influxDB; this.actions = actions; @@ -190,6 +207,7 @@ public static Builder builder(final InfluxDB influxDB) { this.flushInterval = flushInterval; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); this.exceptionHandler = exceptionHandler; + this.consistencyLevel = consistencyLevel; if (actions > 1 && actions < Integer.MAX_VALUE) { this.queue = new LinkedBlockingQueue<>(actions); } else { @@ -229,7 +247,7 @@ void write() { String batchKey = dbName + "_" + rp; if (!batchKeyToBatchPoints.containsKey(batchKey)) { BatchPoints batchPoints = BatchPoints.database(dbName) - .retentionPolicy(rp).build(); + .retentionPolicy(rp).consistency(getConsistencyLevel()).build(); batchKeyToBatchPoints.put(batchKey, batchPoints); } batchKeyToBatchPoints.get(batchKey).point(point); @@ -297,4 +315,9 @@ void flushAndShutdown() { void flush() { this.write(); } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; + } + } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 070a6dbe8..d85600efc 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -3,7 +3,15 @@ import com.squareup.moshi.JsonAdapter; import com.squareup.moshi.Moshi; - +import okhttp3.Headers; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.RequestBody; +import okhttp3.ResponseBody; +import okhttp3.logging.HttpLoggingInterceptor; +import okhttp3.logging.HttpLoggingInterceptor.Level; +import okio.BufferedSource; import org.influxdb.InfluxDB; import org.influxdb.InfluxDBException; import org.influxdb.InfluxDBIOException; @@ -14,16 +22,6 @@ import org.influxdb.dto.QueryResult; import org.influxdb.impl.BatchProcessor.HttpBatchEntry; import org.influxdb.impl.BatchProcessor.UdpBatchEntry; - -import okhttp3.Headers; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.RequestBody; -import okhttp3.ResponseBody; -import okhttp3.logging.HttpLoggingInterceptor; -import okhttp3.logging.HttpLoggingInterceptor.Level; -import okio.BufferedSource; import retrofit2.Call; import retrofit2.Callback; import retrofit2.Response; @@ -201,6 +199,16 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, return this; } + @Override + public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, + final ThreadFactory threadFactory, + final BiConsumer, Throwable> exceptionHandler, + final ConsistencyLevel consistency) { + enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, exceptionHandler) + .setConsistency(consistency); + return this; + } + @Override public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit, final ThreadFactory threadFactory, @@ -214,6 +222,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti .exceptionHandler(exceptionHandler) .interval(flushDuration, flushDurationTimeUnit) .threadFactory(threadFactory) + .consistencyLevel(consistency) .build(); this.batchEnabled.set(true); return this; diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index e206ffaa0..ec7a35a9b 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -1,17 +1,5 @@ package org.influxdb; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; - import org.influxdb.InfluxDB.LogLevel; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; @@ -19,13 +7,26 @@ import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; import org.influxdb.impl.InfluxDBImpl; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + /** * Test the InfluxDB API. * @@ -147,7 +148,7 @@ public void testDescribeDatabases() { Assertions.assertTrue(found, "It is expected that describeDataBases contents the newly create database."); this.influxDB.deleteDatabase(dbName); } - + /** * Test that Database exists works. */ @@ -188,7 +189,7 @@ public void testWrite() { Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); this.influxDB.deleteDatabase(dbName); } - + /** * Test the implementation of {@link InfluxDB#write(int, Point)}'s sync support. */ @@ -203,7 +204,7 @@ public void testSyncWritePointThroughUDP() throws InterruptedException { QueryResult result = this.influxDB.query(query); Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); } - + /** * Test the implementation of {@link InfluxDB#write(int, Point)}'s async support. */ @@ -223,8 +224,8 @@ public void testAsyncWritePointThroughUDP() throws InterruptedException { this.influxDB.disableBatch(); } } - - + + /** * Test the implementation of {@link InfluxDB#write(int, Point)}'s async support. */ @@ -461,7 +462,7 @@ public void testCreateNumericNamedDatabase() { Assertions.assertTrue(result.contains(numericDbName)); this.influxDB.deleteDatabase(numericDbName); } - + /** * Test that creating database which name is empty will throw expected exception */ @@ -501,7 +502,7 @@ public void testIsBatchEnabled() { this.influxDB.disableBatch(); Assertions.assertFalse(this.influxDB.isBatchEnabled()); } - + /** * Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}. */ @@ -509,7 +510,7 @@ public void testIsBatchEnabled() { public void testBatchEnabledWithThreadFactory() { final String threadName = "async_influxdb_write"; this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() { - + @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); @@ -524,7 +525,7 @@ public Thread newThread(Runnable r) { existThreadWithSettedName = true; break; } - + } Assertions.assertTrue(existThreadWithSettedName); this.influxDB.disableBatch(); @@ -536,7 +537,7 @@ public void testBatchEnabledWithThreadFactoryIsNull() { this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, null); }); } - + /** * Test the implementation of {@link InfluxDBImpl#InfluxDBImpl(String, String, String, okhttp3.OkHttpClient.Builder)}. */ @@ -778,4 +779,15 @@ public void testCreateDropRetentionPolicies() { Assertions.assertTrue(retentionPolicies.size() == 1); } + /** + * Test the implementation of {@link InfluxDB#isBatchEnabled() with consistency}. + */ + @Test + public void testIsBatchEnabledWithConsistency() { + Assertions.assertFalse(this.influxDB.isBatchEnabled()); + this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory(), + (a, b) -> { + }, InfluxDB.ConsistencyLevel.ALL); + Assertions.assertTrue(this.influxDB.isBatchEnabled()); + } } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index c30c3b388..8a17245f0 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -21,6 +21,11 @@ import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import static org.junit.Assert.assertNull; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + + @RunWith(JUnitPlatform.class) public class BatchProcessorTest { @@ -115,8 +120,8 @@ public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws In public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); Assertions.assertThrows(IllegalArgumentException.class, () -> { - BatchProcessor.builder(mockInfluxDB).actions(0) - .interval(1, TimeUnit.NANOSECONDS).build(); + BatchProcessor.builder(mockInfluxDB).actions(0) + .interval(1, TimeUnit.NANOSECONDS).build(); }); } @@ -124,8 +129,8 @@ public void testActionsIsZero() throws InterruptedException, IOException { public void testIntervalIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); Assertions.assertThrows(IllegalArgumentException.class, () -> { - BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(0, TimeUnit.NANOSECONDS).build(); + BatchProcessor.builder(mockInfluxDB).actions(1) + .interval(0, TimeUnit.NANOSECONDS).build(); }); } @@ -133,8 +138,25 @@ public void testIntervalIsZero() throws InterruptedException, IOException { public void testInfluxDBIsNull() throws InterruptedException, IOException { InfluxDB mockInfluxDB = null; Assertions.assertThrows(NullPointerException.class, () -> { - BatchProcessor.builder(mockInfluxDB).actions(1) - .interval(1, TimeUnit.NANOSECONDS).build(); + BatchProcessor.builder(mockInfluxDB).actions(1) + .interval(1, TimeUnit.NANOSECONDS).build(); }); } + + @Test + public void testConsistencyLevelNull() throws InterruptedException, IOException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).build(); + assertNull(batchProcessor.getConsistencyLevel()); + } + + @Test + public void testConsistencyLevelUpdated() throws InterruptedException, IOException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).consistencyLevel(InfluxDB.ConsistencyLevel.ANY).build(); + assertThat(batchProcessor.getConsistencyLevel(), is(equalTo(InfluxDB.ConsistencyLevel.ANY))); + } + }