Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Fixes
- [#150](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/150) - `HTTPOptions::httpReadTimeout` is also set as the connect timeout for HTTP connection on ESP32. It doesn't work for HTTPS connection yet.
- [#156](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/156) - Correctly rounding _writeBufferSize_, when _bufferSize/batchSize >= 256_.
- [#162](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/162) - Fixed flushing of not full buffer after to timeout.

## 3.8.0 [2021-04-01]
### Features
Expand Down
17 changes: 10 additions & 7 deletions src/InfluxDbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ bool InfluxDBClient::init() {
INFLUXDB_CLIENT_DEBUG("[D] Org: %s\n", _connInfo.org.c_str());
INFLUXDB_CLIENT_DEBUG("[D] Bucket: %s\n", _connInfo.bucket.c_str());
INFLUXDB_CLIENT_DEBUG("[D] Token: %s\n", _connInfo.authToken.c_str());
INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _dbVersion);
INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _connInfo.dbVersion);
if(_connInfo.serverUrl.length() == 0 || (_connInfo.dbVersion == 2 && (_connInfo.org.length() == 0 || _connInfo.bucket.length() == 0 || _connInfo.authToken.length() == 0))) {
INFLUXDB_CLIENT_DEBUG("[E] Invalid parameters\n");
_connInfo.lastError = F("Invalid parameters");
Expand Down Expand Up @@ -137,7 +137,7 @@ void InfluxDBClient::clean() {
_service = nullptr;
}
_buckets = nullptr;
_lastFlushed = 0;
_lastFlushed = millis();
_retryTime = 0;
}

Expand Down Expand Up @@ -359,11 +359,14 @@ bool InfluxDBClient::checkBuffer() {
// in case we (over)reach batchSize with non full buffer
bool bufferReachedBatchsize = _writeBuffer[_batchPointer] && _writeBuffer[_batchPointer]->isFull();
// or flush interval timed out
bool flushTimeout = _writeOptions._flushInterval > 0 && _lastFlushed > 0 && (millis()/1000 - _lastFlushed) > _writeOptions._flushInterval;
bool flushTimeout = _writeOptions._flushInterval > 0 && ((millis() - _lastFlushed)/1000) >= _writeOptions._flushInterval;

INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n",
bool2string(bufferReachedBatchsize),bool2string(flushTimeout), bool2string(isBufferFull()));

if(bufferReachedBatchsize || flushTimeout || isBufferFull() ) {
INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n", bufferReachedBatchsize?"true":"false",flushTimeout?"true":"false", isBufferFull()?"true":"false");
return flushBufferInternal(true);

return flushBufferInternal(!flushTimeout);
}
return true;
}
Expand Down Expand Up @@ -412,7 +415,7 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
success = statusCode >= 200 && statusCode < 300;
// advance even on message failure x e <300;429)
if(success || !retry) {
_lastFlushed = millis()/1000;
_lastFlushed = millis();
dropCurrentBatch();
} else if(retry) {
_writeBuffer[_batchPointer]->retryCount++;
Expand Down Expand Up @@ -542,7 +545,7 @@ FluxQueryResult InfluxDBClient::query(String fluxQuery) {
String header = httpClient->header(TransferEncoding);
chunked = header.equalsIgnoreCase("chunked");
}
INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", chunked?"true":"false");
INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", bool2string(chunked));
HttpStreamScanner *scanner = new HttpStreamScanner(httpClient, chunked);
reader = new CsvReader(scanner);
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/InfluxDbClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ class InfluxDBClient {
// Index of bath start for next write
uint8_t _batchPointer = 0;
// Last time in sec buffer has been successfully flushed
uint32_t _lastFlushed = 0;
uint32_t _lastFlushed;
// Bucket sub-client
BucketsClient _buckets;
protected:
Expand Down
3 changes: 2 additions & 1 deletion src/Point.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <Arduino.h>
#include "WritePrecision.h"
#include "util/helpers.h"

/**
* Class Point represents InfluxDB point in line protocol.
Expand All @@ -49,7 +50,7 @@ friend class InfluxDBClient;
void addField(String name, unsigned int value) { putField(name, String(value)+"i"); }
void addField(String name, long value) { putField(name, String(value)+"i"); }
void addField(String name, unsigned long value) { putField(name, String(value)+"i"); }
void addField(String name, bool value) { putField(name,value?"true":"false"); }
void addField(String name, bool value) { putField(name, bool2string(value)); }
void addField(String name, String value) { addField(name, value.c_str()); }
void addField(String name, const char *value);
// Set timestamp to `now()` and store it in specified precision, nanoseconds by default. Date and time must be already set. See `configTime` in the device API
Expand Down
3 changes: 2 additions & 1 deletion src/query/HttpStreamScanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Uncomment bellow in case of a problem and rebuild sketch
//#define INFLUXDB_CLIENT_DEBUG_ENABLE
#include "util/debug.h"
#include "util/helpers.h"

HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked)
{
Expand All @@ -37,7 +38,7 @@ HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked)
_chunked = chunked;
_chunkHeader = chunked;
_len = client->getSize();
INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", _chunked?"true":"false", _len);
INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", bool2string(_chunked), _len);
}

bool HttpStreamScanner::next() {
Expand Down
3 changes: 3 additions & 0 deletions src/util/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,6 @@ bool isValidID(const char *idString) {
return true;
}

const char *bool2string(bool val) {
return (val?"true":"false");
}
3 changes: 2 additions & 1 deletion src/util/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ String escapeValue(const char *value);
String urlEncode(const char* src);
// Returns true of string contains valid InfluxDB ID type
bool isValidID(const char *idString);

// Return "true" if val is true, otherwise "false"
const char *bool2string(bool val);

#endif //_INFLUXDB_CLIENT_HELPERS_H
41 changes: 41 additions & 0 deletions test/Test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void Test::run() {
testFluxParserMissingDatatype();
testFluxParserErrorInRow();
testBasicFunction();
testFlushing();
testInit();
testRepeatedInit();
testV1();
Expand Down Expand Up @@ -2076,6 +2077,46 @@ void Test::testBuckets() {
TEST_END();
}

void Test::testFlushing() {
TEST_INIT("testFlushing");
InfluxDBClient client(Test::apiUrl, Test::orgName, Test::bucketName, Test::token);
TEST_ASSERT(waitServer(Test::managementUrl, true));
TEST_ASSERT(client.validateConnection());
TEST_ASSERT(!client.isBufferFull());
TEST_ASSERT(client.isBufferEmpty());
client.setWriteOptions(WriteOptions().batchSize(10).bufferSize(30).flushInterval(2));

for (int i = 0; i < 5; i++) {
Point *p = createPoint("test1");
p->addField("index", i);
TEST_ASSERT(client.writePoint(*p));
delete p;
}
TEST_ASSERT(!client.isBufferFull());
TEST_ASSERT(!client.isBufferEmpty());
client.checkBuffer();
TEST_ASSERT(!client.isBufferFull());
TEST_ASSERT(!client.isBufferEmpty());
String query = "select";
FluxQueryResult q = client.query(query);
int count = countLines(q);
TEST_ASSERTM(q.getError()=="", q.getError());
TEST_ASSERTM( count == 0, String(count) + " vs 0"); //5 points

delay(2100);
client.checkBuffer();
TEST_ASSERT(!client.isBufferFull());
TEST_ASSERT(client.isBufferEmpty());

q = client.query(query);
count = countLines(q);
TEST_ASSERTM(q.getError()=="", q.getError());
TEST_ASSERTM( count == 5, String(count) + " vs 0"); //5 points

TEST_END();
deleteAll(Test::apiUrl);
}

void Test::setServerUrl(InfluxDBClient &client, String serverUrl) {
client._connInfo.serverUrl = serverUrl;
client._service->_apiURL = serverUrl + "/api/v2/";
Expand Down
1 change: 1 addition & 0 deletions test/Test.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Test : public TestBase {
static void testRepeatedInit();
static void testIsValidID();
static void testBuckets();
static void testFlushing();
};

#endif //_TEST_H_