src/marina.erl

-module(marina).
-include("marina_internal.hrl").

-export([
    async_batch/2,
    async_query/2,
    async_reusable_query/2,
    batch/2,
    query/2,
    receive_response/1,
    response/1,
    reusable_query/2
]).

%% public
%% @doc Submits a batch of queries without waiting for the reply.
%% The request is wrapped as `{batch, Queries}' and handed to async_call/2,
%% whose result is returned unchanged.
-spec async_batch([batch_query()], query_opts()) ->
    {ok, shackle:request_id()} | error().

async_batch(Queries, QueryOpts) ->
    BatchMsg = {batch, Queries},
    async_call(BatchMsg, QueryOpts).

%% @doc Submits a single query without waiting for the reply.
%% The request is wrapped as `{query, Query}' and handed to async_call/2,
%% whose result is returned unchanged.
-spec async_query(query(), query_opts()) ->
    {ok, shackle:request_id()} | error().

async_query(Query, QueryOpts) ->
    QueryMsg = {query, Query},
    async_call(QueryMsg, QueryOpts).

%% @doc Asynchronous variant of reusable_query/2.
%% Looks up a pool for the `routing_key' option via marina_pool:node/1 and
%% delegates to async_reusable_query/3 on that pool. A failed pool lookup
%% is returned to the caller as is.
-spec async_reusable_query(query(), query_opts()) ->
    {ok, shackle:request_id()} | error().

async_reusable_query(Query, QueryOpts) ->
    Key = marina_utils:query_opts(routing_key, QueryOpts),
    case marina_pool:node(Key) of
        {error, _} = Error ->
            Error;
        {ok, Pool} ->
            async_reusable_query(Pool, Query, QueryOpts)
    end.

%% @doc Runs a batch of queries and returns the decoded response.
%% The request is wrapped as `{batch, Queries}' and handed to call/2.
-spec batch([batch_query()], query_opts()) ->
    {ok, term()} | error().

batch(Queries, QueryOpts) ->
    BatchMsg = {batch, Queries},
    call(BatchMsg, QueryOpts).

%% @doc Runs a single query and returns the decoded response.
%% The request is wrapped as `{query, Query}' and handed to call/2.
-spec query(query(), query_opts()) ->
    {ok, term()} | error().

query(Query, QueryOpts) ->
    QueryMsg = {query, Query},
    call(QueryMsg, QueryOpts).

%% @doc Collects the reply for a request started with one of the async_*
%% functions. The result of shackle:receive_response/1 is passed through
%% response/1, so a successful frame comes back decoded.
-spec receive_response(term()) ->
    {ok, term()} | error().

receive_response(RequestId) ->
    Reply = shackle:receive_response(RequestId),
    response(Reply).

%% @doc Post-processes a raw reply: an `{error, Reason}' tuple is returned
%% untouched, while the frame of an `{ok, Frame}' tuple is run through
%% marina_body:decode/1.
-spec response({ok, term()} | error()) ->
    {ok, term()} | error().

response({error, _} = Error) ->
    Error;
response({ok, Frame}) ->
    marina_body:decode(Frame).

%% @doc Runs Query as a prepared statement and returns the decoded response.
%% Looks up a pool for the `routing_key' option via marina_pool:node/1 and
%% delegates to reusable_query/3 on that pool. A failed pool lookup is
%% returned to the caller as is.
-spec reusable_query(query(), query_opts()) ->
    {ok, term()} | error().

reusable_query(Query, QueryOpts) ->
    Key = marina_utils:query_opts(routing_key, QueryOpts),
    case marina_pool:node(Key) of
        {error, _} = Error ->
            Error;
        {ok, Pool} ->
            reusable_query(Pool, Query, QueryOpts)
    end.

