diff --git a/src/AsyncEventSource.cpp b/src/AsyncEventSource.cpp index f2914df54..80dac89ae 100644 --- a/src/AsyncEventSource.cpp +++ b/src/AsyncEventSource.cpp @@ -138,15 +138,16 @@ size_t AsyncEventSourceMessage::ack(size_t len, uint32_t time) { } size_t AsyncEventSourceMessage::send(AsyncClient *client) { + if (!client->canSend()) + return 0; const size_t len = _len - _sent; if(client->space() < len){ return 0; } size_t sent = client->add((const char *)_data, len); - if(client->canSend()) - client->send(); + client->send(); _sent += sent; - return sent; + return sent; } // Client @@ -159,7 +160,7 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A _lastId = 0; if(request->hasHeader("Last-Event-ID")) _lastId = atoi(request->getHeader("Last-Event-ID")->value().c_str()); - + _client->setRxTimeout(0); _client->onError(NULL, NULL); _client->onAck([](void *r, AsyncClient* c, size_t len, uint32_t time){ (void)c; ((AsyncEventSourceClient*)(r))->_onAck(len, time); }, this); @@ -276,7 +277,7 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient * client){ client->write((const char *)temp, 2053); free(temp); }*/ - + _clients.add(client); if(_connectcb) _connectcb(client); @@ -297,10 +298,10 @@ void AsyncEventSource::close(){ size_t AsyncEventSource::avgPacketsWaiting() const { if(_clients.isEmpty()) return 0; - + size_t aql=0; uint32_t nConnectedClients=0; - + for(const auto &c: _clients){ if(c->connected()) { aql+=c->packetsWaiting(); diff --git a/src/AsyncEventSource.h b/src/AsyncEventSource.h index b097fa623..81f2252bb 100644 --- a/src/AsyncEventSource.h +++ b/src/AsyncEventSource.h @@ -20,14 +20,18 @@ #ifndef ASYNCEVENTSOURCE_H_ #define ASYNCEVENTSOURCE_H_ +#include #include #ifdef ESP32 #include -#define SSE_MAX_QUEUED_MESSAGES 32 #else #include -#define SSE_MAX_QUEUED_MESSAGES 8 #endif + +#ifndef SSE_MAX_QUEUED_MESSAGES +#define SSE_MAX_QUEUED_MESSAGES 32 +#endif + #include #include "AsyncWebSynchronization.h" @@ -52,11 +56,11 @@ typedef std::function ArEventHandlerFuncti class AsyncEventSourceMessage { private: - uint8_t * _data; + uint8_t * _data; size_t _len; size_t _sent; //size_t _ack; - size_t _acked; + size_t _acked; public: AsyncEventSourceMessage(const char * data, size_t len); ~AsyncEventSourceMessage(); @@ -90,7 +94,7 @@ class AsyncEventSourceClient { //system callbacks (do not call) void _onAck(size_t len, uint32_t time); - void _onPoll(); + void _onPoll(); void _onTimeout(uint32_t time); void _onDisconnect(); }; diff --git a/src/AsyncWebSocket.cpp b/src/AsyncWebSocket.cpp index 52dcd75f0..3b472fb53 100644 --- a/src/AsyncWebSocket.cpp +++ b/src/AsyncWebSocket.cpp @@ -112,10 +112,7 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool return 0; } } - if(!client->send()){ - //os_printf("error sending frame: %lu\n", headLen+len); - return 0; - } + client->send(); return len; } @@ -135,7 +132,7 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer() } -AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(uint8_t * data, size_t size) +AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(uint8_t * data, size_t size) :_data(nullptr) ,_len(size) ,_lock(false) @@ -143,14 +140,14 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(uint8_t * data, size_t { if (!data) { - return; + return; } _data = new uint8_t[_len + 1]; if (_data) { memcpy(_data, data, _len); - _data[_len] = 0; + _data[_len] = 0; } } @@ -161,12 +158,12 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(size_t size) ,_lock(false) ,_count(0) { - _data = new uint8_t[_len + 1]; + _data = new uint8_t[_len + 1]; if (_data) { - _data[_len] = 0; + _data[_len] = 0; } - + } AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(const AsyncWebSocketMessageBuffer & copy) @@ -180,13 +177,13 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(const AsyncWebSocketMes _count = 0; if (_len) { - _data = new uint8_t[_len + 1]; - _data[_len] = 0; - } + _data = new uint8_t[_len + 1]; + _data[_len] = 0; + } if (_data) { memcpy(_data, copy._data, _len); - _data[_len] = 0; + _data[_len] = 0; } } @@ -202,35 +199,35 @@ AsyncWebSocketMessageBuffer::AsyncWebSocketMessageBuffer(AsyncWebSocketMessageBu _count = 0; if (copy._data) { - _data = copy._data; - copy._data = nullptr; - } + _data = copy._data; + copy._data = nullptr; + } } AsyncWebSocketMessageBuffer::~AsyncWebSocketMessageBuffer() { if (_data) { - delete[] _data; + delete[] _data; } } -bool AsyncWebSocketMessageBuffer::reserve(size_t size) +bool AsyncWebSocketMessageBuffer::reserve(size_t size) { - _len = size; + _len = size; if (_data) { delete[] _data; - _data = nullptr; + _data = nullptr; } _data = new uint8_t[_len + 1]; if (_data) { _data[_len] = 0; - return true; + return true; } else { - return false; + return false; } } @@ -311,7 +308,7 @@ AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(uint8_t opcode, bool mask { _opcode = opcode & 0x07; _mask = mask; - + } @@ -366,17 +363,17 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() { return sent; } -// bool AsyncWebSocketBasicMessage::reserve(size_t size) { +// bool AsyncWebSocketBasicMessage::reserve(size_t size) { // if (size) { // _data = (uint8_t*)malloc(size +1); // if (_data) { -// memset(_data, 0, size); -// _len = size; +// memset(_data, 0, size); +// _len = size; // _status = WS_MSG_SENDING; -// return true; +// return true; // } // } -// return false; +// return false; // } @@ -397,22 +394,22 @@ AsyncWebSocketMultiMessage::AsyncWebSocketMultiMessage(AsyncWebSocketMessageBuff _mask = mask; if (buffer) { - _WSbuffer = buffer; - (*_WSbuffer)++; - _data = buffer->get(); - _len = buffer->length(); + _WSbuffer = buffer; + (*_WSbuffer)++; + _data = buffer->get(); + _len = buffer->length(); _status = WS_MSG_SENDING; //ets_printf("M: %u\n", _len); } else { _status = WS_MSG_ERROR; } - -} + +} AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() { if (_WSbuffer) { - (*_WSbuffer)--; // decreases the counter. + (*_WSbuffer)--; // decreases the counter. } } @@ -522,7 +519,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time){ if(len && !_messageQueue.isEmpty()){ _messageQueue.front()->ack(len, time); } - _server->_cleanBuffers(); + _server->_cleanBuffers(); _runQueue(); } @@ -829,9 +826,9 @@ void AsyncWebSocketClient::binary(const __FlashStringHelper *data, size_t len){ for(size_t b=0; bid() == client->id(); }); @@ -958,20 +955,20 @@ void AsyncWebSocket::text(uint32_t id, const char * message, size_t len){ void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer * buffer){ if (!buffer) return; - buffer->lock(); + buffer->lock(); for(const auto& c: _clients){ if(c->status() == WS_CONNECTED){ c->text(buffer); } } buffer->unlock(); - _cleanBuffers(); + _cleanBuffers(); } void AsyncWebSocket::textAll(const char * message, size_t len){ - AsyncWebSocketMessageBuffer * WSBuffer = makeBuffer((uint8_t *)message, len); - textAll(WSBuffer); + AsyncWebSocketMessageBuffer * WSBuffer = makeBuffer((uint8_t *)message, len); + textAll(WSBuffer); } void AsyncWebSocket::binary(uint32_t id, const char * message, size_t len){ @@ -981,20 +978,20 @@ void AsyncWebSocket::binary(uint32_t id, const char * message, size_t len){ } void AsyncWebSocket::binaryAll(const char * message, size_t len){ - AsyncWebSocketMessageBuffer * buffer = makeBuffer((uint8_t *)message, len); - binaryAll(buffer); + AsyncWebSocketMessageBuffer * buffer = makeBuffer((uint8_t *)message, len); + binaryAll(buffer); } void AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer * buffer) { if (!buffer) return; - buffer->lock(); + buffer->lock(); for(const auto& c: _clients){ if(c->status() == WS_CONNECTED) c->binary(buffer); } - buffer->unlock(); - _cleanBuffers(); + buffer->unlock(); + _cleanBuffers(); } void AsyncWebSocket::message(uint32_t id, AsyncWebSocketMessage *message){ @@ -1008,7 +1005,7 @@ void AsyncWebSocket::messageAll(AsyncWebSocketMultiMessage *message){ if(c->status() == WS_CONNECTED) c->message(message); } - _cleanBuffers(); + _cleanBuffers(); } size_t AsyncWebSocket::printf(uint32_t id, const char *format, ...){ @@ -1033,8 +1030,8 @@ size_t AsyncWebSocket::printfAll(const char *format, ...) { size_t len = vsnprintf(temp, MAX_PRINTF_LEN, format, arg); va_end(arg); delete[] temp; - - AsyncWebSocketMessageBuffer * buffer = makeBuffer(len); + + AsyncWebSocketMessageBuffer * buffer = makeBuffer(len); if (!buffer) { return 0; } @@ -1071,8 +1068,8 @@ size_t AsyncWebSocket::printfAll_P(PGM_P formatP, ...) { size_t len = vsnprintf_P(temp, MAX_PRINTF_LEN, formatP, arg); va_end(arg); delete[] temp; - - AsyncWebSocketMessageBuffer * buffer = makeBuffer(len + 1); + + AsyncWebSocketMessageBuffer * buffer = makeBuffer(len + 1); if (!buffer) { return 0; } @@ -1168,7 +1165,7 @@ const char * WS_STR_UUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; bool AsyncWebSocket::canHandle(AsyncWebServerRequest *request){ if(!_enabled) return false; - + if(request->method() != HTTP_GET || !request->url().equals(_url) || !request->isExpectedRequestedConnType(RCT_WS)) return false; @@ -1208,24 +1205,24 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request){ AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(size_t size) { - AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(size); + AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(size); if (buffer) { AsyncWebLockGuard l(_lock); _buffers.add(buffer); } - return buffer; + return buffer; } AsyncWebSocketMessageBuffer * AsyncWebSocket::makeBuffer(uint8_t * data, size_t size) { - AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(data, size); - + AsyncWebSocketMessageBuffer * buffer = new AsyncWebSocketMessageBuffer(data, size); + if (buffer) { AsyncWebLockGuard l(_lock); _buffers.add(buffer); } - return buffer; + return buffer; } void AsyncWebSocket::_cleanBuffers()