%% src/livery_grpc_health_store.erl

-module(livery_grpc_health_store).
-moduledoc """
Serving-status store behind `livery_grpc_health`, with watch subscriptions.

Holds the per-service status and the set of watchers. Setting a status
pushes a `{grpc_health_watch, Service, Status}` message to every watcher of
that service, which is how `Watch` streams live updates. Watchers are
monitored, so a disconnected watcher is dropped automatically.

The empty service name (`<<>>`) is the overall server status and defaults
to `SERVING`; a named service that was never set is unknown.
""".
-behaviour(gen_server).

-export([start_link/0, set/2, status/1, subscribe/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-export_type([serving_status/0]).

-type serving_status() :: 'UNKNOWN' | 'SERVING' | 'NOT_SERVING' | 'SERVICE_UNKNOWN'.

%% Server state of the health store.
-record(state, {
    %% Service -> status that was explicitly set. Services missing from this
    %% map fall back to the defaults applied in lookup/2.
    statuses = #{} :: #{binary() => serving_status()},
    %% Service -> set of watcher pids and their monitor refs.
    watchers = #{} :: #{binary() => #{pid() => reference()}},
    %% Monitor ref -> {Service, Pid}, for O(1) cleanup on DOWN.
    monitors = #{} :: #{reference() => {binary(), pid()}}
}).

-define(SERVER, ?MODULE).

%%====================================================================
%% API
%%====================================================================

-doc "Start the store, linked to the caller and registered locally as `?SERVER`.".
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).

-doc """
Record `Status` as the serving status of `Service`.

The call is synchronous: the store pushes the new status to the watchers of
`Service`, stores it, and then replies `ok`.
""".
-spec set(binary(), serving_status()) -> ok.
set(Service, Status) ->
    Request = {set, Service, Status},
    gen_server:call(?SERVER, Request).

-doc """
Look up the status to report for `Check`.

Returns `{ok, Status}` when a status is available. The overall server
(`<<>>`) reports `SERVING` until it is set explicitly; a named service that
was never set yields `not_found`.
""".
-spec status(binary()) -> {ok, serving_status()} | not_found.
status(Service) ->
    Request = {status, Service},
    gen_server:call(?SERVER, Request).

-doc """
Make the calling process a watcher of `Service`.

The return value is the status the caller should emit first. A named service
without a status resolves to `SERVICE_UNKNOWN`, so `Watch` can keep its
stream open instead of failing. Later changes arrive in the caller's mailbox
as `{grpc_health_watch, Service, Status}` messages.
""".
-spec subscribe(binary()) -> serving_status().
subscribe(Service) ->
    Watcher = self(),
    gen_server:call(?SERVER, {subscribe, Service, Watcher}).

%%====================================================================
%% gen_server
%%====================================================================

-spec init([]) -> {ok, #state{}}.
init([]) ->
    %% Every field defaults to an empty map: no statuses, watchers or monitors.
    Initial = #state{},
    {ok, Initial}.

%% Synchronous requests:
%%   {status, Service}         -> reply with the result of lookup/2
%%   {subscribe, Service, Pid} -> monitor Pid, record it as a watcher and
%%                                reply with the status to emit first
%%   {set, Service, Status}    -> notify current watchers, store the status,
%%                                reply ok
%% Any other request is answered with {error, unknown_call}.
-spec handle_call(term(), {pid(), term()}, #state{}) -> {reply, term(), #state{}}.
handle_call({status, Service}, _From, State) ->
    Reply = lookup(Service, State),
    {reply, Reply, State};
handle_call({subscribe, Service, Watcher}, _From, State0) ->
    Ref = erlang:monitor(process, Watcher),
    State1 = add_watcher(Service, Watcher, Ref, State0),
    %% Registering a watcher does not touch the statuses, so resolving
    %% against the pre-registration state yields the same answer.
    Initial = resolve(Service, State0),
    {reply, Initial, State1};
handle_call({set, Service, Status}, _From, #state{statuses = Known} = State0) ->
    notify(Service, Status, State0),
    State1 = State0#state{statuses = maps:put(Service, Status, Known)},
    {reply, ok, State1};
handle_call(_Unknown, _From, State) ->
    {reply, {error, unknown_call}, State}.

%% The public API only issues calls, so every cast is ignored.
-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
handle_cast(_Ignored, Unchanged) ->
    {noreply, Unchanged}.

%% A 'DOWN' message from a process monitor removes the matching watcher;
%% every other message is discarded.
-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
handle_info({'DOWN', Ref, process, _Watcher, _Why}, State0) ->
    State1 = drop_watcher(Ref, State0),
    {noreply, State1};
handle_info(_Unexpected, State) ->
    {noreply, State}.

%% Nothing to release on shutdown.
-spec terminate(term(), #state{}) -> ok.
terminate(_Why, _FinalState) ->
    ok.

%% The state is carried across code upgrades unchanged.
-spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
code_change(_FromVsn, Current, _Extra) ->
    {ok, Current}.

%%====================================================================
%% Internals
%%====================================================================

%% Status as reported by `Check`: an explicitly set status wins; otherwise the
%% overall server (`<<>>`) defaults to 'SERVING' and any other service is
%% not_found.
-spec lookup(binary(), #state{}) -> {ok, serving_status()} | not_found.
lookup(Service, #state{statuses = Known}) ->
    case Known of
        #{Service := Status} -> {ok, Status};
        _ when Service =:= <<>> -> {ok, 'SERVING'};
        _ -> not_found
    end.

%% Status as reported by `Watch`: same as lookup/2, except that a missing
%% service becomes 'SERVICE_UNKNOWN' instead of not_found.
-spec resolve(binary(), #state{}) -> serving_status().
resolve(Service, State) ->
    status_or_unknown(lookup(Service, State)).

%% Collapse a lookup/2 result into a bare status.
-spec status_or_unknown({ok, serving_status()} | not_found) -> serving_status().
status_or_unknown({ok, Status}) -> Status;
status_or_unknown(not_found) -> 'SERVICE_UNKNOWN'.

%% Register Pid, monitored through MRef, as a watcher of Service.
%%
%% Both indexes are updated together: `watchers` (Service -> Pid -> MRef) and
%% `monitors` (MRef -> {Service, Pid}).
%%
%% If Pid already watches Service under a different monitor ref, that older
%% monitor is released and its `monitors` entry removed. Otherwise the
%% overwritten ref would stay monitored and indexed until the process exits,
%% and would then produce a second 'DOWN' for the same watcher.
-spec add_watcher(binary(), pid(), reference(), #state{}) -> #state{}.
add_watcher(Service, Pid, MRef, #state{watchers = Watchers, monitors = Monitors} = State) ->
    ForService = maps:get(Service, Watchers, #{}),
    Monitors1 =
        case ForService of
            #{Pid := MRef} ->
                %% Already registered under this very ref; nothing to release.
                Monitors;
            #{Pid := StaleRef} ->
                %% `flush` also discards a 'DOWN' for StaleRef that may
                %% already be waiting in the mailbox.
                true = erlang:demonitor(StaleRef, [flush]),
                maps:remove(StaleRef, Monitors);
            #{} ->
                Monitors
        end,
    State#state{
        watchers = Watchers#{Service => ForService#{Pid => MRef}},
        monitors = Monitors1#{MRef => {Service, Pid}}
    }.

%% Forget the watcher that was monitored through MRef.
%%
%% An unknown MRef leaves the state untouched. Otherwise the ref is removed
%% from `monitors` and its pid from the service's watcher set. When that set
%% becomes empty the service key itself is removed from `watchers`: service
%% names are supplied by subscribers, so keeping an empty set per name ever
%% watched would let the map grow without bound.
-spec drop_watcher(reference(), #state{}) -> #state{}.
drop_watcher(MRef, #state{watchers = Watchers, monitors = Monitors} = State) ->
    case maps:take(MRef, Monitors) of
        {{Service, Pid}, Monitors1} ->
            Remaining = maps:remove(Pid, maps:get(Service, Watchers, #{})),
            Watchers1 =
                case map_size(Remaining) of
                    0 -> maps:remove(Service, Watchers);
                    _ -> Watchers#{Service => Remaining}
                end,
            State#state{watchers = Watchers1, monitors = Monitors1};
        error ->
            State
    end.

%% Send `{grpc_health_watch, Service, Status}` to every pid currently watching
%% Service. A service without watchers is a no-op.
-spec notify(binary(), serving_status(), #state{}) -> ok.
notify(Service, Status, #state{watchers = Watchers}) ->
    Update = {grpc_health_watch, Service, Status},
    Send = fun(Watcher) -> Watcher ! Update end,
    case Watchers of
        #{Service := Subscribers} -> lists:foreach(Send, maps:keys(Subscribers));
        #{} -> ok
    end.