-module(lake_raw_connection).
-export([connect/6, tls_connect/6]).
-include("response_codes.hrl").
%% @doc Opens a plain TCP connection to `Host:Port' (binary mode,
%% `{active, once}') and runs the connection handshake implemented by
%% connect1/5 on it.
%%
%% `Options' is a property list; connect1/5 reads the keys `heartbeat' and
%% `exchange_command_versions' from it.
%%
%% Returns `{ok, {{gen_tcp, Socket}, FrameMax, NegotiatedHeartbeat}}' on
%% success. Returns `{error, Reason}' when the TCP connection cannot be
%% established, or when a handshake step throws `Reason'; in the latter case
%% the socket is closed before returning.
connect(Host, Port, User, Password, Vhost, Options) ->
    case gen_tcp:connect(Host, Port, [binary, {active, once}]) of
        {ok, Socket} ->
            try
                connect1({gen_tcp, Socket}, User, Password, Vhost, Options)
            catch
                throw:Reason ->
                    %% The caller never receives the socket on this path, so
                    %% it has to be released here or it would leak.
                    gen_tcp:close(Socket),
                    {error, Reason}
            end;
        {error, _} = ConnectError ->
            %% Report connection failures through the same `{error, _}'
            %% channel as handshake failures instead of crashing on a
            %% badmatch that the throw handler would not catch.
            ConnectError
    end.
%% @doc Opens a TLS connection to `Host:Port' (binary mode, `{active, once}')
%% and runs the connection handshake implemented by connect1/5 on it.
%%
%% `Options' is a property list; connect1/5 reads the keys `heartbeat' and
%% `exchange_command_versions' from it. It is not forwarded to ssl:connect/3.
%%
%% Returns `{ok, {{ssl, TlsSocket}, FrameMax, NegotiatedHeartbeat}}' on
%% success. Returns `{error, Reason}' when the TLS connection cannot be
%% established, or when a handshake step throws `Reason'; in the latter case
%% the socket is closed before returning.
tls_connect(Host, Port, User, Password, Vhost, Options) ->
    case ssl:connect(Host, Port, [binary, {active, once}]) of
        {ok, TlsSocket} ->
            try
                connect1({ssl, TlsSocket}, User, Password, Vhost, Options)
            catch
                throw:Reason ->
                    %% The caller never receives the socket on this path, so
                    %% it has to be released here or it would leak.
                    ssl:close(TlsSocket),
                    {error, Reason}
            end;
        {error, _} = ConnectError ->
            %% Report connection failures through the same `{error, _}'
            %% channel as handshake failures instead of crashing on a
            %% badmatch that the throw handler would not catch.
            ConnectError
    end.
%% Runs the handshake steps on an already connected socket, in order:
%% peer properties, SASL handshake, SASL authentication, tune, open and,
%% unless the `exchange_command_versions' option is `false', the command
%% version exchange.
%%
%% Socket is the tagged socket (`{gen_tcp, S}' or `{ssl, S}').
%% Returns `{ok, {Socket, FrameMax, NegotiatedHeartbeat}}'. Failures reported
%% by the individual steps are thrown and left for the caller to handle.
connect1(Socket, User, Password, Vhost, Options) ->
    peer_properties(Socket),
    sasl_handshake(Socket),
    %% TODO Allow using mechanisms other than <<"PLAIN">> ?
    TuneBytes = sasl_authenticate(Socket, <<"PLAIN">>, User, Password),
    ClientHeartbeat = proplists:get_value(heartbeat, Options),
    {FrameMax, Heartbeat} = tune(Socket, TuneBytes, ClientHeartbeat),
    open(Socket, Vhost),
    %% The command version exchange is enabled unless explicitly turned off.
    ExchangeVersions = proplists:get_value(exchange_command_versions, Options, true),
    case ExchangeVersions of
        true -> exchange_command_versions(Socket);
        false -> ok
    end,
    {ok, {Socket, FrameMax, Heartbeat}}.
%% Sends the client's peer properties (correlation id 0) and checks the reply.
%% Returns `ok' when the response code is ?RESPONSE_OK; any other response
%% code is thrown as `{peer_properties_failed, Code}', where Code is the
%% result of lake_utils:response_code_to_atom/1.
peer_properties(Socket) ->
    Request = lake_messages:peer_properties(0, [{<<"platform">>, <<"Erlang">>}]),
    lake_utils:send_message(Socket, Request),
    Reply = wait_for_message(Socket),
    case Reply of
        {{peer_properties_response, 0, ?RESPONSE_OK, _}, <<>>} ->
            ok;
        {{peer_properties_response, 0, Code, _}, <<>>} ->
            Failure = lake_utils:response_code_to_atom(Code),
            throw({peer_properties_failed, Failure})
    end.
%% Sends the SASL handshake request (correlation id 1) and checks the reply.
%% Returns the mechanisms carried in the response when the response code is
%% ?RESPONSE_OK; any other response code is thrown as
%% `{sasl_handshake_failed, Code}', where Code is the result of
%% lake_utils:response_code_to_atom/1.
sasl_handshake(Socket) ->
    Request = lake_messages:sasl_handshake(1),
    lake_utils:send_message(Socket, Request),
    Reply = wait_for_message(Socket),
    case Reply of
        {{sasl_handshake_response, 1, ?RESPONSE_OK, Offered}, <<>>} ->
            Offered;
        {{sasl_handshake_response, 1, Code, _Offered}, <<>>} ->
            Failure = lake_utils:response_code_to_atom(Code),
            throw({sasl_handshake_failed, Failure})
    end.
%% Sends the SASL authenticate request (correlation id 2) for the given
%% mechanism and credentials, then checks the reply.
%%
%% On ?RESPONSE_OK returns the bytes that followed the response in the same
%% socket message; connect1/5 passes them on to tune/3, which treats a
%% non-empty value as an already received tune frame. Any other response code
%% is thrown as `{authentication_failed, Code}', where Code is the result of
%% lake_utils:response_code_to_atom/1.
sasl_authenticate(Socket, Mechanism, User, Password) ->
    Request = lake_messages:sasl_authenticate(2, Mechanism, User, Password),
    lake_utils:send_message(Socket, Request),
    Reply = wait_for_message(Socket),
    case Reply of
        {{sasl_authenticate_response, 2, ?RESPONSE_OK, _SaslOpaque}, Trailing} ->
            Trailing;
        {{sasl_authenticate_response, 2, Code, _SaslOpaque}, _Trailing} ->
            Failure = lake_utils:response_code_to_atom(Code),
            throw({authentication_failed, Failure})
    end.
%% Handles the tune step of the handshake.
%%
%% MaybeTune is either `<<>>', in which case the tune frame is awaited on the
%% socket, or exactly one length-prefixed frame (32-bit size followed by the
%% payload) that is parsed directly. The heartbeat is chosen by
%% pick_heartbeat/2 from the server's value and ClientHeartbeat, and a tune
%% message with the server's frame max and that heartbeat is sent back.
%%
%% Returns `{FrameMax, NegotiatedHeartbeat}'.
tune(Socket, MaybeTune, ClientHeartbeat) ->
    Received =
        case MaybeTune of
            <<>> ->
                wait_for_message(Socket);
            <<Size:32, Frame:Size/binary>> ->
                {lake_messages:parse(Frame), <<>>}
        end,
    %% Anything other than a tune message with no trailing bytes is a badmatch.
    {{tune, FrameMax, ServerHeartbeat}, <<>>} = Received,
    Heartbeat = pick_heartbeat(ServerHeartbeat, ClientHeartbeat),
    Answer = lake_messages:tune(FrameMax, Heartbeat),
    lake_utils:send_message(Socket, Answer),
    {FrameMax, Heartbeat}.
%% Chooses the heartbeat to use: the server's value when the client did not
%% configure one (`undefined'), otherwise the smaller of the two values.
pick_heartbeat(Server, Client) when Client =:= undefined ->
    Server;
pick_heartbeat(Server, Client) ->
    erlang:min(Server, Client).
%% Sends the open request for Vhost (correlation id 3) and checks the reply.
%% Returns `ok' for a four-element open response with ?RESPONSE_OK. A
%% three-element open response is treated as a failure and thrown as
%% `{open_failed, Code}', where Code is the result of
%% lake_utils:response_code_to_atom/1.
open(Socket, Vhost) ->
    Request = lake_messages:open(3, Vhost),
    lake_utils:send_message(Socket, Request),
    Reply = wait_for_message(Socket),
    case Reply of
        %% FIXME make use of advertised host and port?
        {{open_response, 3, ?RESPONSE_OK, _ConnectionProperties}, <<>>} ->
            ok;
        {{open_response, 3, Code}, <<>>} ->
            Failure = lake_utils:response_code_to_atom(Code),
            throw({open_failed, Failure})
    end.
%% Sends the exchange-command-versions request (correlation id 4) and checks
%% the reply. Returns `ok' when the response code is ?RESPONSE_OK; the
%% versions carried in the response are not used. Any other response code is
%% thrown as `{exchange_command_versions_failed, Code}', where Code is the
%% result of lake_utils:response_code_to_atom/1.
exchange_command_versions(Socket) ->
    Request = lake_messages:exchange_command_versions(4),
    lake_utils:send_message(Socket, Request),
    Reply = wait_for_message(Socket),
    case Reply of
        {{exchange_command_versions_response, 4, ?RESPONSE_OK, _CommandVersions}, <<>>} ->
            ok;
        {{exchange_command_versions_response, 4, Code, _}, <<>>} ->
            Failure = lake_utils:response_code_to_atom(Code),
            throw({exchange_command_versions_failed, Failure})
    end.
%% Waits up to five seconds for the next socket message on the tagged socket
%% (`{gen_tcp, S}' or `{ssl, S}').
%%
%% When the received data starts with a complete length-prefixed frame
%% (32-bit size followed by the payload), the socket is re-armed with
%% `{active, once}' and `{ParsedMessage, Rest}' is returned, where Rest is
%% whatever followed the frame in the same socket message.
%%
%% Failures are thrown rather than returned, because none of the callers in
%% this module match an `{error, _}' result; a throw reaches the handler in
%% connect/6 and tls_connect/6 and is reported as `{error, Reason}':
%%   `{malformed, Data}' - the data does not start with a complete frame
%%   `closed'            - the socket was closed
%%   `Reason'            - the socket reported an error
%%   `timeout'           - nothing arrived within five seconds
%%
%% Messages that do not belong to this socket are left in the mailbox.
%%
%% NOTE(review): a frame split across several socket messages is reported as
%% malformed; reassembly would need a buffer carried between calls.
wait_for_message({gen_tcp, Socket}) ->
    wait_for_message(Socket, {tcp, tcp_closed, tcp_error}, fun inet:setopts/2);
wait_for_message({ssl, Socket}) ->
    wait_for_message(Socket, {ssl, ssl_closed, ssl_error}, fun ssl:setopts/2).

%% Transport-independent receive shared by both socket kinds. The tags name
%% the data, closed and error messages of the transport; SetOpts re-arms the
%% socket.
wait_for_message(Socket, {DataTag, ClosedTag, ErrorTag}, SetOpts) ->
    receive
        {DataTag, Socket, <<Size:32, Message:Size/binary, Rest/binary>>} ->
            %% Ask for the next packet before parsing this one.
            SetOpts(Socket, [{active, once}]),
            {lake_messages:parse(Message), Rest};
        {DataTag, Socket, Other} ->
            throw({malformed, Other});
        {ClosedTag, Socket} ->
            throw(closed);
        {ErrorTag, Socket, Reason} ->
            throw(Reason)
    after 5000 ->
        throw(timeout)
    end.