lib/semaphore.ex

defmodule Semaphore do
  alias :ets, as: ETS

  @table :semaphore
  @call_safe_table :semaphore_call_safe

  ## Application Callbacks
  use GenServer

  def start(_type, _args) do
    import Supervisor.Spec, warn: false
    Supervisor.start_link([worker(__MODULE__, [])], strategy: :one_for_one)
  end

  def start_link() do
    sweep_interval = Application.get_env(:semaphore, :sweep_interval, 5_000)
    GenServer.start_link(__MODULE__, sweep_interval, name: __MODULE__)
  end

  ## Client API

  @doc """
  Acquire a semaphore, incrementing the internal count by one.
  """
  @spec acquire(term, integer) :: boolean
  def acquire(name, max) do
    case ETS.update_counter(@table, name, [{2, 0}, {2, 1, max, max}], {name, 0}) do
      [^max, _] -> false
      _ -> true
    end
  end

  @doc """
  Release a semaphore, decrementing the internal count by one.
  """
  @spec release(term) :: :ok
  def release(name) do
    ETS.update_counter(@table, name, {2, -1, 0, 0})
    :ok
  end

  @doc """
  Acquire a semaphore, incrementing the internal count by one.

  If the current process exits without releasing the semaphore, it will be automatically swept in the background. Like
  `call_linksafe`, tThis function has higher overhead than `acquire/2`.
  """
  @spec acquire_linksafe(term, any, integer) :: boolean
  def acquire_linksafe(name, id, max) do
    if acquire(name, max) do
      safe_key = {name, self(), id}

      if ETS.insert_new(@call_safe_table, {safe_key}) do
        true
      else
        release(name)
        false
      end
    else
      false
    end
  end

  @doc """
  Release a semaphore, decrementing the internal count by one.

  If the current process exits without releasing the semaphore, it will be automatically swept in the background. Like
  `call_linksafe`, tThis function has higher overhead than `acquire/2`.
  """
  @spec release_linksafe(term, any) :: :ok
  def release_linksafe(name, id) do
    safe_key = {name, self(), id}

    release(name)
    ETS.delete(@call_safe_table, safe_key)
    :ok
  end

  @doc """
  Number of acquired semaphores.
  """
  @spec count(term) :: integer
  def count(name) do
    case ETS.lookup(@table, name) do
      [{_, count}] -> count
      _ -> 0
    end
  end

  @doc """
  Reset sempahore to a specific count.
  """
  @spec reset(term, integer) :: :ok
  def reset(name, count \\ 0) do
    ETS.update_element(@table, name, {2, count})
    :ok
  end

  @doc """
  Attempt to acquire a semaphore and call a function and then automatically release.
  """
  @spec call(term, integer, function) :: term | {:error, :max}
  def call(_name, -1, func), do: func.()
  def call(_name, 0, _func), do: {:error, :max}
  def call(name, max, func) do
    if acquire(name, max) do
      try do
        func.()
      after
        release(name)
      end
    else
      {:error, :max}
    end
  end

  @doc """
  Attempt to acquire a semaphore and call a function that might link to another process, and then automatically release.

  If the current process dies in a way that is unable to be caught by the try block (e.g. a linked process dies, while
  `func` is being called. The semaphore will be automatically released by the sweeper in the background.

  This function has higher overhead than `call/3` and should only be used if you know that you might be linking to
  something in the func.
  """
  @spec call_linksafe(term, integer, function) :: term | {:error, :max}
  def call_linksafe(_name, -1, func), do: func.()
  def call_linksafe(_name, 0, _func), do: {:error, :max}
  def call_linksafe(name, max, func) do
    if acquire(name, max) do
      safe_key = {name, self(), nil}
      inserted = ETS.insert_new(@call_safe_table, {safe_key})
      try do
        func.()
      after
        if inserted do
          ETS.delete(@call_safe_table, safe_key)
        end
        release(name)
      end
    else
      {:error, :max}
    end
  end

  ## Private

  def init(sweep_interval) do
    ETS.new(@table, [:set, :public, :named_table, {:write_concurrency, true}])
    ETS.new(@call_safe_table, [:set, :public, :named_table, {:write_concurrency, true}])
    {:ok, sweep_interval, sweep_interval}
  end

  def handle_info(:timeout, sweep_interval) do
    do_sweep()
    {:noreply, sweep_interval, sweep_interval}
  end

  defp do_sweep() do
    ETS.foldl(
      fn {{name, pid, _id}} = key, :ok ->
        with false <- Process.alive?(pid),
             1 <- :ets.select_delete(@call_safe_table, [{key, [], [true]}]) do
          release(name)
        else
          _ -> :ok
        end
      end,
      :ok,
      @call_safe_table
    )
  end
end