lib/idempotency_plug/store/ecto_store.ex

if Code.ensure_loaded?(Ecto) do
defmodule IdempotencyPlug.EctoStore do
  @moduledoc """
  Module that defines an Ecto store.

  A migration file should be generated with
  `mix idempotency_plug.ecto.gen.migration`.

  ## Examples

      defmodule MyApp.Application do
        # ..

        def start(_type, _args) do
          children = [
            {IdempotencyPlug.RequestTracker, [
              store: {IdempotencyPlug.EctoStore, repo: MyApp.Repo}]}
            # ...
          ]

          Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
        end
      end
  """

  defmodule ErlangTerm do
    @moduledoc false
    use Ecto.Type

    @impl true
    def type, do: :binary

    @impl true
    def cast(term), do: {:ok, term}

    @impl true
    def load(bin) when is_binary(bin), do: {:ok, :erlang.binary_to_term(bin)}

    @impl true
    def dump(term), do: {:ok, :erlang.term_to_binary(term)}
  end

  defmodule IdempotentRequest do
    @moduledoc false
    use Ecto.Schema

    import Ecto.Changeset

    @primary_key {:id, :string, autogenerate: false}
    @timestamps_opts [type: :utc_datetime_usec]

    schema "idempotency_plug_requests" do
      field :fingerprint, :string
      field :data, ErlangTerm
      field :expires_at, :utc_datetime_usec

      timestamps()
    end

    def changeset(struct) do
      struct
      |> change()
      |> unique_constraint(:id, name: :idempotency_plug_requests_pkey)
    end
  end

  @behaviour IdempotencyPlug.Store

  import Ecto.Query

  @impl true
  def setup(opts) do
    case repo(opts) do
      {:ok, repo} ->
        # This will raise an error if the migration haven't been generate
        # for the repo
        repo.exists?(IdempotentRequest)

        :ok

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def lookup(request_id, opts) do
    case repo!(opts).get(IdempotentRequest, request_id) do
      nil -> :not_found
      request -> {request.data, request.fingerprint, request.expires_at}
    end
  end

  @impl true
  def insert(request_id, data, fingerprint, expires_at, opts) do
    changeset =
      IdempotentRequest.changeset(%IdempotentRequest{
        id: request_id,
        data: data,
        fingerprint: fingerprint,
        expires_at: expires_at
      })

    case repo!(opts).insert(changeset) do
      {:ok, _} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  @impl true
  def update(request_id, data, expires_at, opts) do
    repo = repo!(opts)
    updates = [set: [data: data, expires_at: expires_at, updated_at: DateTime.utc_now()]]

    IdempotentRequest
    |> where(id: ^request_id)
    |> repo.update_all(updates)
    |> case do
      {1, _} -> :ok
      {0, _} -> {:error, "key #{request_id} not found in store"}
    end
  end

  @impl true
  def prune(opts) do
    repo = repo!(opts)

    IdempotentRequest
    |> where([r], r.expires_at < ^DateTime.utc_now())
    |> repo.delete_all()

    :ok
  end

  defp repo(opts) do
    case Keyword.fetch(opts, :repo) do
      {:ok, repo} -> {:ok, repo}
      :error -> {:error, ":repo must be specified in options for #{inspect __MODULE__}"}
    end
  end

  defp repo!(opts) do
    case repo(opts) do
      {:ok, repo} -> repo
      {:error, error} -> raise error
    end
  end
end
end