lib/gtimer.ex

defmodule Gtimer do
  @moduledoc """
  A small library which provides a global timer facility in Elixir (or Erlang). 

  ## Example: 
  iex> timer_ref = Gtimer.new_timer(timeout,options \\ [])

  Starts a new timer running for `timeout` millseconds, returns a "timer reference".

  iex> Gtimer.cancel_timer(timer_ref)

  Cancels a running timer.

  If a running timer is not cancelled when the timer expires a
  `timeout_action` function (defined as an option)
  will be called with the process identifier of the process that created the timer
  as a single argument. If the timeout_action is not a function of arity 1, then the 
  default action of logging an informative warning message, will be taken.
  
  Timer management is done in a separate process. Inside the process
  timers are stored in a priority queue, and using a map, which
  affords logarithmic worst-case time complexity for both function
  calls (in terms of the number of timers running).

  ## Example:

  iex> timer_ref = Gtimer.new_timer(1000,fn pid -> Process.exit(pid,:because) end)

  This will start a new timer which when it expires kills the process which invoked
  the call to `new_timer`.
  """
  use GenServer
  require Logger

  @doc false
  def init(_) do
    {:ok,pqueue} = :epqueue.new()
    {:ok, %{pqueue: pqueue, map: %{}, counter: 0, expired: []}}
  end

  def handle_call({:new_timer, timeout, pid, info, options}, _from, state) do
    timer_ref = state.counter
    timeout_time = :os.system_time(:millisecond)+timeout
    queue_item = %{timer_ref: timer_ref, timeout: timeout, timeout_time: timeout_time, pid: pid, info: info, options: options}
    {:ok, item_ref} = :epqueue.insert(state.pqueue, queue_item, timeout_time)
    map_item = Map.put(queue_item,:pqueue_ref,item_ref)
    state =
      %{ state |
         map: Map.put(state.map,timer_ref,map_item),
         counter: state.counter+1 }
    case calculate_timeout(state) do
      {:ok, next_time} ->
        {:reply, timer_ref, state, next_time}
      _ ->
        {:reply, timer_ref, state}
    end
  end

  def handle_call({:cancel_timer, timer_ref}, _from, state) do
    state = 
      case Map.get(state.map,timer_ref,:undefined) do
        :undefined ->
          state
        map_item ->
          pqueue_ref = map_item.pqueue_ref
          :epqueue.remove(state.pqueue,pqueue_ref)
          %{ state | map: Map.drop(state.map,[timer_ref]) }
      end
    case calculate_timeout(state) do
      {:ok, next_time} ->
        {:reply, :ok, state, next_time}
      _ ->
        {:reply, :ok, state}
    end
  end

  def handle_call(:expired_timers,_,state) do
    case calculate_timeout(state) do
      {:ok, next_time} ->
        {:reply, state.expired, state, next_time}
      _ ->
        {:reply, state.expired, state}
    end
  end

  def handle_info(:timeout, state) do
    {:ok, item, _} = :epqueue.pop(state.pqueue)
    case Keyword.get(item.options,:timeout_action) do
      fun when is_function(fun,1) ->
        try do
          fun.(item)
        rescue
          _ -> IO.puts("*** ERROR: Gtimer: timeout_function for "<>print_info(item.info)<>" raised an exception")
        end
      _ -> 
        level = Keyword.get(item.options,:log_level,:warning)
        Logger.log(level,"Corsa timer timed out after #{IO.inspect(item.timeout)} milliseconds #{print_info(item.info)}")
    end
    state = %{ state |
               map: Map.drop(state.map,[item.timer_ref]),
               expired: [ item | state.expired ] }
    case calculate_timeout(state) do
      {:ok, next_time} ->
        {:noreply, state, next_time}
      _ ->
        {:noreply, state}
    end
  end

  def handle_info(other, state) do
    IO.puts("*** WARNING: Gtimer: unexpected message #{inspect other} received")
    {:noreply, state}
  end

  defp calculate_timeout(state) do
    case :epqueue.size(state.pqueue) do
      n when n>0 ->
        {:ok, _, timeout_time} = :epqueue.peek(state.pqueue)
        current_time = :os.system_time(:millisecond)
        next_timeout = max(0,timeout_time-current_time)
        {:ok, next_timeout}
      _ ->
        nil
    end
  end

  
  @doc """
  Starts a new timer running for `timeout` millseconds, returns a "timer reference".
  If a running timer is not cancelled when the timer expires the timeout_action function 
  will be called with the process identifier of the process that created the timer as 
  a single argument. Options may define a {`timeout_action`,fun}
  which is the action taken when a timer expires, or {`log_level`,level}
  which determines the Log level for timer expire messages.
  """
  def new_timer(timeout,info,options \\ []) do
    case Process.whereis(:gtimer) do
      nil ->
        # Do not link to the calling process; we are likely to kill it!
        GenServer.start(Gtimer, [], [{:name, :gtimer}])
        new_timer(timeout,info,options)
      _pid ->
        GenServer.call(:gtimer,{:new_timer, timeout, self(), info, options})
    end
  end

  @doc """
  Cancels a running timer.
  """
  def cancel_timer(time_ref) do
    case Process.whereis(:gtimer) do
      nil ->
        GenServer.start(Gtimer, [], [{:name, :gtimer}])
        cancel_timer(time_ref)
      _pid ->
        GenServer.call(:gtimer,{:cancel_timer, time_ref})
    end
  end

  @doc """
  Returns expired timers.
  """
  def expired_timers() do
    case Process.whereis(:gtimer) do
      nil ->
        GenServer.start(Gtimer, [], [{:name, :gtimer}])
        expired_timers()
      _pid ->
        GenServer.call(:gtimer,:expired_timers)
    end
  end

  def stop() do
    case Process.whereis(:gtimer) do
      nil -> :ok
      _pid -> GenServer.stop(:gtimer)
    end
  end

  defp print_info(info) do
    case info do
      {:call,_,_,_} -> "for call "<>pretty_print_mfa(info)
      _ when is_binary(info) -> ": "<>info
      _ -> IO.inspect(info)
    end
  end

  defp pretty_print_mfa({:call, module, function, args}) do
    module_string = inspect(module)
    function_string = Atom.to_string(function)
    args_string = Enum.map(args, &inspect/1) |> Enum.join(", ")
    "#{module_string}.#{function_string}(#{args_string})"
  end
  
end