lib/markov/model_server.ex

defmodule Markov.ModelServer do
  @moduledoc """
  GenServer in charge of one model.

  Each server owns one model directory (the `:path` start option) containing
  the persisted server state (`state.etf`), the `main` Sidx table and an
  append-only `history.log`.
  """

  use GenServer

  require Logger
  alias Markov.ModelActions

  @main_table_options [
    keys: 3,
    compress: true,
    part_initial: 512,
    part_size: 10_000,
    part_timeout: 250,
    slot_size: 1024
  ]

  # Numeric tag written as the first byte of every history.log record.
  @log_entry_mapping %{start: 1, end: 2, train: 3, gen: 4}

  # history.log records store the payload length in a 16-bit field; anything
  # larger would wrap around and desynchronise the record framing.
  @max_log_payload_size 0xFFFF

  # Options that cannot change once they have a value, mapped to the error
  # reason reported when a reconfiguration attempts to change them.
  @locked_options %{sanitize_tokens: :cant_change_sanitation, order: :cant_change_order}

  @doc "Maps history log entry types to the numeric tag stored on disk."
  @spec log_entry_mapping() :: %{Markov.log_entry_type() => pos_integer()}
  def log_entry_mapping, do: @log_entry_mapping

  @doc "Inverse of `log_entry_mapping/0`: maps on-disk tags back to entry types."
  @spec rev_log_entry_map() :: %{pos_integer() => Markov.log_entry_type()}
  def rev_log_entry_map, do: Map.new(@log_entry_mapping, fn {type, tag} -> {tag, type} end)

  defmodule State do
    @moduledoc "Runtime (and, minus open handles, persisted) state of a model server."

    # `aboba` is not referenced anywhere in this module; it is kept so the
    # struct shape matches previously persisted state.etf files (verify
    # before removing).
    defstruct [:path, options: [], main_table: nil, history_file: nil, aboba: 1]

    @type t :: %__MODULE__{
            path: String.t(),
            options: [Markov.model_option()],
            main_table: Sidx.Table.t() | nil,
            history_file: :file.io_device() | nil,
            aboba: integer()
          }
  end

  # Semi-public API

  @type start_options() :: [
          path: String.t(),
          create_opts: [Markov.model_option()]
        ]

  @type request ::
          {:configure, [Markov.model_option()]}
          | :get_config
          | {:train, [term()], [term()]}
          | {:generate, Markov.tag_query()}

  @doc """
  Starts a server for the model at `options[:path]` under `Markov.ModelSup`.

  The child is `:transient`, so it is only restarted after an abnormal exit.
  """
  @spec start(options :: start_options()) :: DynamicSupervisor.on_start_child()
  def start(options) do
    DynamicSupervisor.start_child(Markov.ModelSup, %{
      id: options[:path],
      start: {Markov.ModelServer, :start_link, [options]},
      restart: :transient
    })
  end

  @doc "Starts and links a server registered in `Markov.ModelServers` under its path."
  @spec start_link(options :: start_options()) :: GenServer.on_start()
  def start_link(options) do
    proc_name = {:via, Registry, {Markov.ModelServers, options[:path]}}
    GenServer.start_link(__MODULE__, options, name: proc_name)
  end

  # GenServer callbacks

  @doc """
  Loads (or creates) the model in `options[:path]` and opens its table and
  history log.

  Returns `{:stop, reason}` if the model directory cannot be created or an
  existing `state.etf` cannot be read; a missing `state.etf` starts a fresh
  model configured with `options[:create_opts]`.
  """
  @impl true
  @spec init(options :: start_options()) :: {:ok, State.t()} | {:stop, term()}
  def init(options) do
    # trap exits so terminate/2 runs (and persists the state) on shutdown
    Process.flag(:trap_exit, true)

    path = options[:path]

    with :ok <- ensure_dir(path),
         {:ok, state} <- read_state(path, options[:create_opts]) do
      main = Sidx.open!(Path.join(path, "main"), @main_table_options)
      {:ok, history} = :file.open(Path.join(path, "history.log"), [:append, :binary, :raw])

      state = %State{state | main_table: main, history_file: history, path: path}
      log(state, "loaded state and tables")
      write_log_entry(state, :start, nil)
      {:ok, state}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @doc """
  Logs an `:end` entry, persists the state to `state.etf` and releases the
  history file and table.
  """
  @impl true
  @spec terminate(reason :: term(), state :: State.t()) :: term()
  def terminate(_reason, state) do
    write_log_entry(state, :end, nil)

    try do
      persist_state(state)
    after
      # release the handles even when persisting the state failed
      :file.close(state.history_file)
      Sidx.close!(state.main_table)
    end
  end

  @doc """
  Handles model requests:

    * `{:configure, opts}` - merges `opts` into the model options; replies
      `:ok` or `{:error, reason}` if a locked option would change
    * `:get_config` - replies `{:ok, options}`
    * `{:train, tokens, tags}` - logs and delegates to `ModelActions.train/3`
    * `{:generate, tag_query}` - delegates to `ModelActions.generate/2` and
      logs the result
  """
  @impl true
  @spec handle_call(request(), GenServer.from(), State.t()) :: {:reply, term(), State.t()}
  def handle_call({:configure, options}, _from, state) do
    case configure(state, options) do
      {:ok, new_state} -> {:reply, :ok, new_state}
      {:error, _} = error -> {:reply, error, state}
    end
  end

  def handle_call(:get_config, _from, state) do
    {:reply, {:ok, state.options}, state}
  end

  def handle_call({:train, tokens, tags}, _from, state) do
    write_log_entry(state, :train, tokens)
    {:reply, ModelActions.train(state, tokens, tags), state}
  end

  def handle_call({:generate, tag_query}, _from, state) do
    {result, state} = ModelActions.generate(state, tag_query)
    write_log_entry(state, :gen, result)
    {:reply, result, state}
  end

  @doc """
  Handles trapped exit signals and stray messages.

  An exit signal sent by this process to itself, or any linked exit with a
  reason other than `:normal`/`:shutdown`, stops the server with that reason.
  Any other message is logged and ignored instead of crashing the server.
  """
  @impl true
  def handle_info({:EXIT, pid, reason}, state) when pid == self(),
    do: {:stop, reason, state}

  def handle_info({:EXIT, _pid, reason}, state) when reason in [:normal, :shutdown],
    do: {:noreply, state}

  def handle_info({:EXIT, _pid, reason}, state),
    do: {:stop, reason, state}

  def handle_info(message, state) do
    warn(state, "ignoring unexpected message #{inspect(message)}")
    {:noreply, state}
  end

  # Internal functions

  # Creates the model directory, tagging the failure with the path.
  @spec ensure_dir(path :: String.t()) :: :ok | {:error, term()}
  defp ensure_dir(path) do
    case File.mkdir_p(path) do
      :ok -> :ok
      {:error, reason} -> {:error, {:mkdir_failed, path, reason}}
    end
  end

  # Reads the persisted state, or builds a fresh one when none exists yet.
  # NOTE(review): state.etf is trusted local data; binary_to_term is not
  # called with [:safe].
  @spec read_state(path :: String.t(), create_opts :: [Markov.model_option()] | nil) ::
          {:ok, State.t()} | {:error, term()}
  defp read_state(path, create_opts) do
    state_path = Path.join(path, "state.etf")

    case File.read(state_path) do
      {:ok, data} -> {:ok, :erlang.binary_to_term(data)}
      {:error, :enoent} -> {:ok, %State{options: create_opts || []}}
      {:error, reason} -> {:error, {:state_read_failed, state_path, reason}}
    end
  end

  # Writes the state to state.etf. Open handles are dropped first: they are
  # meaningless after a restart and init/1 always reopens them.
  @spec persist_state(state :: State.t()) :: :ok
  defp persist_state(state) do
    snapshot = %State{state | main_table: nil, history_file: nil}
    File.write!(Path.join(state.path, "state.etf"), :erlang.term_to_binary(snapshot))
  end

  # Appends a `<<tag::8, ms_timestamp::64, size::16, payload::binary>>` record
  # to history.log when `type` is listed in the `:store_log` option. Payloads
  # too large for the 16-bit size field are refused rather than written with a
  # truncated length.
  @spec write_log_entry(state :: State.t(), type :: Markov.log_entry_type(), data :: term()) ::
          :ok | :ignored | {:error, term()}
  defp write_log_entry(state, type, data) do
    if type in List.wrap(state.options[:store_log]) do
      payload = :erlang.term_to_binary(data)
      size = byte_size(payload)

      if size > @max_log_payload_size do
        warn(state, "not logging #{type} entry: payload of #{size} bytes is too large")
        {:error, :entry_too_large}
      else
        tag = Map.fetch!(@log_entry_mapping, type)
        ts = :erlang.system_time(:millisecond)
        :file.write(state.history_file, <<tag::8, ts::64, size::16, payload::binary>>)
      end
    else
      :ignored
    end
  end

  @spec log(state :: State.t(), string :: String.t()) :: term()
  defp log(state, string),
    do: Logger.debug("model \"#{state.path}\" (#{inspect(self())}): #{string}")

  @spec warn(state :: State.t(), string :: String.t()) :: term()
  defp warn(state, string),
    do: Logger.warning("model \"#{state.path}\" (#{inspect(self())}): #{string}")

  # Merges `opts` into the current options, unless one of them would change a
  # locked option that already has a value; the first such error (in `opts`
  # order) is returned.
  @spec configure(old_state :: State.t(), opts :: [Markov.model_option()]) ::
          {:ok, State.t()} | {:error, term()}
  defp configure(old_state, opts) do
    log(old_state, "reconfiguring: #{inspect(opts)}")
    current = old_state.options || []

    case Enum.find_value(opts, &locked_option_error(current, &1)) do
      nil -> {:ok, %State{old_state | options: Keyword.merge(current, opts)}}
      error -> error
    end
  end

  # `{:error, reason}` if `{key, value}` changes an already-set locked option,
  # `nil` otherwise (non-pair entries are skipped, like the `for` they replace).
  defp locked_option_error(current, {key, value}) do
    case Map.fetch(@locked_options, key) do
      {:ok, reason} ->
        previous = current[key]
        if previous != nil and previous != value, do: {:error, reason}

      :error ->
        nil
    end
  end

  defp locked_option_error(_current, _other), do: nil
end