diff --git a/examples/mqtt.xml.in b/examples/mqtt.xml.in new file mode 100644 index 000000000..9aaa49c87 --- /dev/null +++ b/examples/mqtt.xml.in @@ -0,0 +1,58 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + test_message + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/include/mqtt.hrl b/include/mqtt.hrl new file mode 100644 index 000000000..cb1a3a68e --- /dev/null +++ b/include/mqtt.hrl @@ -0,0 +1,94 @@ +%% The MIT License (MIT) +%% +%% Copyright (c) <2013> +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in +%% all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +%% THE SOFTWARE. + +%% +%% An erlang client for MQTT (http://www.mqtt.org/) +%% + +-define(LOG(Msg), io:format("{~p:~p ~p}: ~p~n", [?MODULE, ?LINE, self(), Msg])). + +-define(MQTT_PORT, 1883). + +-define(PROTOCOL_NAME, "MQIsdp"). +-define(PROTOCOL_VERSION, 3). + +-define(UNUSED, 0). + +-define(USERNAME, undefined). +-define(PASSWORD, undefined). + +-define(DEFAULT_KEEPALIVE, 120). +-define(DEFAULT_RETRY, 120). +-define(DEFAULT_CONNECT_TIMEOUT, 5). + +-define(CONNECT, 1). +-define(CONNACK, 2). +-define(PUBLISH, 3). +-define(PUBACK, 4). +-define(PUBREC, 5). +-define(PUBREL, 6). +-define(PUBCOMP, 7). +-define(SUBSCRIBE, 8). +-define(SUBACK, 9). +-define(UNSUBSCRIBE, 10). +-define(UNSUBACK, 11). +-define(PINGREQ, 12). +-define(PINGRESP, 13). +-define(DISCONNECT, 14). + +-record(connect_options, { + protocol_name = ?PROTOCOL_NAME, + protocol_version = ?PROTOCOL_VERSION, + client_id, + clean_start = true, + will, + keepalive = ?DEFAULT_KEEPALIVE, + username = ?USERNAME, + password = ?PASSWORD, + retry = ?DEFAULT_RETRY, + connect_timeout = ?DEFAULT_CONNECT_TIMEOUT +}). + +-record(mqtt, { + id, + type, + dup = 0, + qos = 0, + retain = 0, + arg +}). + +-record(sub, { + topic, + qos = 0 +}). + +-record(publish_options, { + qos = 0, + retain = 0 +}). + +-record(will, { + topic, + message, + publish_options = #publish_options{} +}). diff --git a/include/ts_mqtt.hrl b/include/ts_mqtt.hrl new file mode 100644 index 000000000..135831f78 --- /dev/null +++ b/include/ts_mqtt.hrl @@ -0,0 +1,56 @@ +%%% This code was developped by Zhihui Jiao(jzhihui521@gmail.com). +%%% +%%% Copyright (C) 2013 Zhihui Jiao +%%% +%%% This program is free software; you can redistribute it and/or modify +%%% it under the terms of the GNU General Public License as published by +%%% the Free Software Foundation; either version 2 of the License, or +%%% (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%%% GNU General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. +%%% +%%% In addition, as a special exception, you have the permission to +%%% link the code of this program with any library released under +%%% the EPL license and distribute linked combinations including +%%% the two; the MPL (Mozilla Public License), which EPL (Erlang +%%% Public License) is based on, is included in this exception. + + +-vc('$Id$ '). +-author('jzhihui521@gmail.com'). + +%% use by the client to create the request +-record(mqtt_request, { + type, + clean_start = true, + keepalive = 10, % 10s + will_topic, + will_qos, + will_msg, + will_retain, + topic, + qos = 0, + retained = false, + payload + }). + +-record(mqtt_dyndata, { + none + } + ). + +-record(mqtt_session, { + ack_buf = <<>>, + ping_pid, + keepalive, + curr_id = 0, + wait, % wait code + status % connection status + }). diff --git a/src/lib/mqtt_frame.erl b/src/lib/mqtt_frame.erl new file mode 100644 index 000000000..111ccc648 --- /dev/null +++ b/src/lib/mqtt_frame.erl @@ -0,0 +1,366 @@ +%% The MIT License (MIT) +%% +%% Copyright (c) <2013> +%% +%% Permission is hereby granted, free of charge, to any person obtaining a copy +%% of this software and associated documentation files (the "Software"), to deal +%% in the Software without restriction, including without limitation the rights +%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the Software is +%% furnished to do so, subject to the following conditions: +%% +%% The above copyright notice and this permission notice shall be included in +%% all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +%% THE SOFTWARE. + +%% Modified from: https://code.google.com/p/my-mqtt4erl/source/browse/src/mqtt_core.erl. +-module(mqtt_frame). +-author(hellomatty@gmail.com). + +%% +%% An erlang client for MQTT (http://www.mqtt.org/) +%% + +-include_lib("mqtt.hrl"). + +-export([encode/1, decode/1]). +-export([set_connect_options/1, set_publish_options/1, command_for_type/1]). + +%%%=================================================================== +%%% API functions +%%%=================================================================== +encode(#mqtt{} = Message) -> + {VariableHeader, Payload} = encode_message(Message), + FixedHeader = encode_fixed_header(Message), + EncodedLength = encode_length(size(VariableHeader) + size(Payload)), + <>. + +decode(<>) -> + {RemainingLength, Rest1} = decode_length(Rest), + Size = size(Rest1), + if + Size >= RemainingLength -> + <> = Rest1, + {decode_message(decode_fixed_header(<>), Body), Left}; + true -> more + end; +decode(_Data) -> + more. + +set_connect_options(Options) -> + set_connect_options(Options, #connect_options{}). + +set_publish_options(Options) -> + set_publish_options(Options, #publish_options{}). + +command_for_type(Type) -> + case Type of + ?CONNECT -> connect; + ?CONNACK -> connack; + ?PUBLISH -> publish; + ?PUBACK -> puback; + ?PUBREC -> pubrec; + ?PUBREL -> pubrel; + ?PUBCOMP -> pubcomp; + ?SUBSCRIBE -> subscribe; + ?SUBACK -> suback; + ?UNSUBSCRIBE -> unsubscribe; + ?UNSUBACK -> unsuback; + ?PINGREQ -> pingreq; + ?PINGRESP -> pingresp; + ?DISCONNECT -> disconnect; + _ -> unknown + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +set_connect_options([], Options) -> + Options; +set_connect_options([{keepalive, KeepAlive}|T], Options) -> + set_connect_options(T, Options#connect_options{keepalive = KeepAlive}); +set_connect_options([{retry, Retry}|T], Options) -> + set_connect_options(T, Options#connect_options{retry = Retry}); +set_connect_options([{client_id, ClientId}|T], Options) -> + set_connect_options(T, Options#connect_options{client_id = ClientId}); +set_connect_options([{clean_start, Flag}|T], Options) -> + set_connect_options(T, Options#connect_options{clean_start = Flag}); +set_connect_options([{connect_timeout, Timeout}|T], Options) -> + set_connect_options(T, Options#connect_options{connect_timeout = Timeout}); +set_connect_options([{username, UserName}|T], Options) -> + set_connect_options(T, Options#connect_options{username = UserName}); +set_connect_options([{password, Password}|T], Options) -> + set_connect_options(T, Options#connect_options{password = Password}); +set_connect_options([#will{} = Will|T], Options) -> + set_connect_options(T, Options#connect_options{will = Will}); +set_connect_options([UnknownOption|_T], _Options) -> + exit({connect, unknown_option, UnknownOption}). + +set_publish_options([], Options) -> + Options; +set_publish_options([{qos, QoS}|T], Options) when QoS >= 0, QoS =< 2 -> + set_publish_options(T, Options#publish_options{qos = QoS}); +set_publish_options([{retain, true}|T], Options) -> + set_publish_options(T, Options#publish_options{retain = 1}); +set_publish_options([{retain, false}|T], Options) -> + set_publish_options(T, Options#publish_options{retain = 0}); +set_publish_options([UnknownOption|_T], _Options) -> + exit({unknown, publish_option, UnknownOption}). + +construct_will(WT, WM, WillQoS, WillRetain) -> + #will{ + topic = WT, + message = WM, + publish_options = #publish_options{qos = WillQoS, retain = WillRetain} + }. + +decode_message(#mqtt{type = ?CONNECT} = Message, Rest) -> + <> = Rest, + {VariableHeader, Payload} = split_binary(Rest, 2 + ProtocolNameLength + 4), + <<_:16, ProtocolName:ProtocolNameLength/binary, ProtocolVersion:8/big, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2/big, WillFlag:1, CleanStart:1, _:1, KeepAlive:16/big>> = VariableHeader, + {ClientId, Will, Username, Password} = case {WillFlag, UsernameFlag, PasswordFlag} of + {1, 0, 0} -> + [C, WT, WM] = decode_strings(Payload), + W = construct_will(WT, WM, WillQoS, WillRetain), + {C, W, undefined, undefined}; + {1, 1, 0} -> + [C, WT, WM, U] = decode_strings(Payload), + W = construct_will(WT, WM, WillQoS, WillRetain), + {C, W, U, undefined}; + {1, 1, 1} -> + [C, WT, WM, U, P] = decode_strings(Payload), + W = construct_will(WT, WM, WillQoS, WillRetain), + {C, W, U, P}; + {0, 1, 0} -> + [C, U] = decode_strings(Payload), + {C, undefined, U, undefined}; + {0, 1, 1} -> + [C, U, P] = decode_strings(Payload), + {C, undefined, U, P}; + {0, 0, 0} -> + [C] = decode_strings(Payload), + {C, undefined, undefined, undefined} + end, + Message#mqtt{ + arg = #connect_options{ + client_id = ClientId, + protocol_name = binary_to_list(ProtocolName), + protocol_version = ProtocolVersion, + clean_start = CleanStart =:= 1, + will = Will, + username = Username, + password = Password, + keepalive = KeepAlive + } + }; +decode_message(#mqtt{type = ?CONNACK} = Message, Rest) -> + <<_:8, ResponseCode:8/big>> = Rest, + Message#mqtt{arg = ResponseCode}; +decode_message(#mqtt{type = ?PINGRESP} = Message, _Rest) -> + Message; +decode_message(#mqtt{type = ?PINGREQ} = Message, _Rest) -> + Message; +decode_message(#mqtt{type = ?PUBLISH, qos = 0} = Message, Rest) -> + {<>, _} = split_binary(Rest, 2), + {<<_:16, Topic/binary>>, Payload} = split_binary(Rest, 2 + TopicLength), + Message#mqtt{ + arg = {binary_to_list(Topic), binary_to_list(Payload)} + }; +decode_message(#mqtt{type = ?PUBLISH} = Message, Rest) -> + {<>, _} = split_binary(Rest, 2), + {<<_:16, Topic:TopicLength/binary, MessageId:16/big>>, Payload} = split_binary(Rest, 4 + TopicLength), + Message#mqtt{ + id = MessageId, + arg = {binary_to_list(Topic), binary_to_list(Payload)} + }; +decode_message(#mqtt{type = Type} = Message, Rest) + when + Type =:= ?PUBACK; + Type =:= ?PUBREC; + Type =:= ?PUBREL; + Type =:= ?PUBCOMP -> + <> = Rest, + Message#mqtt{ + arg = MessageId + }; +decode_message(#mqtt{type = ?SUBSCRIBE} = Message, Rest) -> + {<>, Payload} = split_binary(Rest, 2), + Message#mqtt{ + id = MessageId, + arg = decode_subs(Payload, []) + }; +decode_message(#mqtt{type = ?SUBACK} = Message, Rest) -> + {<>, Payload} = split_binary(Rest, 2), + GrantedQoS = lists:map(fun(Item) -> + <<_:6, QoS:2/big>> = <>, + QoS + end, + binary_to_list(Payload) + ), + Message#mqtt{ + arg = {MessageId, GrantedQoS} + }; +decode_message(#mqtt{type = ?UNSUBSCRIBE} = Message, Rest) -> + {<>, Payload} = split_binary(Rest, 2), + Message#mqtt{ + id = MessageId, + arg = {MessageId, lists:map(fun(T) -> #sub{topic = T} end, decode_strings(Payload))} + }; +decode_message(#mqtt{type = ?UNSUBACK} = Message, Rest) -> + <> = Rest, + Message#mqtt{ + arg = MessageId + }; +decode_message(#mqtt{type = ?DISCONNECT} = Message, _Rest) -> + Message; +decode_message(Message, Rest) -> + exit({decode_message, unexpected_message, {Message, Rest}}). + +decode_subs(<<>>, Subs) -> + lists:reverse(Subs); +decode_subs(Bytes, Subs) -> + <> = Bytes, + <<_:16, Topic:TopicLength/binary, ?UNUSED:6, QoS:2/big, Rest/binary>> = Bytes, + decode_subs(Rest, [#sub{topic = binary_to_list(Topic), qos = QoS}|Subs]). + +encode_message(#mqtt{type = ?CONNACK, arg = ReturnCode}) -> + {<>,<<>>}; +encode_message(#mqtt{type = ?CONNECT, arg = Options}) -> + CleanStart = case Options#connect_options.clean_start of + true -> + 1; + false -> + 0 + end, + {UserNameFlag, UserNameValue} = case Options#connect_options.username of + undefined -> + {0, undefined}; + UserName -> + {1, UserName} + end, + {PasswordFlag, PasswordValue} = case Options#connect_options.password of + undefined -> + {0, undefined}; + Password -> + {1, Password} + end, + {WillFlag, WillQoS, WillRetain, PayloadList} = case Options#connect_options.will of + {will, WillTopic, WillMessage, WillOptions} -> + { + 1, WillOptions#publish_options.qos, WillOptions#publish_options.retain, + [encode_string(Options#connect_options.client_id), encode_string(WillTopic), encode_string(WillMessage)] + }; + undefined -> + {0, 0, 0, [encode_string(Options#connect_options.client_id)]} + end, + Payload1 = case UserNameValue of + undefined -> list_to_binary(PayloadList); + _ -> + case PasswordValue of + undefined -> list_to_binary(lists:append(PayloadList, [encode_string(UserNameValue)])); + _ -> list_to_binary(lists:append(PayloadList, [encode_string(UserNameValue), encode_string(PasswordValue)])) + end + end, + { + list_to_binary([ + encode_string(Options#connect_options.protocol_name), + <<(Options#connect_options.protocol_version)/big>>, + <> + ]), + Payload1 + }; +encode_message(#mqtt{type = ?PUBLISH, arg = {Topic, Payload}} = Message) -> + if + Message#mqtt.qos =:= 0 -> + { + encode_string(Topic), + list_to_binary(Payload) + }; + Message#mqtt.qos > 0 -> + { + list_to_binary([encode_string(Topic), <<(Message#mqtt.id):16/big>>]), + list_to_binary(Payload) + } + end; +encode_message(#mqtt{type = ?PUBACK, arg = MessageId}) -> + { + <>, + <<>> + }; +encode_message(#mqtt{type = ?SUBSCRIBE, arg = Subs} = Message) -> + { + <<(Message#mqtt.id):16/big>>, + list_to_binary( lists:flatten( lists:map(fun({sub, Topic, RequestedQoS}) -> [encode_string(Topic), <>] end, Subs))) + }; +encode_message(#mqtt{type = ?SUBACK, arg = {MessageId, Subs}}) -> + { + <>, + list_to_binary(lists:map(fun(S) -> <> end, Subs)) + }; +encode_message(#mqtt{type = ?UNSUBSCRIBE, arg = Subs} = Message) -> + { + <<(Message#mqtt.id):16/big>>, + list_to_binary(lists:map(fun({sub, T, _Q}) -> encode_string(T) end, Subs)) + }; +encode_message(#mqtt{type = ?UNSUBACK, arg = MessageId}) -> + {<>, <<>>}; +encode_message(#mqtt{type = ?PINGREQ}) -> + {<<>>, <<>>}; +encode_message(#mqtt{type = ?PINGRESP}) -> + {<<>>, <<>>}; +encode_message(#mqtt{type = ?PUBREC, arg = MessageId}) -> + {<>, <<>>}; +encode_message(#mqtt{type = ?PUBREL, arg = MessageId}) -> + {<>, <<>>}; +encode_message(#mqtt{type = ?PUBCOMP, arg = MessageId}) -> + {<>, <<>>}; +encode_message(#mqtt{type = ?DISCONNECT}) -> + {<<>>, <<>>}; +encode_message(#mqtt{} = Message) -> + exit({encode_message, unknown_type, Message}). + +decode_length(Data) -> + decode_length(Data, 1, 0). +decode_length(<<0:1, Length:7, Rest/binary>>, Multiplier, Value) -> + {Value + Multiplier * Length, Rest}; +decode_length(<<1:1, Length:7, Rest/binary>>, Multiplier, Value) -> + decode_length(Rest, Multiplier * 128, Value + Multiplier * Length). + +encode_length(Length) -> + encode_length(Length, <<>>). + +encode_length(Length, Buff) when Length div 128 > 0 -> + Digit = Length rem 128, + Current = <<1:1, Digit:7/big>>, + encode_length(Length div 128, <>); +encode_length(Length, Buff) -> + Digit = Length rem 128, + Current = <<0:1, Digit:7/big>>, + <>. + +encode_fixed_header(Message) when is_record(Message, mqtt) -> + <<(Message#mqtt.type):4/big, (Message#mqtt.dup):1, (Message#mqtt.qos):2/big, (Message#mqtt.retain):1>>. + +decode_fixed_header(Byte) -> + <> = Byte, + #mqtt{type = Type, dup = Dup, qos = QoS, retain = Retain}. + +encode_string(String) -> + Bytes = list_to_binary(String), + Length = size(Bytes), + <>. + +decode_strings(Bytes) when is_binary(Bytes) -> + decode_strings(Bytes, []). +decode_strings(<<>>, Strings) -> + lists:reverse(Strings); +decode_strings(<> = Bytes, Strings) -> + <<_:16, Binary:Length/binary, Rest/binary>> = Bytes, + decode_strings(Rest, [binary_to_list(Binary)|Strings]). diff --git a/src/test/ts_test_all.erl b/src/test/ts_test_all.erl index 16879251c..bfbd46af0 100644 --- a/src/test/ts_test_all.erl +++ b/src/test/ts_test_all.erl @@ -33,5 +33,6 @@ all_test_() -> [ts_test_recorder, ts_test_search, ts_test_stats, ts_test_interaction, - ts_test_websocket + ts_test_websocket, + ts_test_mqtt ]. diff --git a/src/test/ts_test_mqtt.erl b/src/test/ts_test_mqtt.erl new file mode 100644 index 000000000..363ed4b9a --- /dev/null +++ b/src/test/ts_test_mqtt.erl @@ -0,0 +1,157 @@ +%%% This code was developped by Zhihui Jiao(jzhihui521@gmail.com). +%%% +%%% Copyright (C) 2013 Zhihui Jiao +%%% +%%% This program is free software; you can redistribute it and/or modify +%%% it under the terms of the GNU General Public License as published by +%%% the Free Software Foundation; either version 2 of the License, or +%%% (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%%% GNU General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. +%%% +%%% In addition, as a special exception, you have the permission to +%%% link the code of this program with any library released under +%%% the EPL license and distribute linked combinations including +%%% the two; the MPL (Mozilla Public License), which EPL (Erlang +%%% Public License) is based on, is included in this exception. + +-module(ts_test_mqtt). +-vc('$Id$ '). +-author('jzhihui521@gmail.com'). + +-compile(export_all). + +-include("ts_profile.hrl"). +-include("mqtt.hrl"). +-include("ts_config.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +encode_connect_test() -> + ClientId = "tsung-test-id", + PublishOptions = mqtt_frame:set_publish_options([{qos, 0}, + {retain, false}]), + Will = #will{topic = "will_topic", message = "will_message", + publish_options = PublishOptions}, + + Options = mqtt_frame:set_connect_options([{client_id, ClientId}, + {clean_start, true}, + {keepalive, 10}, + Will]), + Message = #mqtt{type = ?CONNECT, arg = Options}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<16,53,0,6,77,81,73,115,100,112,3,6,0,10,0,13,116,115,117,110, + 103,45,116,101,115,116,45,105,100,0,10,119,105,108,108,95, + 116,111,112,105,99,0,12,119,105,108,108,95,109,101,115,115, + 97,103,101>>, EncodedData). + +decode_connect_test() -> + Data = <<16,53,0,6,77,81,73,115,100,112,3,6,0,10,0,13,116,115,117,110, + 103,45,116,101,115,116,45,105,100,0,10,119,105,108,108,95, + 116,111,112,105,99,0,12,119,105,108,108,95,109,101,115,115, + 97,103,101>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?CONNECT, Type). + +encode_disconnect_test() -> + Message = #mqtt{type = ?DISCONNECT}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<224,0>>, EncodedData). + +decode_disconnect_test() -> + Data = <<224,0>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?DISCONNECT, Type). + +encode_publish_test() -> + Message = #mqtt{id = 1, type = ?PUBLISH, qos = 0, retain = 0, + arg = {"test_topic", "test_message"}}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<48,24,0,10,116,101,115,116,95,116,111,112,105,99,116,101,115,116,95,109,101,115,115,97,103,101>>, EncodedData). + +decode_publish_test() -> + Data = <<48,24,0,10,116,101,115,116,95,116,111,112,105,99,116,101,115,116,95,109,101,115,115,97,103,101>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?PUBLISH, Type). + +encode_subscribe_test() -> + Arg = [#sub{topic = "test_topic", qos = 0}], + Message = #mqtt{id = 1, type = ?SUBSCRIBE, arg = Arg}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<128,15,0,1,0,10,116,101,115,116,95,116,111,112,105,99,0>>, EncodedData). + +decode_subscribe_test() -> + Data = <<128,15,0,1,0,10,116,101,115,116,95,116,111,112,105,99,0>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?SUBSCRIBE, Type). + +encode_unsubscribe_test() -> + Arg = [#sub{topic = "test_topic"}], + Message = #mqtt{id = 1, type = ?UNSUBSCRIBE, arg = Arg}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<160,14,0,1,0,10,116,101,115,116,95,116,111,112,105,99>>, EncodedData). + +decode_unsubscribe_test() -> + Data = <<160,14,0,1,0,10,116,101,115,116,95,116,111,112,105,99>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?UNSUBSCRIBE, Type). + +encode_puback_test() -> + Message = #mqtt{type = ?PUBACK, arg = 1}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<64,2,0,1>>, EncodedData). + +decode_puback_test() -> + Data = <<64,2,0,1>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?PUBACK, Type). + +encode_ping_test() -> + Message = #mqtt{type = ?PINGREQ}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<192,0>>, EncodedData). + +decode_ping_test() -> + Data = <<192,0>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?PINGREQ, Type). + +encode_pong_test() -> + Message = #mqtt{type = ?PINGRESP}, + EncodedData = mqtt_frame:encode(Message), + ?assertEqual(<<208,0>>, EncodedData). + +decode_pong_test() -> + Data = <<208,0>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<>>, Left), + ?assertEqual(?PINGRESP, Type). + +more_test() -> + Data = <<64,2,0>>, + Result = mqtt_frame:decode(Data), + ?assertEqual(more, Result). + +left_test() -> + Data = <<64,2,0,1,2,3>>, + {#mqtt{type = Type}, Left} = mqtt_frame:decode(Data), + ?assertEqual(<<2,3>>, Left), + ?assertEqual(?PUBACK, Type). + +myset_env()-> + myset_env(0). +myset_env(N)-> + application:set_env(stdlib, debug_level, N). diff --git a/src/tsung/ts_mqtt.erl b/src/tsung/ts_mqtt.erl new file mode 100644 index 000000000..6a93960d4 --- /dev/null +++ b/src/tsung/ts_mqtt.erl @@ -0,0 +1,304 @@ +%%% This code was developped by Zhihui Jiao(jzhihui521@gmail.com). +%%% +%%% Copyright (C) 2013 Zhihui Jiao +%%% +%%% This program is free software; you can redistribute it and/or modify +%%% it under the terms of the GNU General Public License as published by +%%% the Free Software Foundation; either version 2 of the License, or +%%% (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%%% GNU General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. +%%% +%%% In addition, as a special exception, you have the permission to +%%% link the code of this program with any library released under +%%% the EPL license and distribute linked combinations including +%%% the two; the MPL (Mozilla Public License), which EPL (Erlang +%%% Public License) is based on, is included in this exception. + +-module(ts_mqtt). + +-vc('$Id$ '). +-author('jzhihui521@gmail.com'). + +-behavior(ts_plugin). + +-include("ts_profile.hrl"). +-include("ts_config.hrl"). +-include("ts_mqtt.hrl"). +-include("mqtt.hrl"). + +-export([add_dynparams/4, + get_message/2, + session_defaults/0, + parse/2, + dump/2, + parse_bidi/2, + parse_config/2, + decode_buffer/2, + new_session/0]). +-export([ping_loop/3]). + +%%---------------------------------------------------------------------- +%% Function: session_default/0 +%% Purpose: default parameters for session (persistent & bidirectional) +%% Returns: {ok, true|false, true|false} +%%---------------------------------------------------------------------- +session_defaults() -> + {ok, true, true}. + +%% @spec decode_buffer(Buffer::binary(),Session::record(jabber)) -> +%% NewBuffer::binary() +%% @doc We need to decode buffer (remove chunks, decompress ...) for +%% matching or dyn_variables +%% @end +decode_buffer(Buffer, #mqtt_session{}) -> + Buffer. + +%%---------------------------------------------------------------------- +%% Function: new_session/0 +%% Purpose: initialize session information +%% Returns: record or [] +%%---------------------------------------------------------------------- +new_session() -> + #mqtt_session{}. + +dump(A, B) -> + ts_plugin:dump(A,B). +%%---------------------------------------------------------------------- +%% Function: get_message/1 +%% Purpose: Build a message/request , +%% Args: record +%% Returns: binary +%%---------------------------------------------------------------------- +get_message(#mqtt_request{type = connect, clean_start = CleanStart, + keepalive = KeepAlive, will_topic = WillTopic, + will_qos = WillQos, will_msg = WillMsg, + will_retain = WillRetain}, + #state_rcv{session = MqttSession}) -> + ClientId = ["tsung-", ts_utils:randombinstr(10)], + PublishOptions = mqtt_frame:set_publish_options([{qos, WillQos}, + {retain, WillRetain}]), + Will = #will{topic = WillTopic, message = WillMsg, + publish_options = PublishOptions}, + + Options = mqtt_frame:set_connect_options([{client_id, ClientId}, + {clean_start, CleanStart}, + {keepalive, KeepAlive}, + Will]), + Message = #mqtt{type = ?CONNECT, arg = Options}, + {mqtt_frame:encode(Message), + MqttSession#mqtt_session{wait = ?CONNACK, keepalive = KeepAlive}}; +get_message(#mqtt_request{type = disconnect}, + #state_rcv{session = MqttSession}) -> + PingPid = MqttSession#mqtt_session.ping_pid, + PingPid ! stop, + Message = #mqtt{type = ?DISCONNECT}, + ts_mon:add({count, mqtt_disconnected}), + {mqtt_frame:encode(Message), + MqttSession#mqtt_session{wait = none, status = disconnect}}; +get_message(#mqtt_request{type = publish, topic = Topic, qos = Qos, + retained = Retained, payload = Payload}, + #state_rcv{session = MqttSession = #mqtt_session{curr_id = Id}}) -> + NewMqttSession = case Qos of + 0 -> MqttSession; + _ -> MqttSession#mqtt_session{curr_id = Id + 1} + end, + + MsgId = NewMqttSession#mqtt_session.curr_id, + Message = #mqtt{id = MsgId, type = ?PUBLISH, qos = Qos, retain = Retained, + arg = {Topic, Payload}}, + Wait = case Qos of + 1 -> ?PUBACK; + _ -> none + end, + ts_mon:add({count, mqtt_published}), + {mqtt_frame:encode(Message), NewMqttSession#mqtt_session{wait = Wait}}; +get_message(#mqtt_request{type = subscribe, topic = Topic, qos = Qos}, + #state_rcv{session = MqttSession = #mqtt_session{curr_id = Id}}) -> + NewMqttSession = MqttSession#mqtt_session{curr_id = Id + 1}, + Arg = [#sub{topic = Topic, qos = Qos}], + MsgId = NewMqttSession#mqtt_session.curr_id, + Message = #mqtt{id = MsgId, type = ?SUBSCRIBE, arg = Arg}, + {mqtt_frame:encode(Message), NewMqttSession#mqtt_session{wait = ?SUBACK}}; +get_message(#mqtt_request{type = unsubscribe, topic = Topic}, + #state_rcv{session = MqttSession = #mqtt_session{curr_id = Id}}) -> + NewMqttSession = MqttSession#mqtt_session{curr_id = Id + 1}, + Arg = [#sub{topic = Topic}], + MsgId = NewMqttSession#mqtt_session.curr_id, + Message = #mqtt{id = MsgId, type = ?UNSUBSCRIBE, arg = Arg}, + {mqtt_frame:encode(Message),NewMqttSession#mqtt_session{wait = ?UNSUBACK}}. + +%%---------------------------------------------------------------------- +%% Function: parse/2 +%% Purpose: parse the response from the server and keep information +%% about the response in State#state_rcv.session +%% Args: Data (binary), State (#state_rcv) +%% Returns: {NewState, Options for socket (list), Close = true|false} +%%---------------------------------------------------------------------- +parse(closed, State) -> + {State#state_rcv{ack_done = true, datasize=0}, [], true}; +%% new response, compute data size (for stats) +parse(Data, State=#state_rcv{acc = [], datasize= 0}) -> + parse(Data, State#state_rcv{datasize= size(Data)}); + +%% normal mqtt message +parse(Data, State=#state_rcv{acc = [], session = MqttSession, socket = Socket}) -> + Wait = MqttSession#mqtt_session.wait, + AckBuf = MqttSession#mqtt_session.ack_buf, + case mqtt_frame:decode(Data) of + {_MqttMsg = #mqtt{type = Wait}, Left} -> + ?DebugF("receive mqtt_msg: ~p ~p~n", + [mqtt_frame:command_for_type(Wait), _MqttMsg]), + NewLeft = case Wait of + ?SUBACK -> <<>>; + _ -> Left + end, + case Wait of + ?CONNACK -> ts_mon:add({count, mqtt_connected}); + ?PUBACK -> ts_mon:add({count, mqtt_server_pubacked}); + ?SUBACK -> + case {AckBuf, Left} of + {<<>>, <<>>} -> ok; + _ -> self() ! {gen_ts_transport, Socket, Left} + end; + _ -> ok + end, + NewMqttSession = case Wait of + ?CONNACK -> + Proto = State#state_rcv.protocol, + KeepAlive = MqttSession#mqtt_session.keepalive, + PingPid = create_ping_proc(Proto, Socket, KeepAlive), + MqttSession#mqtt_session{ping_pid = PingPid}; + _ -> MqttSession + end, + {State#state_rcv{ack_done = true, acc = NewLeft, + session = NewMqttSession}, [], false}; + {_MqttMsg = #mqtt{id = MessageId, type = Type, qos = Qos}, Left} -> + ?DebugF("receive mqtt_msg, expecting: ~p, actual: ~p ~p~n", + [mqtt_frame:command_for_type(Wait), + mqtt_frame:command_for_type(Type), _MqttMsg]), + NewMqttSession = case {Wait, Type, Qos} of + {?SUBACK, ?PUBLISH, 1} -> + Message = #mqtt{type = ?PUBACK, arg = MessageId}, + EncodedData = mqtt_frame:encode(Message), + ts_mon:add({count, mqtt_server_published}), + NewAckBuf = <>, + MqttSession#mqtt_session{ack_buf = NewAckBuf}; + _ -> MqttSession + end, + {State#state_rcv{ack_done = false, acc = Left, + session = NewMqttSession}, [], false}; + more -> + ?DebugF("incomplete mqtt frame: ~p~n", [Data]), + {State#state_rcv{acc = Data}, [], false} + end; +%% more data, add this to accumulator and parse, update datasize +parse(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) -> + NewSize= DataSize + size(Data), + parse(<< Acc/binary, Data/binary >>, + State#state_rcv{acc = [], datasize = NewSize}). + +parse_bidi(<<>>, State=#state_rcv{acc = [], session = MqttSession}) -> + AckBuf = MqttSession#mqtt_session.ack_buf, + Ack = case AckBuf of + <<>> -> nodata; + _ -> AckBuf + end, + NewMqttSession = MqttSession#mqtt_session{ack_buf = <<>>}, + ?DebugF("ack buf: ~p~n", [AckBuf]), + {Ack, State#state_rcv{session = NewMqttSession}}; +parse_bidi(Data, State=#state_rcv{acc = [], session = MqttSession}) -> + AckBuf = MqttSession#mqtt_session.ack_buf, + case mqtt_frame:decode(Data) of + {_MqttMsg = #mqtt{type = ?PUBLISH, qos = Qos, id = MessageId}, Left} -> + ?DebugF("receive bidi mqtt_msg: ~p ~p~n", + [mqtt_frame:command_for_type(?PUBLISH), _MqttMsg]), + + ts_mon:add({count, mqtt_server_published}), + ts_mon:add({count, mqtt_pubacked}), + Ack = case Qos of + 1 -> + Message = #mqtt{type = ?PUBACK, arg = MessageId}, + mqtt_frame:encode(Message); + _ -> <<>> + end, + NewAckBuf = <>, + NewMqttSession = MqttSession#mqtt_session{ack_buf = NewAckBuf}, + parse_bidi(Left, State#state_rcv{session = NewMqttSession}); + {_MqttMsg = #mqtt{type = _Type}, Left} -> + ?DebugF("receive bidi mqtt_msg: ~p ~p~n", + [mqtt_frame:command_for_type(_Type), _MqttMsg]), + parse_bidi(Left, State); + more -> + {nodata, State#state_rcv{acc = Data}} + end; +parse_bidi(Data, State=#state_rcv{acc = Acc, datasize = DataSize}) -> + NewSize = DataSize + size(Data), + ?DebugF("parse mqtt bidi data: ~p ~p~n", [Data, Acc]), + parse_bidi(<>, + State#state_rcv{acc = [], datasize = NewSize}). + +%%---------------------------------------------------------------------- +%% Function: parse_config/2 +%% Purpose: parse tags in the XML config file related to the protocol +%% Returns: List +%%---------------------------------------------------------------------- +parse_config(Element, Conf) -> + ts_config_mqtt:parse_config(Element, Conf). + +%%---------------------------------------------------------------------- +%% Function: add_dynparams/4 +%% Purpose: we dont actually do anything +%% Returns: #websocket_request +%%---------------------------------------------------------------------- +add_dynparams(true, {DynVars, _S}, + Param = #mqtt_request{type = publish, topic = Topic, + payload = Payload}, + _HostData) -> + NewTopic = ts_search:subst(Topic, DynVars), + NewPayload = ts_search:subst(Payload, DynVars), + Param#mqtt_request{topic = NewTopic, payload = NewPayload}; +add_dynparams(true, {DynVars, _S}, + Param = #mqtt_request{type = subscribe, topic = Topic}, + _HostData) -> + NewTopic = ts_search:subst(Topic, DynVars), + Param#mqtt_request{topic = NewTopic}; +add_dynparams(true, {DynVars, _S}, + Param = #mqtt_request{type = unsubscribe, topic = Topic}, + _HostData) -> + NewTopic = ts_search:subst(Topic, DynVars), + Param#mqtt_request{topic = NewTopic}; +add_dynparams(_Bool, _DynData, Param, _HostData) -> + Param#mqtt_request{}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +create_ping_proc(Proto, Socket, KeepAlive) -> + PingPid = proc_lib:spawn_link(?MODULE, ping_loop, [Proto, Socket, KeepAlive]), + erlang:send_after(KeepAlive * 1000, PingPid, ping), + PingPid. + +ping_loop(Proto, Socket, KeepAlive) -> + receive + ping -> + try + Message = #mqtt{type = ?PINGREQ}, + PingFrame = mqtt_frame:encode(Message), + Proto:send(Socket, PingFrame, []) + catch + Error -> + ?LOGF("Error sending mqtt pingreq: ~p~n",[Error], ?ERR) + end, + erlang:send_after(KeepAlive * 1000, self(), ping), + ping_loop(Proto, Socket, KeepAlive); + stop -> ok + end. + diff --git a/src/tsung_controller/ts_config_mqtt.erl b/src/tsung_controller/ts_config_mqtt.erl new file mode 100644 index 000000000..11e9af4c8 --- /dev/null +++ b/src/tsung_controller/ts_config_mqtt.erl @@ -0,0 +1,115 @@ +%%% This code was developped by Zhihui Jiao(jzhihui521@gmail.com). +%%% +%%% Copyright (C) 2013 Zhihui Jiao +%%% +%%% This program is free software; you can redistribute it and/or modify +%%% it under the terms of the GNU General Public License as published by +%%% the Free Software Foundation; either version 2 of the License, or +%%% (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +%%% GNU General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. +%%% +%%% In addition, as a special exception, you have the permission to +%%% link the code of this program with any library released under +%%% the EPL license and distribute linked combinations including +%%% the two; the MPL (Mozilla Public License), which EPL (Erlang +%%% Public License) is based on, is included in this exception. + +-module(ts_config_mqtt). + +-vc('$Id$ '). +-author('jzhihui521@gmail.com'). + +-export([parse_config/2]). + +-include("ts_profile.hrl"). +-include("ts_config.hrl"). +-include("ts_mqtt.hrl"). + +-include("xmerl.hrl"). + +%%---------------------------------------------------------------------- +%% Func: parse_config/2 +%% Args: Element, Config +%% Returns: List +%% Purpose: parse a request defined in the XML config file +%%---------------------------------------------------------------------- +%% Parsing other elements +parse_config(Element = #xmlElement{name = dyn_variable}, Conf = #config{}) -> + ts_config:parse(Element, Conf); +parse_config(Element = #xmlElement{name = mqtt}, + Config = #config{curid = Id, session_tab = Tab, + sessions = [CurS | _], dynvar = DynVar, + subst = SubstFlag, match = MatchRegExp}) -> + Type = ts_config:getAttr(atom, Element#xmlElement.attributes, type), + CleanStart = ts_config:getAttr(atom, Element#xmlElement.attributes, + clean_start, true), + KeepAlive = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes, + keepalive, 10), + WillTopic = ts_config:getAttr(string, Element#xmlElement.attributes, + will_topic, ""), + WillQos = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes, + will_qos, 0), + WillMsg = ts_config:getAttr(string, Element#xmlElement.attributes, + will_msg, ""), + WillRetain = ts_config:getAttr(atom, Element#xmlElement.attributes, + will_retain, false), + Topic = ts_config:getAttr(string, Element#xmlElement.attributes, topic, ""), + Qos = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes, + qos, 0), + Retained = ts_config:getAttr(atom, Element#xmlElement.attributes, + retained, false), + RetainValue = case Retained of + true -> 1; + false -> 0 + end, + Timeout = ts_config:getAttr(float_or_integer, Element#xmlElement.attributes, + timeout, 1), + Payload = ts_config:getText(Element#xmlElement.content), + + Request = #mqtt_request{type = Type, clean_start = CleanStart, + keepalive = KeepAlive, will_topic = WillTopic, + will_qos = WillQos, will_msg = WillMsg, + will_retain = WillRetain, topic = Topic, qos = Qos, + retained = RetainValue, payload = Payload}, + Ack = case {Type, Qos} of + {publish, 0} -> no_ack; + {disconnect, _} -> no_ack; + _ -> parse + end, + Msg = #ts_request{ack = Ack, + endpage = true, + dynvar_specs = DynVar, + subst = SubstFlag, + match = MatchRegExp, + param = Request}, + + ts_config:mark_prev_req(Id-1, Tab, CurS), + case Type of + waitForMessages -> + ets:insert(Tab, {{CurS#session.id, Id}, + {thinktime, Timeout * 1000}}); + _ -> + ets:insert(Tab, {{CurS#session.id, Id}, Msg }) + end, + + ?LOGF("request tab: ~p~n", [ets:match(Tab, '$1')], ?INFO), + + lists:foldl( fun(A, B)->ts_config:parse(A, B) end, + Config#config{dynvar = []}, + Element#xmlElement.content); + +%% Parsing other elements +parse_config(Element = #xmlElement{}, Conf = #config{}) -> + ts_config:parse(Element,Conf); +%% Parsing non #xmlElement elements +parse_config(_, Conf = #config{}) -> + Conf. + diff --git a/tsung-1.0.dtd b/tsung-1.0.dtd index b689d08d5..97869c9d5 100644 --- a/tsung-1.0.dtd +++ b/tsung-1.0.dtd @@ -121,7 +121,7 @@ repeat | if | change_type | foreach | set_option | interaction )*> persistent (true | false) #IMPLIED probability NMTOKEN #IMPLIED weight NMTOKEN #IMPLIED - type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs | ts_shell | ts_job | ts_websocket | ts_amqp) #REQUIRED> + type (ts_jabber | ts_http | ts_raw | ts_pgsql | ts_ldap | ts_webdav |ts_mysql| ts_fs | ts_shell | ts_job | ts_websocket | ts_amqp | ts_mqtt) #REQUIRED> > + pgsql | ldap | mysql |fs | shell | job | websocket | amqp | mqtt) )> timeout CDATA "1" ack CDATA "false" > + + +