%% private
%% Picks a pool for the `routing_key' option and issues Msg on it through
%% async_call/3. A failed pool lookup is returned to the caller as is.
async_call(Msg, QueryOpts) ->
    Key = marina_utils:query_opts(routing_key, QueryOpts),
    case marina_pool:node(Key) of
        {error, _} = Error ->
            Error;
        {ok, Pool} ->
            async_call(Pool, Msg, QueryOpts)
    end.


%% Casts `{Msg, QueryOpts}' to Pool via shackle:cast/4, using the `pid' and
%% `timeout' options taken from QueryOpts. Returns shackle:cast/4's result.
async_call(Pool, Msg, QueryOpts) ->
    OptPid = marina_utils:query_opts(pid, QueryOpts),
    OptTimeout = marina_utils:query_opts(timeout, QueryOpts),
    Request = {Msg, QueryOpts},
    shackle:cast(Pool, Request, OptPid, OptTimeout).

%% Executes Query on Pool as a prepared statement without waiting for the
%% execute reply.
%%
%% When marina_cache already holds a statement id for {Pool, Query}, the
%% execute request is cast right away. Otherwise the statement is prepared
%% with a blocking call/3, the returned id is cached, and the execute
%% request is cast afterwards.
%%
%% The blocking prepare uses up part of the caller's timeout, so the execute
%% request only gets what is left of it (same accounting as reusable_query/3).
%% If nothing is left, {error, timeout} is returned and no execute is sent.
%%
%% Returns whatever async_call/3 returns, or the {error, Reason} of a failed
%% prepare.
async_reusable_query(Pool, Query, QueryOpts) ->
    case marina_cache:get(Pool, Query) of
        {ok, StatementId} ->
            async_call(Pool, {execute, StatementId}, QueryOpts);
        {error, not_found} ->
            Timeout = marina_utils:query_opts(timeout, QueryOpts),
            Timestamp = os:timestamp(),
            case call(Pool, {prepare, Query}, QueryOpts) of
                {ok, StatementId} ->
                    marina_cache:put(Pool, Query, StatementId),
                    async_execute_remaining(Pool, StatementId, QueryOpts,
                        Timeout, Timestamp);
                {error, Reason} ->
                    {error, Reason}
            end
    end.

%% Casts the execute request for StatementId with the `timeout' option
%% replaced by the value marina_utils:timeout/2 yields for Timeout and
%% Timestamp (presumably the time still left of Timeout since Timestamp --
%% confirm in marina_utils). Returns {error, timeout} when that value is not
%% positive.
async_execute_remaining(Pool, StatementId, QueryOpts, Timeout, Timestamp) ->
    case marina_utils:timeout(Timeout, Timestamp) of
        Remaining when Remaining > 0 ->
            async_call(Pool, {execute, StatementId},
                QueryOpts#{timeout => Remaining});
        _ ->
            {error, timeout}
    end.

%% Picks a pool for the `routing_key' option and performs Msg on it through
%% call/3. A failed pool lookup is returned to the caller as is.
call(Msg, QueryOpts) ->
    Key = marina_utils:query_opts(routing_key, QueryOpts),
    case marina_pool:node(Key) of
        {error, _} = Error ->
            Error;
        {ok, Pool} ->
            call(Pool, Msg, QueryOpts)
    end.

%% Sends `{Msg, QueryOpts}' to Pool with shackle:call/3, bounded by the
%% `timeout' option, and runs the reply through response/1.
call(Pool, Msg, QueryOpts) ->
    OptTimeout = marina_utils:query_opts(timeout, QueryOpts),
    Request = {Msg, QueryOpts},
    Reply = shackle:call(Pool, Request, OptTimeout),
    response(Reply).

%% Executes Query on Pool as a prepared statement and returns the decoded
%% response.
%%
%% Cache hit: the cached statement id is executed. If the server answers
%% with error code 9472 (16#2500, the CQL native protocol "Unprepared"
%% error), the cache entry is erased and the whole procedure is retried with
%% the remaining timeout, which prepares the statement again. Any other
%% reply is returned unchanged.
%%
%% Cache miss: the statement is prepared, its id is cached, and it is then
%% executed with the remaining timeout. A failed prepare is returned as is.
%%
%% In both follow-up steps {error, timeout} is returned when no time is left.
reusable_query(Pool, Query, QueryOpts) ->
    Timeout = marina_utils:query_opts(timeout, QueryOpts),
    Timestamp = os:timestamp(),
    case marina_cache:get(Pool, Query) of
        {ok, StatementId} ->
            case call(Pool, {execute, StatementId}, QueryOpts) of
                {error, {9472, _}} ->
                    % the server no longer knows this statement id
                    marina_cache:erase(Pool, Query),
                    with_remaining_timeout(Timeout, Timestamp, QueryOpts,
                        fun (QueryOpts2) ->
                            reusable_query(Pool, Query, QueryOpts2)
                        end);
                Response ->
                    Response
            end;
        {error, not_found} ->
            case call(Pool, {prepare, Query}, QueryOpts) of
                {ok, StatementId} ->
                    marina_cache:put(Pool, Query, StatementId),
                    with_remaining_timeout(Timeout, Timestamp, QueryOpts,
                        fun (QueryOpts2) ->
                            call(Pool, {execute, StatementId}, QueryOpts2)
                        end);
                {error, Reason} ->
                    {error, Reason}
            end
    end.

%% Applies Fun to QueryOpts with the `timeout' option replaced by the value
%% marina_utils:timeout/2 yields for Timeout and Timestamp (presumably the
%% time still left of Timeout since Timestamp -- confirm in marina_utils).
%% Returns {error, timeout} without calling Fun when that value is not
%% positive.
with_remaining_timeout(Timeout, Timestamp, QueryOpts, Fun) ->
    case marina_utils:timeout(Timeout, Timestamp) of
        Remaining when Remaining > 0 ->
            Fun(QueryOpts#{timeout => Remaining});
        _ ->
            {error, timeout}
    end.