forked from halfgaar/FlashMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.h
239 lines (188 loc) · 8.66 KB
/
client.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/*
This file is part of FlashMQ (https://www.flashmq.org)
Copyright (C) 2021-2023 Wiebe Cazemier
FlashMQ is free software: you can redistribute it and/or modify
it under the terms of The Open Software License 3.0 (OSL-3.0).
See LICENSE for license details.
*/
#ifndef CLIENT_H
#define CLIENT_H
#include <fcntl.h>
#include <unistd.h>
#include <vector>
#include <mutex>
#include <iostream>
#include <time.h>
#include <optional>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "forward_declarations.h"
#include "mqttpacket.h"
#include "cirbuf.h"
#include "types.h"
#include "iowrapper.h"
#include "bridgeconfig.h"
#include "enums.h"
#include "publishcopyfactory.h"
#define MQTT_HEADER_LENGH 2
/**
* @brief The StowedClient struct stores the client when doing an extended authentication, and we need to keep the info around how
* the client will be registered once the authentication succeeds.
*/
struct StowedClientRegistrationData
{
const bool clean_start;
const uint16_t clientReceiveMax;
const uint32_t sessionExpiryInterval;
StowedClientRegistrationData(bool clean_start, uint16_t clientReceiveMax, uint32_t sessionExpiryInterval);
};
class Client
{
friend class IoWrapper;
int fd;
bool fuzzMode = false;
ProtocolVersion protocolVersion = ProtocolVersion::None;
uint32_t maxOutgoingPacketSize;
const uint32_t maxIncomingPacketSize;
uint16_t maxOutgoingTopicAliasValue = 0;
uint16_t maxIncomingTopicAliasValue = 0;
IoWrapper ioWrapper;
std::string transportStr;
std::string address;
CirBuf readbuf;
CirBuf writebuf;
bool authenticated = false;
bool connectPacketSeen = false;
bool readyForWriting = false;
bool readyForReading = true;
bool disconnectWhenBytesWritten = false;
bool disconnecting = false;
bool outgoingConnection = false;
bool outgoingConnectionEstablished = false;
ClientType clientType = ClientType::Normal;
bool supportsRetained = true; // Interestingly, only SERVERS can tell CLIENTS they don't support it (in CONNACK). The CONNECT packet has no field for it.
std::string disconnectReason;
std::chrono::time_point<std::chrono::steady_clock> lastActivity = std::chrono::steady_clock::now();
std::string clientid;
std::string username;
uint16_t keepalive = 10;
bool clean_start = false;
X509ClientVerification x509ClientVerification = X509ClientVerification::None;
AllowListenerAnonymous allowAnonymousOverride = AllowListenerAnonymous::None;
std::shared_ptr<WillPublish> stagedWillPublish;
std::shared_ptr<WillPublish> willPublish;
const int epoll_fd;
std::weak_ptr<ThreadData> threadData; // The thread (data) that this client 'lives' in.
std::mutex writeBufMutex;
std::shared_ptr<Session> session;
std::unordered_map<uint16_t, std::string> incomingTopicAliases;
uint16_t curOutgoingTopicAlias = 0;
std::unordered_map<std::string, uint16_t> outgoingTopicAliases;
std::mutex outgoingTopicAliasMutex;
std::string extendedAuthenticationMethod;
std::unique_ptr<ConnAck> stagedConnack;
std::unique_ptr<StowedClientRegistrationData> registrationData;
Logger *logger = Logger::getInstance();
sockaddr_in6 addr;
std::weak_ptr<BridgeState> bridgeState;
void setReadyForWriting(bool val);
void setReadyForReading(bool val);
void setAddr(const std::string &address);
public:
uint8_t preAuthPacketCounter = 0;
Client(int fd, std::shared_ptr<ThreadData> threadData, SSL *ssl, bool websocket, bool haproxy, struct sockaddr *addr, const Settings &settings, bool fuzzMode=false);
Client(const Client &other) = delete;
Client(Client &&other) = delete;
~Client();
int getFd() { return fd;}
bool isSslAccepted() const;
bool isSsl() const;
bool needsHaProxyParsing() const;
HaProxyConnectionType readHaProxyData();
bool getSslReadWantsWrite() const;
bool getSslWriteWantsRead() const;
ProtocolVersion getProtocolVersion() const;
void setProtocolVersion(ProtocolVersion version);
void connectToBridgeTarget(FMQSockaddr_in6 addr);
void startOrContinueSslHandshake();
void markAsDisconnecting();
bool readFdIntoBuffer();
void bufferToMqttPackets(std::vector<MqttPacket> &packetQueueIn, std::shared_ptr<Client> &sender);
void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive);
void setClientProperties(ProtocolVersion protocolVersion, const std::string &clientId, const std::string username, bool connectPacketSeen, uint16_t keepalive,
uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue);
void setClientProperties(bool connectPacketSeen, uint16_t keepalive, uint32_t maxOutgoingPacketSize, uint16_t maxOutgoingTopicAliasValue, bool supportsRetained);
void setWill(const std::string &topic, const std::string &payload, bool retain, uint8_t qos);
void stageWill(WillPublish &&willPublish);
void setWillFromStaged();
void clearWill();
void setAuthenticated(bool value) { authenticated = value;}
bool getAuthenticated() { return authenticated; }
bool hasConnectPacketSeen() { return connectPacketSeen; }
void setHasConnectPacketSeen() { connectPacketSeen = true; }
std::string &getClientId() { return this->clientid; }
void setClientId(const std::string &id);
const std::string &getUsername() const { return this->username; }
std::string &getMutableUsername();
std::shared_ptr<WillPublish> &getWill() { return this->willPublish; }
const std::shared_ptr<WillPublish> &getStagedWill() { return this->stagedWillPublish; }
void assignSession(std::shared_ptr<Session> &session);
std::shared_ptr<Session> getSession();
void resetSession();
void setDisconnectReason(const std::string &reason);
std::chrono::seconds getSecondsTillKeepAliveAction() const;
void writeText(const std::string &text);
void writePing();
void writePingResp();
void writeLoginPacket();
PacketDropReason writeMqttPacket(const MqttPacket &packet);
PacketDropReason writeMqttPacketAndBlameThisClient(PublishCopyFactory ©Factory, uint8_t max_qos, uint16_t packet_id, bool retain);
PacketDropReason writeMqttPacketAndBlameThisClient(const MqttPacket &packet);
bool writeBufIntoFd();
bool isBeingDisconnected() const { return disconnectWhenBytesWritten; }
bool readyForDisconnecting() const { return disconnectWhenBytesWritten && writebuf.usedBytes() == 0; }
// Do this before calling an action that makes this client ready for writing, so that the EPOLLOUT will handle it.
void setReadyForDisconnect() { disconnectWhenBytesWritten = true; }
const sockaddr *getAddr() const;
std::string repr();
std::string repr_endpoint();
bool keepAliveExpired();
std::string getKeepAliveInfoString() const;
void resetBuffersIfEligible();
void setTopicAlias(const uint16_t alias_id, const std::string &topic);
const std::string &getTopicAlias(const uint16_t id) const;
uint32_t getMaxIncomingPacketSize() const;
uint16_t getMaxIncomingTopicAliasValue() const;
void sendOrQueueWill();
void serverInitiatedDisconnect(ReasonCodes reason);
void setRegistrationData(bool clean_start, uint16_t client_receive_max, uint32_t sessionExpiryInterval);
const std::unique_ptr<StowedClientRegistrationData> &getRegistrationData() const;
void clearRegistrationData();
void stageConnack(std::unique_ptr<ConnAck> &&c);
void sendConnackSuccess();
void sendConnackDeny(ReasonCodes reason);
void addAuthReturnDataToStagedConnAck(const std::string &authData);
void setExtendedAuthenticationMethod(const std::string &authMethod);
const std::string &getExtendedAuthenticationMethod() const;
std::shared_ptr<ThreadData> lockThreadData();
void setBridgeState(std::shared_ptr<BridgeState> bridgeState);
bool isOutgoingConnection() const;
std::shared_ptr<BridgeState> getBridgeState();
void setBridgeConnected();
bool getOutgoingConnectionEstablished() const;
ClientType getClientType() const { return clientType; }
void setClientType(ClientType val);
bool isRetainedAvailable() const {return supportsRetained; };
#ifdef TESTING
std::function<void(MqttPacket &packet)> onPacketReceived;
#endif
#ifndef NDEBUG
void setFakeUpgraded();
#endif
void setSslVerify(X509ClientVerification verificationMode);
std::optional<std::string> getUsernameFromPeerCertificate();
X509ClientVerification getX509ClientVerification() const;
void setAllowAnonymousOverride(const AllowListenerAnonymous allow);
AllowListenerAnonymous getAllowAnonymousOverride() const;
};
#endif // CLIENT_H