Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,30 @@ public String value() {
*/
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory);
/**
* Enable batching of single Point writes with consistency set for an entire batch
* flushDurations is reached first, a batch write is issued.
* Note that batch processing needs to be explicitly stopped before the application is shutdown.
* To do so call disableBatch(). Default consistency is ONE.
*
* @param actions
* the number of actions to collect
* @param flushDuration
* the time to wait at most.
* @param flushDurationTimeUnit
* the TimeUnit for the given flushDuration.
* @param threadFactory
* a ThreadFactory instance to be used.
* @param exceptionHandler
* a consumer function to handle asynchronous errors
* @param consistency
* a consistency setting for batch writes.
* @return the InfluxDB instance to be able to use it in a fluent manner.
*/

InfluxDB enableBatch(int actions, int flushDuration, TimeUnit flushDurationTimeUnit,
ThreadFactory threadFactory, BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
ConsistencyLevel consistency);

/**
* Enable batching of single Point writes to speed up writes significant. If either actions or
Expand Down
45 changes: 34 additions & 11 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package org.influxdb.impl;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDB.ConsistencyLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -16,18 +21,14 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

/**
* A BatchProcessor can be attached to a InfluxDB Instance to collect single point writes and
* aggregates them to BatchPoints to get a better write performance.
*
* @author stefan.majer [at] gmail.com
*
*/
public class BatchProcessor {
public final class BatchProcessor {

private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<AbstractBatchEntry> queue;
Expand All @@ -37,6 +38,7 @@ public class BatchProcessor {
final int actions;
private final TimeUnit flushIntervalUnit;
private final int flushInterval;
private final ConsistencyLevel consistencyLevel;

/**
* The Builder to create a BatchProcessor instance.
Expand All @@ -48,6 +50,7 @@ public static final class Builder {
private TimeUnit flushIntervalUnit;
private int flushInterval;
private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
private ConsistencyLevel consistencyLevel;

/**
* @param threadFactory
Expand Down Expand Up @@ -107,6 +110,18 @@ public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> han
this.exceptionHandler = handler;
return this;
}
/**
* Consistency level for batch write.
*
* @param consistencyLevel
* the consistencyLevel
*
* @return this Builder to use it fluent
*/
public Builder consistencyLevel(final ConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* Create the BatchProcessor.
Expand All @@ -120,8 +135,9 @@ public BatchProcessor build() {
Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");
Objects.requireNonNull(this.threadFactory, "threadFactory");
Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");
return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
this.flushInterval, exceptionHandler);
return new BatchProcessor(this.influxDB, this.threadFactory,
this.actions, this.flushIntervalUnit,
this.flushInterval, exceptionHandler, this.consistencyLevel);
}
}

Expand Down Expand Up @@ -180,16 +196,18 @@ public static Builder builder(final InfluxDB influxDB) {
return new Builder(influxDB);
}

BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
final TimeUnit flushIntervalUnit, final int flushInterval,
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
private BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
final TimeUnit flushIntervalUnit, final int flushInterval,
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
final ConsistencyLevel consistencyLevel) {
super();
this.influxDB = influxDB;
this.actions = actions;
this.flushIntervalUnit = flushIntervalUnit;
this.flushInterval = flushInterval;
this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
this.exceptionHandler = exceptionHandler;
this.consistencyLevel = consistencyLevel;
if (actions > 1 && actions < Integer.MAX_VALUE) {
this.queue = new LinkedBlockingQueue<>(actions);
} else {
Expand Down Expand Up @@ -229,7 +247,7 @@ void write() {
String batchKey = dbName + "_" + rp;
if (!batchKeyToBatchPoints.containsKey(batchKey)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(rp).build();
.retentionPolicy(rp).consistency(getConsistencyLevel()).build();
batchKeyToBatchPoints.put(batchKey, batchPoints);
}
batchKeyToBatchPoints.get(batchKey).point(point);
Expand Down Expand Up @@ -297,4 +315,9 @@ void flushAndShutdown() {
void flush() {
this.write();
}

public ConsistencyLevel getConsistencyLevel() {
return consistencyLevel;
}

}
31 changes: 20 additions & 11 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@

import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;

import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import okio.BufferedSource;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBIOException;
Expand All @@ -14,16 +22,6 @@
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
import org.influxdb.impl.BatchProcessor.UdpBatchEntry;

import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import okhttp3.logging.HttpLoggingInterceptor;
import okhttp3.logging.HttpLoggingInterceptor.Level;
import okio.BufferedSource;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;
Expand Down Expand Up @@ -201,6 +199,16 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
return this;
}

@Override
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory,
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
final ConsistencyLevel consistency) {
enableBatch(actions, flushDuration, flushDurationTimeUnit, threadFactory, exceptionHandler)
.setConsistency(consistency);
return this;
}

@Override
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
final ThreadFactory threadFactory,
Expand All @@ -214,6 +222,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
.exceptionHandler(exceptionHandler)
.interval(flushDuration, flushDurationTimeUnit)
.threadFactory(threadFactory)
.consistencyLevel(consistency)
.build();
this.batchEnabled.set(true);
return this;
Expand Down
58 changes: 35 additions & 23 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
package org.influxdb;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBImpl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Test the InfluxDB API.
*
Expand Down Expand Up @@ -147,7 +148,7 @@ public void testDescribeDatabases() {
Assertions.assertTrue(found, "It is expected that describeDataBases contents the newly create database.");
this.influxDB.deleteDatabase(dbName);
}

/**
* Test that Database exists works.
*/
Expand Down Expand Up @@ -188,7 +189,7 @@ public void testWrite() {
Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
this.influxDB.deleteDatabase(dbName);
}

/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s sync support.
*/
Expand All @@ -203,7 +204,7 @@ public void testSyncWritePointThroughUDP() throws InterruptedException {
QueryResult result = this.influxDB.query(query);
Assertions.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
}

/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
*/
Expand All @@ -223,8 +224,8 @@ public void testAsyncWritePointThroughUDP() throws InterruptedException {
this.influxDB.disableBatch();
}
}


/**
* Test the implementation of {@link InfluxDB#write(int, Point)}'s async support.
*/
Expand Down Expand Up @@ -461,7 +462,7 @@ public void testCreateNumericNamedDatabase() {
Assertions.assertTrue(result.contains(numericDbName));
this.influxDB.deleteDatabase(numericDbName);
}

/**
* Test that creating database which name is empty will throw expected exception
*/
Expand Down Expand Up @@ -501,15 +502,15 @@ public void testIsBatchEnabled() {
this.influxDB.disableBatch();
Assertions.assertFalse(this.influxDB.isBatchEnabled());
}

/**
* Test the implementation of {@link InfluxDB#enableBatch(int, int, TimeUnit, ThreadFactory)}.
*/
@Test
public void testBatchEnabledWithThreadFactory() {
final String threadName = "async_influxdb_write";
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, new ThreadFactory() {

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
Expand All @@ -524,7 +525,7 @@ public Thread newThread(Runnable r) {
existThreadWithSettedName = true;
break;
}

}
Assertions.assertTrue(existThreadWithSettedName);
this.influxDB.disableBatch();
Expand All @@ -536,7 +537,7 @@ public void testBatchEnabledWithThreadFactoryIsNull() {
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, null);
});
}

/**
* Test the implementation of {@link InfluxDBImpl#InfluxDBImpl(String, String, String, okhttp3.OkHttpClient.Builder)}.
*/
Expand Down Expand Up @@ -778,4 +779,15 @@ public void testCreateDropRetentionPolicies() {
Assertions.assertTrue(retentionPolicies.size() == 1);
}

/**
* Test the implementation of {@link InfluxDB#isBatchEnabled() with consistency}.
*/
@Test
public void testIsBatchEnabledWithConsistency() {
Assertions.assertFalse(this.influxDB.isBatchEnabled());
this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS, Executors.defaultThreadFactory(),
(a, b) -> {
}, InfluxDB.ConsistencyLevel.ALL);
Assertions.assertTrue(this.influxDB.isBatchEnabled());
}
}
Loading