diff --git a/CHANGELOG.md b/CHANGELOG.md index 3af1b89..7a934b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Fixes - [#150](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/150) - `HTTPOptions::httpReadTimeout` is also set as the connect timeout for HTTP connection on ESP32. It doesn't work for HTTPS connection yet. - [#156](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/156) - Correctly rounding _writeBufferSize_, when _bufferSize/batchSize >= 256_. + - [#162](https://github.com/tobiasschuerg/InfluxDB-Client-for-Arduino/pull/162) - Fixed flushing of not full buffer after to timeout. ## 3.8.0 [2021-04-01] ### Features diff --git a/src/InfluxDbClient.cpp b/src/InfluxDbClient.cpp index cd80112..0b36ef9 100644 --- a/src/InfluxDbClient.cpp +++ b/src/InfluxDbClient.cpp @@ -98,7 +98,7 @@ bool InfluxDBClient::init() { INFLUXDB_CLIENT_DEBUG("[D] Org: %s\n", _connInfo.org.c_str()); INFLUXDB_CLIENT_DEBUG("[D] Bucket: %s\n", _connInfo.bucket.c_str()); INFLUXDB_CLIENT_DEBUG("[D] Token: %s\n", _connInfo.authToken.c_str()); - INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _dbVersion); + INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _connInfo.dbVersion); if(_connInfo.serverUrl.length() == 0 || (_connInfo.dbVersion == 2 && (_connInfo.org.length() == 0 || _connInfo.bucket.length() == 0 || _connInfo.authToken.length() == 0))) { INFLUXDB_CLIENT_DEBUG("[E] Invalid parameters\n"); _connInfo.lastError = F("Invalid parameters"); @@ -137,7 +137,7 @@ void InfluxDBClient::clean() { _service = nullptr; } _buckets = nullptr; - _lastFlushed = 0; + _lastFlushed = millis(); _retryTime = 0; } @@ -359,11 +359,14 @@ bool InfluxDBClient::checkBuffer() { // in case we (over)reach batchSize with non full buffer bool bufferReachedBatchsize = _writeBuffer[_batchPointer] && _writeBuffer[_batchPointer]->isFull(); // or flush interval timed out - bool flushTimeout = _writeOptions._flushInterval > 0 && _lastFlushed > 0 && (millis()/1000 - _lastFlushed) > _writeOptions._flushInterval; + bool flushTimeout = _writeOptions._flushInterval > 0 && ((millis() - _lastFlushed)/1000) >= _writeOptions._flushInterval; + INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n", + bool2string(bufferReachedBatchsize),bool2string(flushTimeout), bool2string(isBufferFull())); + if(bufferReachedBatchsize || flushTimeout || isBufferFull() ) { - INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n", bufferReachedBatchsize?"true":"false",flushTimeout?"true":"false", isBufferFull()?"true":"false"); - return flushBufferInternal(true); + + return flushBufferInternal(!flushTimeout); } return true; } @@ -412,7 +415,7 @@ bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) { success = statusCode >= 200 && statusCode < 300; // advance even on message failure x e <300;429) if(success || !retry) { - _lastFlushed = millis()/1000; + _lastFlushed = millis(); dropCurrentBatch(); } else if(retry) { _writeBuffer[_batchPointer]->retryCount++; @@ -542,7 +545,7 @@ FluxQueryResult InfluxDBClient::query(String fluxQuery) { String header = httpClient->header(TransferEncoding); chunked = header.equalsIgnoreCase("chunked"); } - INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", chunked?"true":"false"); + INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", bool2string(chunked)); HttpStreamScanner *scanner = new HttpStreamScanner(httpClient, chunked); reader = new CsvReader(scanner); return false; diff --git a/src/InfluxDbClient.h b/src/InfluxDbClient.h index 8e5ccf6..130e23b 100644 --- a/src/InfluxDbClient.h +++ b/src/InfluxDbClient.h @@ -200,7 +200,7 @@ class InfluxDBClient { // Index of bath start for next write uint8_t _batchPointer = 0; // Last time in sec buffer has been successfully flushed - uint32_t _lastFlushed = 0; + uint32_t _lastFlushed; // Bucket sub-client BucketsClient _buckets; protected: diff --git a/src/Point.h b/src/Point.h index f9d81a3..0231f4f 100644 --- a/src/Point.h +++ b/src/Point.h @@ -29,6 +29,7 @@ #include #include "WritePrecision.h" +#include "util/helpers.h" /** * Class Point represents InfluxDB point in line protocol. @@ -49,7 +50,7 @@ friend class InfluxDBClient; void addField(String name, unsigned int value) { putField(name, String(value)+"i"); } void addField(String name, long value) { putField(name, String(value)+"i"); } void addField(String name, unsigned long value) { putField(name, String(value)+"i"); } - void addField(String name, bool value) { putField(name,value?"true":"false"); } + void addField(String name, bool value) { putField(name, bool2string(value)); } void addField(String name, String value) { addField(name, value.c_str()); } void addField(String name, const char *value); // Set timestamp to `now()` and store it in specified precision, nanoseconds by default. Date and time must be already set. See `configTime` in the device API diff --git a/src/query/HttpStreamScanner.cpp b/src/query/HttpStreamScanner.cpp index a3e6aa2..a9eb3a3 100644 --- a/src/query/HttpStreamScanner.cpp +++ b/src/query/HttpStreamScanner.cpp @@ -29,6 +29,7 @@ // Uncomment bellow in case of a problem and rebuild sketch //#define INFLUXDB_CLIENT_DEBUG_ENABLE #include "util/debug.h" +#include "util/helpers.h" HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked) { @@ -37,7 +38,7 @@ HttpStreamScanner::HttpStreamScanner(HTTPClient *client, bool chunked) _chunked = chunked; _chunkHeader = chunked; _len = client->getSize(); - INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", _chunked?"true":"false", _len); + INFLUXDB_CLIENT_DEBUG("[D] HttpStreamScanner: chunked: %s, size: %d\n", bool2string(_chunked), _len); } bool HttpStreamScanner::next() { diff --git a/src/util/helpers.cpp b/src/util/helpers.cpp index 77b22cd..dfb3f7f 100644 --- a/src/util/helpers.cpp +++ b/src/util/helpers.cpp @@ -161,3 +161,6 @@ bool isValidID(const char *idString) { return true; } +const char *bool2string(bool val) { + return (val?"true":"false"); +} diff --git a/src/util/helpers.h b/src/util/helpers.h index c5110a2..506369c 100644 --- a/src/util/helpers.h +++ b/src/util/helpers.h @@ -50,6 +50,7 @@ String escapeValue(const char *value); String urlEncode(const char* src); // Returns true of string contains valid InfluxDB ID type bool isValidID(const char *idString); - +// Return "true" if val is true, otherwise "false" +const char *bool2string(bool val); #endif //_INFLUXDB_CLIENT_HELPERS_H \ No newline at end of file diff --git a/test/Test.cpp b/test/Test.cpp index b7d9a88..57dddea 100644 --- a/test/Test.cpp +++ b/test/Test.cpp @@ -56,6 +56,7 @@ void Test::run() { testFluxParserMissingDatatype(); testFluxParserErrorInRow(); testBasicFunction(); + testFlushing(); testInit(); testRepeatedInit(); testV1(); @@ -2076,6 +2077,46 @@ void Test::testBuckets() { TEST_END(); } +void Test::testFlushing() { + TEST_INIT("testFlushing"); + InfluxDBClient client(Test::apiUrl, Test::orgName, Test::bucketName, Test::token); + TEST_ASSERT(waitServer(Test::managementUrl, true)); + TEST_ASSERT(client.validateConnection()); + TEST_ASSERT(!client.isBufferFull()); + TEST_ASSERT(client.isBufferEmpty()); + client.setWriteOptions(WriteOptions().batchSize(10).bufferSize(30).flushInterval(2)); + + for (int i = 0; i < 5; i++) { + Point *p = createPoint("test1"); + p->addField("index", i); + TEST_ASSERT(client.writePoint(*p)); + delete p; + } + TEST_ASSERT(!client.isBufferFull()); + TEST_ASSERT(!client.isBufferEmpty()); + client.checkBuffer(); + TEST_ASSERT(!client.isBufferFull()); + TEST_ASSERT(!client.isBufferEmpty()); + String query = "select"; + FluxQueryResult q = client.query(query); + int count = countLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERTM( count == 0, String(count) + " vs 0"); //5 points + + delay(2100); + client.checkBuffer(); + TEST_ASSERT(!client.isBufferFull()); + TEST_ASSERT(client.isBufferEmpty()); + + q = client.query(query); + count = countLines(q); + TEST_ASSERTM(q.getError()=="", q.getError()); + TEST_ASSERTM( count == 5, String(count) + " vs 0"); //5 points + + TEST_END(); + deleteAll(Test::apiUrl); +} + void Test::setServerUrl(InfluxDBClient &client, String serverUrl) { client._connInfo.serverUrl = serverUrl; client._service->_apiURL = serverUrl + "/api/v2/"; diff --git a/test/Test.h b/test/Test.h index 6545813..a051f5a 100644 --- a/test/Test.h +++ b/test/Test.h @@ -70,6 +70,7 @@ class Test : public TestBase { static void testRepeatedInit(); static void testIsValidID(); static void testBuckets(); + static void testFlushing(); }; #endif //_TEST_H_ \ No newline at end of file