lib/rafted_value/log_entry.ex

use Croma

defmodule RaftedValue.LogEntry do
  alias RaftedValue.{TermNumber, LogIndex, Config, Data}

  @type t :: {TermNumber.t, LogIndex.t, :command           , {GenServer.from, Data.command_arg, reference}}
           | {TermNumber.t, LogIndex.t, :query             , {GenServer.from, Data.query_arg}}
           | {TermNumber.t, LogIndex.t, :change_config     , Config.t}
           | {TermNumber.t, LogIndex.t, :leader_elected    , pid} # Old format (< 0.11), newer version (>= 0.11) uses :leader_elected2. Will be removed in future versions.
           | {TermNumber.t, LogIndex.t, :add_follower      , pid}
           | {TermNumber.t, LogIndex.t, :remove_follower   , pid}
           | {TermNumber.t, LogIndex.t, :restore_from_files, pid}
           | {TermNumber.t, LogIndex.t, :leader_elected2   , [pid]}

  defun valid?(v :: any) :: boolean do
    {_, _, _, _} -> true
    _            -> false
  end

  defp entry_type_to_tag(:command           ), do: 0
  defp entry_type_to_tag(:query             ), do: 1
  defp entry_type_to_tag(:change_config     ), do: 2
  defp entry_type_to_tag(:leader_elected    ), do: 3
  defp entry_type_to_tag(:add_follower      ), do: 4
  defp entry_type_to_tag(:remove_follower   ), do: 5
  defp entry_type_to_tag(:restore_from_files), do: 6
  defp entry_type_to_tag(:leader_elected2   ), do: 7

  defp tag_to_entry_type(0), do: {:ok, :command           }
  defp tag_to_entry_type(1), do: {:ok, :query             }
  defp tag_to_entry_type(2), do: {:ok, :change_config     }
  defp tag_to_entry_type(3), do: {:ok, :leader_elected    }
  defp tag_to_entry_type(4), do: {:ok, :add_follower      }
  defp tag_to_entry_type(5), do: {:ok, :remove_follower   }
  defp tag_to_entry_type(6), do: {:ok, :restore_from_files}
  defp tag_to_entry_type(7), do: {:ok, :leader_elected2   }
  defp tag_to_entry_type(_), do: :error

  defun to_binary({term, index, entry_type, others} :: t) :: binary do
    bin = :erlang.term_to_binary(others)
    size = byte_size(bin)
    <<term :: size(64), index :: size(64), entry_type_to_tag(entry_type) :: size(8), size :: size(64), bin :: binary, size :: size(64)>>
  end

  defunpt extract_from_binary(bin :: binary) :: nil | {t, rest :: binary} do
    with <<term :: size(64), index :: size(64), type_tag :: size(8), size1 :: size(64)>> <> rest1 <- bin,
         {:ok, entry_type} = tag_to_entry_type(type_tag),
         <<others_bin :: binary-size(size1), size2 :: size(64)>> <> rest2 <- rest1 do
      if size1 == size2 do
        {{term, index, entry_type, :erlang.binary_to_term(others_bin)}, rest2}
      else
        raise "redundant size information in log entry not matched"
      end
    else
      _ -> nil # insufficient input, can be retried with subsequent binary data
    end
  end

  defunp extract_multiple_from_binary(bin :: binary) :: {[t], rest :: binary} do
    extract_multiple_from_binary_impl(bin, [])
  end

  defp extract_multiple_from_binary_impl(bin, acc) do
    case extract_from_binary(bin) do
      nil           -> {Enum.reverse(acc), bin}
      {entry, rest} -> extract_multiple_from_binary_impl(rest, [entry | acc])
    end
  end

  def read_as_stream(log_path) do
    File.stream!(log_path, [], 4096)
    |> Stream.transform(<<>>, fn(bin, carryover) ->
      extract_multiple_from_binary(carryover <> bin)
    end)
  end

  defun read_last_entry_index(log_path :: Path.t) :: nil | LogIndex.t do
    case :file.open(log_path, [:raw, :binary]) do
      {:ok, f}    -> read_last_entry_index_impl(f, File.stat!(log_path).size)
      {:error, _} -> nil
    end
  end

  defp read_last_entry_index_impl(f, size) do
    if size < 8 do
      nil
    else
      {:ok, <<binsize1 :: size(64)>>} = :file.pread(f, size - 8, 8)
      last_entry_start_offset = size - binsize1 - 33 # (term: 8, index: 8, tag: 1, size1: 8, size2: 8)
      if last_entry_start_offset < 0 do
        nil
      else
        {:ok, <<_term :: size(64), index :: size(64), type_tag :: size(8), binsize2 :: size(64)>>} = :file.pread(f, last_entry_start_offset, 25)
        case tag_to_entry_type(type_tag) do # minimal sanity checking
          {:ok, _} when binsize2 == binsize1 -> index
          _                                  -> nil
        end
      end
    end
  end
end