defmodule RemotePersistentTerm do
@moduledoc """
Fetch data from a remote source and store it in a [persistent_term](https://www.erlang.org/doc/man/persistent_term.html).
Can be configured to periodically check for updates.
`use` this module to define a GenServer that will manage the state of your fetcher and keep your term up to date.
"""
alias RemotePersistentTerm.Fetcher
require Logger
@opts_schema [
fetcher_mod: [
type: {:custom, __MODULE__, :existing_module?, []},
required: true,
default: Fetcher.S3,
doc: """
The implementation of the `RemotePersistentTerm.Fetcher` behaviour which \
should be used. Either one of the built in modules or a custom module.
"""
],
fetcher_opts: [
type: :keyword_list,
required: false,
default: [],
doc: """
Configuration options for the chosen fetcher implementation. \
See your chosen implementation module for details."
"""
],
refresh_interval: [
type: {:or, [:pos_integer, nil]},
required: false,
default: nil,
doc: """
How often the term should be updated in milliseconds. To disable automatic refresh, \
set the value to `nil`.
Note: updating persistent_terms can be very expensive. \
See [the docs](https://www.erlang.org/doc/man/persistent_term.html) for more info."
"""
],
lazy_init?: [
type: :boolean,
required: false,
default: false,
doc: """
If true, the GenServer will start up immediately and the term will be \
populated in a `handle_continue/2`.
This means that there will be a period while the remote term is being \
downloaded and no data is available. If this is not acceptable, set this \
value to `false` (the default).
"""
],
alias: [
type: :atom,
required: false,
doc: """
An alias for this term. A value will be generated based on the module \
name if no value is provided. Used for Telemetry events.
"""
]
]
@type t :: %__MODULE__{
fetcher_mod: module(),
fetcher_state: term(),
refresh_interval: pos_integer(),
current_version: String.t()
}
defstruct [:fetcher_mod, :fetcher_state, :refresh_interval, :current_version, :name]
@doc """
Define a GenServer that will manage this specific persistent_term.
Example:
This will define a GenServer that should be placed in your supervision tree.
The GenServer will check for a new version of `s3://my-bucket/my-object` every
12 hours and store it in a persistent_term.
Define the module:
```
defmodule MyRemotePterm do
use RemotePersistentTerm
end
```
In your supervision tree:
```
{MyRemotePterm,
[
fetcher_mod: RemotePersistentTerm.Fetcher.S3,
fetcher_opts: [bucket: "my-bucket", key: "my-object"],
refresh_interval: :timer.hours(12)
]}
```
Options:
#{NimbleOptions.docs(@opts_schema)}
"""
defmacro __using__(_opts) do
quote do
use GenServer
@behaviour RemotePersistentTerm
def start_link(opts) do
with {:ok, valid_opts} <- RemotePersistentTerm.validate_options(opts) do
GenServer.start_link(__MODULE__, valid_opts, name: __MODULE__)
end
end
@impl GenServer
def init(opts) do
fetcher_mod = opts[:fetcher_mod]
with {:ok, fetcher_state} <- fetcher_mod.init(opts[:fetcher_opts]) do
state = %RemotePersistentTerm{
fetcher_mod: fetcher_mod,
fetcher_state: fetcher_state,
refresh_interval: opts[:refresh_interval],
name: name(opts)
}
if opts[:lazy_init?] do
{:ok, state, {:continue, :fetch_term}}
else
{:ok, do_update_term(state)}
end
end
end
@impl GenServer
def handle_continue(:fetch_term, state) do
{:noreply, do_update_term(state)}
end
@impl GenServer
def handle_call(:update, _, state) do
{:reply, :ok, do_update_term(state)}
end
@impl RemotePersistentTerm
def get, do: :persistent_term.get(__MODULE__, nil)
defoverridable get: 0
@impl RemotePersistentTerm
def put(term), do: :persistent_term.put(__MODULE__, term)
defoverridable put: 1
@impl RemotePersistentTerm
def deserialize(term), do: {:ok, term}
defoverridable deserialize: 1
@doc """
Immediately update the term.
"""
def update, do: GenServer.call(__MODULE__, :update, :timer.minutes(5_000))
defp do_update_term(state) do
version =
RemotePersistentTerm.update_term(
state,
&deserialize/1,
&put/1
)
if is_integer(state.refresh_interval) do
Process.send_after(self(), :update, state.refresh_interval)
end
%{state | current_version: version}
end
defp name(opts) do
opts[:alias] || __MODULE__ |> to_string |> String.split(".") |> tl() |> Enum.join(".")
end
end
end
@doc """
Retrieve the currently stored term.
Overridable.
"""
@callback get() :: term() | nil
@doc """
Update the persistent_term.
Overridable.
This is called after `deserialize/1`.
"""
@callback put(term()) :: :ok | {:error, term()}
@doc """
Deserializes the remote term, before storing it.
Overridable.
Commonly the remote term is an ETF encoded binary. In this case you will likely want to
override this callback with something like:
```
def deserialize(binary) do
{:ok, :erlang.binary_to_term(binary)}
rescue
_ ->
{:error, "got invalid ETF"}
end
```
"""
@callback deserialize(term()) :: {:ok, term()} | {:error, term()}
@doc false
def update_term(state, deserialize_fun, put_fun) do
start_meta = %{name: state.name}
:telemetry.span(
[:remote_persistent_term, :update],
start_meta,
fn ->
{status, version} =
with {:ok, current_version} <- state.fetcher_mod.current_version(state.fetcher_state),
true <- state.current_version != current_version,
:ok <- download_and_store_term(state, deserialize_fun, put_fun) do
{:updated, current_version}
else
false ->
Logger.info("#{state.name} - up to date")
{:not_updated, state.current_version}
{:error, reason} ->
Logger.error(
"#{state.name} - failed to update remote term, reason: #{inspect(reason)}"
)
{:not_updated, state.current_version}
end
{version, Map.put(start_meta, :status, status)}
end
)
end
@doc false
def existing_module?(value) do
case Code.ensure_compiled(value) do
{:module, ^value} ->
{:ok, value}
_ ->
{:error, "#{__MODULE__} does not exist"}
end
end
@doc false
def validate_options(opts), do: NimbleOptions.validate(opts, @opts_schema)
defp download_and_store_term(state, deserialize_fun, put_fun) do
with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state),
{:ok, deserialized} <- deserialize_fun.(term) do
put_fun.(deserialized)
end
end
end