779 lines
27 KiB
C++
779 lines
27 KiB
C++
|
/**
|
||
|
*
|
||
|
* InfluxDBClient.cpp: InfluxDB Client for Arduino
|
||
|
*
|
||
|
* MIT License
|
||
|
*
|
||
|
* Copyright (c) 2020 InfluxData
|
||
|
*
|
||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
* of this software and associated documentation files (the "Software"), to deal
|
||
|
* in the Software without restriction, including without limitation the rights
|
||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||
|
* copies of the Software, and to permit persons to whom the Software is
|
||
|
* furnished to do so, subject to the following conditions:
|
||
|
*
|
||
|
* The above copyright notice and this permission notice shall be included in all
|
||
|
* copies or substantial portions of the Software.
|
||
|
*
|
||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||
|
* SOFTWARE.
|
||
|
*/
|
||
|
#include "InfluxDbClient.h"
|
||
|
#include "Platform.h"
|
||
|
#include "Version.h"
|
||
|
|
||
|
#include "util/debug.h"
|
||
|
|
||
|
static const char TooEarlyMessage[] PROGMEM = "Cannot send request yet because of applied retry strategy. Remaining ";
|
||
|
|
||
|
static String escapeJSONString(const String &value);
|
||
|
static String precisionToString(WritePrecision precision, uint8_t version = 2) {
|
||
|
switch(precision) {
|
||
|
case WritePrecision::US:
|
||
|
return version==1?"u":"us";
|
||
|
case WritePrecision::MS:
|
||
|
return "ms";
|
||
|
case WritePrecision::NS:
|
||
|
return "ns";
|
||
|
case WritePrecision::S:
|
||
|
return "s";
|
||
|
default:
|
||
|
return "";
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
InfluxDBClient::InfluxDBClient() {
|
||
|
resetBuffer();
|
||
|
}
|
||
|
|
||
|
InfluxDBClient::InfluxDBClient(const String &serverUrl, const String &db):InfluxDBClient() {
|
||
|
setConnectionParamsV1(serverUrl, db);
|
||
|
}
|
||
|
|
||
|
InfluxDBClient::InfluxDBClient(const String &serverUrl, const String &org, const String &bucket, const String &authToken):InfluxDBClient(serverUrl, org, bucket, authToken, nullptr) {
|
||
|
}
|
||
|
|
||
|
InfluxDBClient::InfluxDBClient(const String &serverUrl, const String &org, const String &bucket, const String &authToken, const char *serverCert):InfluxDBClient() {
|
||
|
setConnectionParams(serverUrl, org, bucket, authToken, serverCert);
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::setInsecure(bool value){
|
||
|
_connInfo.insecure = value;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::setConnectionParams(const String &serverUrl, const String &org, const String &bucket, const String &authToken, const char *certInfo) {
|
||
|
clean();
|
||
|
_connInfo.serverUrl = serverUrl;
|
||
|
_connInfo.bucket = bucket;
|
||
|
_connInfo.org = org;
|
||
|
_connInfo.authToken = authToken;
|
||
|
_connInfo.certInfo = certInfo;
|
||
|
_connInfo.dbVersion = 2;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::setConnectionParamsV1(const String &serverUrl, const String &db, const String &user, const String &password, const char *certInfo) {
|
||
|
clean();
|
||
|
_connInfo.serverUrl = serverUrl;
|
||
|
_connInfo.bucket = db;
|
||
|
_connInfo.user = user;
|
||
|
_connInfo.password = password;
|
||
|
_connInfo.certInfo = certInfo;
|
||
|
_connInfo.dbVersion = 1;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::init() {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Init\n");
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Library version: " INFLUXDB_CLIENT_VERSION "\n");
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Device : " INFLUXDB_CLIENT_PLATFORM "\n");
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] SDK version: " INFLUXDB_CLIENT_PLATFORM_VERSION "\n");
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Server url: %s\n", _connInfo.serverUrl.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Org: %s\n", _connInfo.org.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Bucket: %s\n", _connInfo.bucket.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Token: %s\n", _connInfo.authToken.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] DB version: %d\n", _connInfo.dbVersion);
|
||
|
if(_connInfo.serverUrl.length() == 0 || (_connInfo.dbVersion == 2 && (_connInfo.org.length() == 0 || _connInfo.bucket.length() == 0 || _connInfo.authToken.length() == 0))) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[E] Invalid parameters\n");
|
||
|
_connInfo.lastError = F("Invalid parameters");
|
||
|
return false;
|
||
|
}
|
||
|
if(_connInfo.serverUrl.endsWith("/")) {
|
||
|
_connInfo.serverUrl = _connInfo.serverUrl.substring(0,_connInfo.serverUrl.length()-1);
|
||
|
}
|
||
|
if(!_connInfo.serverUrl.startsWith("http")) {
|
||
|
_connInfo.lastError = F("Invalid URL scheme");
|
||
|
return false;
|
||
|
}
|
||
|
_service = new HTTPService(&_connInfo);
|
||
|
|
||
|
setUrls();
|
||
|
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
InfluxDBClient::~InfluxDBClient() {
|
||
|
if(_writeBuffer) {
|
||
|
for(int i=0;i<_writeBufferSize;i++) {
|
||
|
delete _writeBuffer[i];
|
||
|
}
|
||
|
delete [] _writeBuffer;
|
||
|
_writeBuffer = nullptr;
|
||
|
_bufferPointer = 0;
|
||
|
_batchPointer = 0;
|
||
|
_bufferCeiling = 0;
|
||
|
}
|
||
|
clean();
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::clean() {
|
||
|
if(_service) {
|
||
|
delete _service;
|
||
|
_service = nullptr;
|
||
|
}
|
||
|
_buckets = nullptr;
|
||
|
_lastFlushed = millis();
|
||
|
_retryTime = 0;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::setUrls() {
|
||
|
if(!_service && !init()) {
|
||
|
return false;
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] setUrls\n");
|
||
|
if( _connInfo.dbVersion == 2) {
|
||
|
_writeUrl = _service->getServerAPIURL();
|
||
|
_writeUrl += "write?org=";
|
||
|
_writeUrl += urlEncode(_connInfo.org.c_str());
|
||
|
_writeUrl += "&bucket=";
|
||
|
_writeUrl += urlEncode(_connInfo.bucket.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] writeUrl: %s\n", _writeUrl.c_str());
|
||
|
_queryUrl = _service->getServerAPIURL();;
|
||
|
_queryUrl += "query?org=";
|
||
|
_queryUrl += urlEncode(_connInfo.org.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] queryUrl: %s\n", _queryUrl.c_str());
|
||
|
} else {
|
||
|
_writeUrl = _connInfo.serverUrl;
|
||
|
_writeUrl += "/write?db=";
|
||
|
_writeUrl += urlEncode(_connInfo.bucket.c_str());
|
||
|
_queryUrl = _connInfo.serverUrl;
|
||
|
_queryUrl += "/api/v2/query";
|
||
|
if(_connInfo.user.length() > 0 && _connInfo.password.length() > 0) {
|
||
|
String auth = "&u=";
|
||
|
auth += urlEncode(_connInfo.user.c_str());
|
||
|
auth += "&p=";
|
||
|
auth += urlEncode(_connInfo.password.c_str());
|
||
|
_writeUrl += auth;
|
||
|
_queryUrl += "?";
|
||
|
_queryUrl += auth;
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] writeUrl: %s\n", _writeUrl.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] queryUrl: %s\n", _queryUrl.c_str());
|
||
|
}
|
||
|
if(_writeOptions._writePrecision != WritePrecision::NoTime) {
|
||
|
_writeUrl += "&precision=";
|
||
|
_writeUrl += precisionToString(_writeOptions._writePrecision, _connInfo.dbVersion);
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] writeUrl: %s\n", _writeUrl.c_str());
|
||
|
}
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::setWriteOptions(WritePrecision precision, uint16_t batchSize, uint16_t bufferSize, uint16_t flushInterval, bool preserveConnection) {
|
||
|
if(!_service && !init()) {
|
||
|
return false;
|
||
|
}
|
||
|
if(!setWriteOptions(WriteOptions().writePrecision(precision).batchSize(batchSize).bufferSize(bufferSize).flushInterval(flushInterval))) {
|
||
|
return false;
|
||
|
}
|
||
|
if(!setHTTPOptions(_service->getHTTPOptions().connectionReuse(preserveConnection))) {
|
||
|
return false;
|
||
|
}
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::setWriteOptions(const WriteOptions & writeOptions) {
|
||
|
if(_writeOptions._writePrecision != writeOptions._writePrecision) {
|
||
|
_writeOptions._writePrecision = writeOptions._writePrecision;
|
||
|
if(!setUrls()) {
|
||
|
return false;
|
||
|
}
|
||
|
}
|
||
|
bool writeBufferSizeChanges = false;
|
||
|
if(writeOptions._batchSize > 0 && _writeOptions._batchSize != writeOptions._batchSize) {
|
||
|
_writeOptions._batchSize = writeOptions._batchSize;
|
||
|
writeBufferSizeChanges = true;
|
||
|
}
|
||
|
if(writeOptions._bufferSize > 0 && _writeOptions._bufferSize != writeOptions._bufferSize) {
|
||
|
_writeOptions._bufferSize = writeOptions._bufferSize;
|
||
|
if(_writeOptions._bufferSize < 2*_writeOptions._batchSize) {
|
||
|
_writeOptions._bufferSize = 2*_writeOptions._batchSize;
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Changing buffer size to %d\n", _writeOptions._bufferSize);
|
||
|
}
|
||
|
writeBufferSizeChanges = true;
|
||
|
}
|
||
|
if(writeBufferSizeChanges) {
|
||
|
resetBuffer();
|
||
|
}
|
||
|
_writeOptions._flushInterval = writeOptions._flushInterval;
|
||
|
_writeOptions._retryInterval = writeOptions._retryInterval;
|
||
|
_writeOptions._maxRetryInterval = writeOptions._maxRetryInterval;
|
||
|
_writeOptions._maxRetryAttempts = writeOptions._maxRetryAttempts;
|
||
|
_writeOptions._defaultTags = writeOptions._defaultTags;
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::setHTTPOptions(const HTTPOptions & httpOptions) {
|
||
|
if(!_service && !init()) {
|
||
|
return false;
|
||
|
}
|
||
|
_service->setHTTPOptions(httpOptions);
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
BucketsClient InfluxDBClient::getBucketsClient() {
|
||
|
if(!_service && !init()) {
|
||
|
return BucketsClient();
|
||
|
}
|
||
|
if(!_buckets) {
|
||
|
_buckets = BucketsClient(&_connInfo, _service);
|
||
|
}
|
||
|
return _buckets;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::resetBuffer() {
|
||
|
if(_writeBuffer) {
|
||
|
for(int i=0;i<_writeBufferSize;i++) {
|
||
|
delete _writeBuffer[i];
|
||
|
}
|
||
|
delete [] _writeBuffer;
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Reset buffer: buffer Size: %d, batch size: %d\n", _writeOptions._bufferSize, _writeOptions._batchSize);
|
||
|
uint16_t a = _writeOptions._bufferSize/_writeOptions._batchSize;
|
||
|
//limit to max(byte)
|
||
|
_writeBufferSize = a>=(1<<8)?(1<<8)-1:a;
|
||
|
if(_writeBufferSize < 2) {
|
||
|
_writeBufferSize = 2;
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Reset buffer: writeBuffSize: %d\n", _writeBufferSize);
|
||
|
_writeBuffer = new Batch*[_writeBufferSize];
|
||
|
for(int i=0;i<_writeBufferSize;i++) {
|
||
|
_writeBuffer[i] = nullptr;
|
||
|
}
|
||
|
_bufferPointer = 0;
|
||
|
_batchPointer = 0;
|
||
|
_bufferCeiling = 0;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::reserveBuffer(int size) {
|
||
|
if(size > _writeBufferSize) {
|
||
|
Batch **newBuffer = new Batch*[size];
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Resizing buffer from %d to %d\n",_writeBufferSize, size);
|
||
|
for(int i=0;i<_bufferCeiling; i++) {
|
||
|
newBuffer[i] = _writeBuffer[i];
|
||
|
}
|
||
|
|
||
|
delete [] _writeBuffer;
|
||
|
_writeBuffer = newBuffer;
|
||
|
_writeBufferSize = size;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::addZerosToTimestamp(Point &point, int zeroes) {
|
||
|
char *ts = point._timestamp, *s;
|
||
|
point._timestamp = new char[strlen(point._timestamp) + 1 + zeroes];
|
||
|
strcpy(point._timestamp, ts);
|
||
|
s = point._timestamp+strlen(ts);
|
||
|
for(int i=0;i<zeroes;i++) {
|
||
|
*s++ = '0';
|
||
|
}
|
||
|
*s = 0;
|
||
|
delete [] ts;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::checkPrecisions(Point & point) {
|
||
|
if(_writeOptions._writePrecision != WritePrecision::NoTime) {
|
||
|
if(!point.hasTime()) {
|
||
|
point.setTime(_writeOptions._writePrecision);
|
||
|
// Check different write precisions
|
||
|
} else if(point._tsWritePrecision != WritePrecision::NoTime && point._tsWritePrecision != _writeOptions._writePrecision) {
|
||
|
int diff = int(point._tsWritePrecision) - int(_writeOptions._writePrecision);
|
||
|
if(diff > 0) { //point has higher precision, cut
|
||
|
point._timestamp[strlen(point._timestamp)-diff*3] = 0;
|
||
|
} else { //point has lower precision, add zeroes
|
||
|
addZerosToTimestamp(point, diff*-3);
|
||
|
}
|
||
|
}
|
||
|
// check someone set WritePrecision on point and not on client. NS precision is ok, cause it is default on server
|
||
|
} else if(point.hasTime() && point._tsWritePrecision != WritePrecision::NoTime && point._tsWritePrecision != WritePrecision::NS) {
|
||
|
int diff = int(WritePrecision::NS) - int(point._tsWritePrecision);
|
||
|
addZerosToTimestamp(point, diff*3);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::writePoint(Point & point) {
|
||
|
if (point.hasFields()) {
|
||
|
checkPrecisions(point);
|
||
|
String line = pointToLineProtocol(point);
|
||
|
return writeRecord(line);
|
||
|
}
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
InfluxDBClient::Batch::Batch(uint16_t size):_size(size) {
|
||
|
buffer = new char*[size];
|
||
|
for(int i=0;i< _size; i++) {
|
||
|
buffer[i] = nullptr;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
InfluxDBClient::Batch::~Batch() {
|
||
|
clear();
|
||
|
delete [] buffer;
|
||
|
buffer = nullptr;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::Batch::clear() {
|
||
|
for(int i=0;i< _size; i++) {
|
||
|
free(buffer[i]);
|
||
|
buffer[i] = nullptr;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::Batch::append(const char *line) {
|
||
|
if(pointer == _size) {
|
||
|
//overwriting, clean buffer
|
||
|
clear();
|
||
|
pointer = 0;
|
||
|
}
|
||
|
buffer[pointer] = strdup(line);
|
||
|
++pointer;
|
||
|
return isFull();
|
||
|
}
|
||
|
|
||
|
char * InfluxDBClient::Batch::createData() {
|
||
|
int length = 0;
|
||
|
char *buff = nullptr;
|
||
|
for(int c=0; c < pointer; c++) {
|
||
|
length += strlen(buffer[c]);
|
||
|
yield();
|
||
|
}
|
||
|
//create buffer for all lines including new line char and terminating char
|
||
|
if(length) {
|
||
|
buff = new char[length + pointer + 1];
|
||
|
if(buff) {
|
||
|
buff[0] = 0;
|
||
|
for(int c=0; c < pointer; c++) {
|
||
|
strcat(buff+strlen(buff), buffer[c]);
|
||
|
strcat(buff+strlen(buff), "\n");
|
||
|
yield();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return buff;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::writeRecord(const String &record) {
|
||
|
return writeRecord(record.c_str());
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::writeRecord(const char *record) {
|
||
|
if(!_writeBuffer[_bufferPointer]) {
|
||
|
_writeBuffer[_bufferPointer] = new Batch(_writeOptions._batchSize);
|
||
|
}
|
||
|
if(isBufferFull() && _batchPointer <= _bufferPointer) {
|
||
|
// When we are overwriting buffer and nothing is written, batchPointer must point to the oldest point
|
||
|
_batchPointer = _bufferPointer+1;
|
||
|
if(_batchPointer == _writeBufferSize) {
|
||
|
_batchPointer = 0;
|
||
|
}
|
||
|
}
|
||
|
if(_writeBuffer[_bufferPointer]->append(record)) { //we reached batch size
|
||
|
_bufferPointer++;
|
||
|
if(_bufferPointer == _writeBufferSize) { // writeBuffer is full
|
||
|
_bufferPointer = 0;
|
||
|
INFLUXDB_CLIENT_DEBUG("[W] Reached write buffer size, old points will be overwritten\n");
|
||
|
}
|
||
|
|
||
|
if(_bufferCeiling < _writeBufferSize) {
|
||
|
_bufferCeiling++;
|
||
|
}
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] writeRecord: bufferPointer: %d, batchPointer: %d, _bufferCeiling: %d\n", _bufferPointer, _batchPointer, _bufferCeiling);
|
||
|
return checkBuffer();
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::checkBuffer() {
|
||
|
// in case we (over)reach batchSize with non full buffer
|
||
|
bool bufferReachedBatchsize = _writeBuffer[_batchPointer] && _writeBuffer[_batchPointer]->isFull();
|
||
|
// or flush interval timed out
|
||
|
bool flushTimeout = _writeOptions._flushInterval > 0 && ((millis() - _lastFlushed)/1000) >= _writeOptions._flushInterval;
|
||
|
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Flushing buffer: is oversized %s, is timeout %s, is buffer full %s\n",
|
||
|
bool2string(bufferReachedBatchsize),bool2string(flushTimeout), bool2string(isBufferFull()));
|
||
|
|
||
|
if(bufferReachedBatchsize || flushTimeout || isBufferFull() ) {
|
||
|
|
||
|
return flushBufferInternal(!flushTimeout);
|
||
|
}
|
||
|
return true;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::flushBuffer() {
|
||
|
return flushBufferInternal(false);
|
||
|
}
|
||
|
|
||
|
uint32_t InfluxDBClient::getRemainingRetryTime() {
|
||
|
uint32_t rem = 0;
|
||
|
if(_retryTime > 0) {
|
||
|
int32_t diff = _retryTime - (millis()-_service->getLastRequestTime())/1000;
|
||
|
rem = diff<0?0:(uint32_t)diff;
|
||
|
}
|
||
|
return rem;
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::flushBufferInternal(bool flashOnlyFull) {
|
||
|
uint32_t rwt = getRemainingRetryTime();
|
||
|
if(rwt > 0) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[W] Cannot write yet, pause %ds, %ds yet\n", _retryTime, rwt);
|
||
|
// retry after period didn't run out yet
|
||
|
_connInfo.lastError = FPSTR(TooEarlyMessage);
|
||
|
_connInfo.lastError += String(rwt);
|
||
|
_connInfo.lastError += "s";
|
||
|
return false;
|
||
|
}
|
||
|
char *data;
|
||
|
bool success = true;
|
||
|
// send all batches, It could happen there was long network outage and buffer is full
|
||
|
while(_writeBuffer[_batchPointer] && (!flashOnlyFull || _writeBuffer[_batchPointer]->isFull())) {
|
||
|
if(!_writeBuffer[_batchPointer]->isFull() && _writeBuffer[_batchPointer]->retryCount == 0 ) { //do not increase pointer in case of retrying
|
||
|
// points will be written so increase _bufferPointer as it happen when buffer is flushed when is full
|
||
|
if(++_bufferPointer == _writeBufferSize) {
|
||
|
_bufferPointer = 0;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Writing batch, batchpointer: %d, size %d\n", _batchPointer, _writeBuffer[_batchPointer]->pointer);
|
||
|
if(!_writeBuffer[_batchPointer]->isEmpty()) {
|
||
|
int statusCode = 0;
|
||
|
if(_streamWrite) {
|
||
|
statusCode = postData(_writeBuffer[_batchPointer]);
|
||
|
} else {
|
||
|
data = _writeBuffer[_batchPointer]->createData();
|
||
|
statusCode = postData(data);
|
||
|
delete [] data;
|
||
|
}
|
||
|
// retry on unsuccessfull connection or retryable status codes
|
||
|
bool retry = (statusCode < 0 || statusCode >= 429) && _writeOptions._maxRetryAttempts > 0;
|
||
|
success = statusCode >= 200 && statusCode < 300;
|
||
|
// advance even on message failure x e <300;429)
|
||
|
if(success || !retry) {
|
||
|
_lastFlushed = millis();
|
||
|
dropCurrentBatch();
|
||
|
} else if(retry) {
|
||
|
_writeBuffer[_batchPointer]->retryCount++;
|
||
|
if(statusCode > 0) { //apply retry strategy only in case of HTTP errors
|
||
|
if(_writeBuffer[_batchPointer]->retryCount > _writeOptions._maxRetryAttempts) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Reached max retry count, dropping batch\n");
|
||
|
dropCurrentBatch();
|
||
|
}
|
||
|
if(!_retryTime) {
|
||
|
_retryTime = _writeOptions._retryInterval;
|
||
|
if(_writeBuffer[_batchPointer]) {
|
||
|
for(int i=1;i<_writeBuffer[_batchPointer]->retryCount;i++) {
|
||
|
_retryTime *= _writeOptions._retryInterval;
|
||
|
}
|
||
|
if(_retryTime > _writeOptions._maxRetryInterval) {
|
||
|
_retryTime = _writeOptions._maxRetryInterval;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Leaving data in buffer for retry, retryInterval: %d\n",_retryTime);
|
||
|
// in case of retryable failure break loop
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
yield();
|
||
|
}
|
||
|
//Have we emptied the buffer?
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Success: %d, _bufferPointer: %d, _batchPointer: %d, _writeBuffer[_bufferPointer]_%p\n",success,_bufferPointer,_batchPointer, _writeBuffer[_bufferPointer]);
|
||
|
if(_batchPointer == _bufferPointer && !_writeBuffer[_bufferPointer]) {
|
||
|
_bufferPointer = 0;
|
||
|
_batchPointer = 0;
|
||
|
_bufferCeiling = 0;
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Buffer empty\n");
|
||
|
}
|
||
|
return success;
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::dropCurrentBatch() {
|
||
|
delete _writeBuffer[_batchPointer];
|
||
|
_writeBuffer[_batchPointer] = nullptr;
|
||
|
_batchPointer++;
|
||
|
//did we got over top?
|
||
|
if(_batchPointer == _writeBufferSize) {
|
||
|
// restart _batchPointer in ring buffer from start
|
||
|
_batchPointer = 0;
|
||
|
// we reached buffer size, that means buffer was full and now lower ceiling
|
||
|
_bufferCeiling = _bufferPointer;
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Dropped batch, batchpointer: %d\n", _batchPointer);
|
||
|
}
|
||
|
|
||
|
String InfluxDBClient::pointToLineProtocol(const Point& point) {
|
||
|
return point.createLineProtocol(_writeOptions._defaultTags);
|
||
|
}
|
||
|
|
||
|
bool InfluxDBClient::validateConnection() {
|
||
|
if(!_service && !init()) {
|
||
|
return false;
|
||
|
}
|
||
|
// on version 1.x /ping will by default return status code 204, without verbose
|
||
|
String url = _connInfo.serverUrl + (_connInfo.dbVersion==2?"/health":"/ping?verbose=true");
|
||
|
if(_connInfo.dbVersion==1 && _connInfo.user.length() > 0 && _connInfo.password.length() > 0) {
|
||
|
url += "&u=";
|
||
|
url += urlEncode(_connInfo.user.c_str());
|
||
|
url += "&p=";
|
||
|
url += urlEncode(_connInfo.password.c_str());
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Validating connection to %s\n", url.c_str());
|
||
|
|
||
|
bool ret = _service->doGET(url.c_str(), 200, nullptr);
|
||
|
if(!ret) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
|
||
|
}
|
||
|
return ret;
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::postData(const char *data) {
|
||
|
if(!_service && !init()) {
|
||
|
return 0;
|
||
|
}
|
||
|
if(data) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Writing to %s\n", _writeUrl.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Sending:\n%s\n", data);
|
||
|
if(!_service->doPOST(_writeUrl.c_str(), data, PSTR("text/plain"), 204, nullptr)) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
|
||
|
}
|
||
|
_retryTime = _service->getLastRetryAfter();
|
||
|
return _service->getLastStatusCode();
|
||
|
}
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::postData(Batch *batch) {
|
||
|
if(!_service && !init()) {
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
BatchStreamer *bs = new BatchStreamer(batch);
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Writing to %s\n", _writeUrl.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Sending:\n");
|
||
|
|
||
|
if(!_service->doPOST(_writeUrl.c_str(), bs, PSTR("text/plain"), 204, nullptr)) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] error %d: %s\n", _service->getLastStatusCode(), _service->getLastErrorMessage().c_str());
|
||
|
}
|
||
|
delete bs;
|
||
|
_retryTime = _service->getLastRetryAfter();
|
||
|
return _service->getLastStatusCode();
|
||
|
}
|
||
|
|
||
|
void InfluxDBClient::setStreamWrite(bool enable) {
|
||
|
_streamWrite = enable;
|
||
|
}
|
||
|
|
||
|
|
||
|
static const char QueryDialect[] PROGMEM = "\
|
||
|
\"dialect\": {\
|
||
|
\"annotations\": [\
|
||
|
\"datatype\"\
|
||
|
],\
|
||
|
\"dateTimeFormat\": \"RFC3339\",\
|
||
|
\"header\": true,\
|
||
|
\"delimiter\": \",\",\
|
||
|
\"commentPrefix\": \"#\"\
|
||
|
}";
|
||
|
|
||
|
static const char Params[] PROGMEM = ",\
|
||
|
\"params\": {";
|
||
|
|
||
|
FluxQueryResult InfluxDBClient::query(const String &fluxQuery) {
|
||
|
return query(fluxQuery, QueryParams());
|
||
|
}
|
||
|
|
||
|
FluxQueryResult InfluxDBClient::query(const String &fluxQuery, QueryParams params) {
|
||
|
uint32_t rwt = getRemainingRetryTime();
|
||
|
if(rwt > 0) {
|
||
|
INFLUXDB_CLIENT_DEBUG("[W] Cannot query yet, pause %ds, %ds yet\n", _retryTime, rwt);
|
||
|
// retry after period didn't run out yet
|
||
|
String mess = FPSTR(TooEarlyMessage);
|
||
|
mess += String(rwt);
|
||
|
mess += "s";
|
||
|
return FluxQueryResult(mess);
|
||
|
}
|
||
|
if(!_service && !init()) {
|
||
|
return FluxQueryResult(_connInfo.lastError);
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Query to %s\n", _queryUrl.c_str());
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] JSON query:\n%s\n", fluxQuery.c_str());
|
||
|
|
||
|
String queryEsc = escapeJSONString(fluxQuery);
|
||
|
String body;
|
||
|
body.reserve(150 + queryEsc.length() + params.size()*30);
|
||
|
body = F("{\"type\":\"flux\",\"query\":\"");
|
||
|
body += queryEsc;
|
||
|
body += "\",";
|
||
|
body += FPSTR(QueryDialect);
|
||
|
if(params.size()) {
|
||
|
body += FPSTR(Params);
|
||
|
body += params.jsonString(0);
|
||
|
for(int i=1;i<params.size();i++) {
|
||
|
body +=",";
|
||
|
char *js = params.jsonString(i);
|
||
|
body += js;
|
||
|
delete [] js;
|
||
|
}
|
||
|
body += '}';
|
||
|
}
|
||
|
body += '}';
|
||
|
CsvReader *reader = nullptr;
|
||
|
_retryTime = 0;
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] Query: %s\n", body.c_str());
|
||
|
if(_service->doPOST(_queryUrl.c_str(), body.c_str(), PSTR("application/json"), 200, [&](HTTPClient *httpClient){
|
||
|
bool chunked = false;
|
||
|
if(httpClient->hasHeader(TransferEncoding)) {
|
||
|
String header = httpClient->header(TransferEncoding);
|
||
|
chunked = header.equalsIgnoreCase("chunked");
|
||
|
}
|
||
|
INFLUXDB_CLIENT_DEBUG("[D] chunked: %s\n", bool2string(chunked));
|
||
|
HttpStreamScanner *scanner = new HttpStreamScanner(httpClient, chunked);
|
||
|
reader = new CsvReader(scanner);
|
||
|
return false;
|
||
|
})) {
|
||
|
return FluxQueryResult(reader);
|
||
|
} else {
|
||
|
_retryTime = _service->getLastRetryAfter();
|
||
|
return FluxQueryResult(_service->getLastErrorMessage());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
static String escapeJSONString(const String &value) {
|
||
|
String ret;
|
||
|
int d = 0;
|
||
|
int i,from = 0;
|
||
|
while((i = value.indexOf('"',from)) > -1) {
|
||
|
d++;
|
||
|
if(i == (int)value.length()-1) {
|
||
|
break;
|
||
|
}
|
||
|
from = i+1;
|
||
|
}
|
||
|
ret.reserve(value.length()+d); //most probably we will escape just double quotes
|
||
|
for (char c: value)
|
||
|
{
|
||
|
switch (c)
|
||
|
{
|
||
|
case '"': ret += "\\\""; break;
|
||
|
case '\\': ret += "\\\\"; break;
|
||
|
case '\b': ret += "\\b"; break;
|
||
|
case '\f': ret += "\\f"; break;
|
||
|
case '\n': ret += "\\n"; break;
|
||
|
case '\r': ret += "\\r"; break;
|
||
|
case '\t': ret += "\\t"; break;
|
||
|
default:
|
||
|
if (c <= '\x1f') {
|
||
|
ret += "\\u";
|
||
|
char buf[3 + 8 * sizeof(unsigned int)];
|
||
|
sprintf(buf, "\\u%04u", c);
|
||
|
ret += buf;
|
||
|
} else {
|
||
|
ret += c;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return ret;
|
||
|
}
|
||
|
|
||
|
InfluxDBClient::BatchStreamer::BatchStreamer(InfluxDBClient::Batch *batch) {
|
||
|
_batch = batch;
|
||
|
_read = 0;
|
||
|
_length = 0;
|
||
|
_pointer = 0;
|
||
|
_linePointer = 0;
|
||
|
for(uint16_t i=0;i<_batch->pointer;i++) {
|
||
|
_length += strlen(_batch->buffer[i])+1;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::BatchStreamer::available() {
|
||
|
return _length-_read;
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::BatchStreamer::availableForWrite() {
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
#if defined(ESP8266)
|
||
|
int InfluxDBClient::BatchStreamer::read(uint8_t* buffer, size_t len) {
|
||
|
INFLUXDB_CLIENT_DEBUG("BatchStream::read %d\n", len);
|
||
|
return readBytes((char *)buffer, len);
|
||
|
}
|
||
|
#endif
|
||
|
size_t InfluxDBClient::BatchStreamer::readBytes(char* buffer, size_t len) {
|
||
|
|
||
|
INFLUXDB_CLIENT_DEBUG("BatchStream::readBytes %d\n", len);
|
||
|
unsigned int r=0;
|
||
|
for(unsigned int i=0;i<len;i++) {
|
||
|
if(available()) {
|
||
|
buffer[i] = read();
|
||
|
r++;
|
||
|
} else {
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::BatchStreamer::read() {
|
||
|
int r = peek();
|
||
|
if(r > 0) {
|
||
|
++_read;
|
||
|
++_linePointer;
|
||
|
if(!_batch->buffer[_pointer][_linePointer-1]) {
|
||
|
++_pointer;
|
||
|
_linePointer = 0;
|
||
|
}
|
||
|
}
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
int InfluxDBClient::BatchStreamer::peek() {
|
||
|
if(_pointer == _batch->pointer) {
|
||
|
//This should not happen
|
||
|
return -1;
|
||
|
}
|
||
|
|
||
|
int r;
|
||
|
if(!_batch->buffer[_pointer][_linePointer]) {
|
||
|
r = '\n';
|
||
|
} else {
|
||
|
r = _batch->buffer[_pointer][_linePointer];
|
||
|
}
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
size_t InfluxDBClient::BatchStreamer::write(uint8_t) {
|
||
|
return 0;
|
||
|
}
|