Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/in_mqtt/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, payload_key),
"Key where the payload will be preserved"
},
{
FLB_CONFIG_MAP_SIZE, "buffer_size", MQTT_CONNECTION_DEFAULT_BUFFER_SIZE,
0, FLB_TRUE, offsetof(struct flb_in_mqtt_config, buffer_size),
"Maximum payload size"
},
/* EOF */
{0}
};
Expand Down
3 changes: 2 additions & 1 deletion plugins/in_mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ struct flb_in_mqtt_config {
char *tcp_port; /* TCP Port */

flb_sds_t payload_key; /* payload key */

size_t buffer_size; /* connection buffer size */

int msgp_len; /* msgpack data length */
char msgp[MQTT_MSGP_BUF_SIZE]; /* msgpack static buffer */
struct flb_input_instance *ins; /* plugin input instance */
Expand Down
16 changes: 15 additions & 1 deletion plugins/in_mqtt/mqtt_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int mqtt_conn_event(void *data)
event = &connection->event;

if (event->mask & MK_EVENT_READ) {
available = sizeof(conn->buf) - conn->buf_len;
available = conn->buf_size - conn->buf_len;

bytes = flb_io_net_read(connection,
(void *) &conn->buf[conn->buf_len],
Expand Down Expand Up @@ -93,6 +93,16 @@ struct mqtt_conn *mqtt_conn_add(struct flb_connection *connection,
return NULL;
}

conn->buf = flb_calloc(ctx->buffer_size, 1);

if (conn->buf == NULL) {
flb_errno();
flb_free(conn);
return NULL;
}

conn->buf_size = ctx->buffer_size;

conn->connection = connection;

/* Set data for the event-loop */
Expand Down Expand Up @@ -137,6 +147,10 @@ int mqtt_conn_del(struct mqtt_conn *conn)
/* Release resources */
mk_list_del(&conn->_head);

if (conn->buf != NULL) {
flb_free(conn->buf);
}

flb_free(conn);

return 0;
Expand Down
5 changes: 4 additions & 1 deletion plugins/in_mqtt/mqtt_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <fluent-bit/flb_connection.h>

#define MQTT_CONNECTION_DEFAULT_BUFFER_SIZE "2048"

enum {
MQTT_NEW = 1, /* it's a new connection */
MQTT_CONNECTED = 2, /* MQTT connection per protocol spec OK */
Expand All @@ -36,7 +38,8 @@ struct mqtt_conn {
int buf_frame_end; /* Frame end position */
int buf_pos; /* Index position */
int buf_len; /* Buffer content length */
unsigned char buf[1024]; /* Buffer data */
size_t buf_size; /* Buffer size */
unsigned char *buf; /* Buffer data */
struct flb_in_mqtt_config *ctx; /* Plugin configuration context */
struct flb_connection *connection;
struct mk_list _head; /* Link to flb_in_mqtt_config->conns */
Expand Down
2 changes: 2 additions & 0 deletions plugins/in_mqtt/mqtt_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ static int mqtt_data_append(char *topic, size_t topic_len,
msgpack_unpacked_init(&result);
if (msgpack_unpack_next(&result, pack, out, &off) != MSGPACK_UNPACK_SUCCESS) {
msgpack_unpacked_destroy(&result);
flb_free(pack);
return -1;
}

if (result.data.type != MSGPACK_OBJECT_MAP){
msgpack_unpacked_destroy(&result);
flb_free(pack);
return -1;
}
root = result.data;
Expand Down