src/marina_client.erl

%% shackle_client callback module: performs the connection handshake
%% (setup/2), encodes outgoing requests into frames (handle_request/2) and
%% decodes incoming socket data into per-stream replies (handle_data/2).
-module(marina_client).
-include("marina_internal.hrl").

%% Ask the compiler to inline local functions, with the inlining size limit
%% raised to 512.
-compile(inline).
-compile({inline_size, 512}).

-behavior(shackle_client).
-export([
    init/1,
    setup/2,
    handle_request/2,
    handle_data/2,
    terminate/1
]).

%% Per-connection client state.
-record(state, {
    %% decode buffer threaded through marina_buffer:decode/2 in handle_data/2
    buffer      = marina_buffer:new() :: buffer(),
    %% frame flags read once in init/1 and passed to every marina_request
    %% encoder call
    frame_flags = 0                   :: frame_flag(),
    %% number of requests encoded so far; the stream id of the next request is
    %% this counter rem ?MAX_STREAM_ID
    requests    = 0                   :: non_neg_integer()
}).

-type state() :: #state {}.

%% shackle_client callbacks
%% @doc Builds the initial client state.
%%
%% The options argument is ignored. The frame flags are read once from
%% marina_utils:frame_flags/0 and stored in the state; the remaining fields
%% keep their record defaults.
-spec init(undefined) ->
    {ok, state()}.

init(_Opts) ->
    FrameFlags = marina_utils:frame_flags(),
    {ok, #state {frame_flags = FrameFlags}}.

%% @doc Runs the connection handshake on a newly connected socket.
%%
%% Calls marina_utils:startup/1 first. When it returns `{ok, undefined}'
%% (presumably: no authenticator requested -- confirm against marina_utils)
%% the keyspace is selected straight away. When it names the
%% PasswordAuthenticator, marina_utils:authenticate/1 must succeed before the
%% keyspace is selected. An `{error, Reason}' from any step is returned as
%% `{error, Reason, State}'; the state is never modified here.
%%
%% NOTE(review): any other authenticator name matches no clause and crashes
%% with case_clause -- confirm that this is the intended behaviour.
-spec setup(inet:socket(), state()) ->
    {ok, state()} |
    {error, atom(), state()}.

setup(Socket, State) ->
    case marina_utils:startup(Socket) of
        {ok, undefined} ->
            select_keyspace(Socket, State);
        {ok, <<"org.apache.cassandra.auth.PasswordAuthenticator">>} ->
            case marina_utils:authenticate(Socket) of
                ok ->
                    select_keyspace(Socket, State);
                {error, Reason} ->
                    {error, Reason, State}
            end;
        {error, Reason} ->
            {error, Reason, State}
    end.

%% Final handshake step shared by both startup paths: runs
%% marina_utils:use_keyspace/1 and maps its result onto the setup/2 return
%% shape.
select_keyspace(Socket, State) ->
    case marina_utils:use_keyspace(Socket) of
        ok ->
            {ok, State};
        {error, Reason} ->
            {error, Reason, State}
    end.

%% @doc Encodes one request and assigns it a stream id.
%%
%% The request must be `{Request, QueryOpts}' where Request is one of
%% `{batch, Queries}', `{execute, StatementId}', `{prepare, Query}' or
%% `{query, Query}'; any other shape crashes (function_clause / case_clause).
%% QueryOpts is not used for `prepare'.
%%
%% The stream id is the running request counter rem ?MAX_STREAM_ID. The
%% counter starts at 0, so the first id handed out is 0 -- which is why the
%% spec says non_neg_integer() rather than pos_integer(). The counter itself
%% is incremented by one per request and never wrapped.
%%
%% Returns the stream id, the encoded request data and the updated state.
-spec handle_request(term(), state()) ->
    {ok, non_neg_integer(), iodata(), state()}.

handle_request({Request, QueryOpts}, #state {
        frame_flags = FrameFlags,
        requests = Requests
    } = State) ->

    RequestId = Requests rem ?MAX_STREAM_ID,
    Data = case Request of
        {batch, Queries} ->
            marina_request:batch(RequestId, FrameFlags, Queries, QueryOpts);
        {execute, StatementId} ->
            marina_request:execute(RequestId, FrameFlags, StatementId,
                QueryOpts);
        {prepare, Query} ->
            marina_request:prepare(RequestId, FrameFlags, Query);
        {query, Query} ->
            marina_request:query(RequestId, FrameFlags, Query, QueryOpts)
    end,

    {ok, RequestId, Data, State#state {
        requests = Requests + 1
    }}.

%% @doc Feeds newly received socket data through the frame decoder.
%%
%% Data is passed to marina_buffer:decode/2 together with the buffer kept in
%% the state; the decoder returns the frames it could complete plus the new
%% buffer, which is stored back in the state. Each decoded frame becomes a
%% `{StreamId, {ok, Frame}}' reply keyed by the frame's stream field. The
%% reply list is empty when no frame was completed.
%%
%% Ids are typed non_neg_integer() because handle_request/2 issues ids
%% starting at 0.
%% NOTE(review): this assumes the server only ever echoes ids issued by
%% handle_request/2 -- confirm against the frame decoder.
-spec handle_data(binary(), state()) ->
    {ok, [{non_neg_integer(), term()}], state()}.

handle_data(Data, #state {
        buffer = Buffer
    } = State) ->

    {Frames, Buffer2} = marina_buffer:decode(Data, Buffer),
    Replies = [{Frame#frame.stream, {ok, Frame}} || Frame <- Frames],

    {ok, Replies, State#state {
        buffer = Buffer2
    }}.

%% @doc Termination callback. The state is discarded and nothing is cleaned
%% up here; always returns `ok'.
-spec terminate(state()) ->
    ok.

terminate(_) ->
    ok.