src/file_log_reader.erl

%%% vim:ts=4:sw=4:et
%%%----------------------------------------------------------------------------
%%% @doc Periodically read an append-only log file and parse newly added data.
%%%
%%% The user controls the interval in msec how often to check for file
%%% modifications. When new data is appended to file it triggers invocation of
%%% the user-defined parsing function that deliminates the file, and the result
%%% is delivered to the consumer by calling the consumer callback function.
%%%
%%% The log reader can be started as a `gen_server' or can be controlled
%%% synchronously by using `init/3', `run/1', and `close/1' methods.
%%%
%%% @author    Serge Aleynikov <saleyn@gmail.com>
%%% @copyright 2015 Serge Aleynikov
%%% @end
%%%----------------------------------------------------------------------------
%%% Created: 2015-02-12
%%%----------------------------------------------------------------------------
-module(file_log_reader).
-author('saleyn@gmail.com').

-behaviour(gen_server).

%%% TODO: convert into behavior!!!

%% API
-export([start_link/3, start_link/4, start/3, start/4, stop/1,
         position/1, pstate/1, update_pstate/3]).
-export([init/3, run/1, close/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-ifdef(TEST).
-define(D(Fmt,Args), io:format(Fmt, Args)).
-include_lib("eunit/include/eunit.hrl").
-else.
-define(D(Fmt,Args), ok).
-endif.

-define(MAX_READ_SIZE, 32*1024*1024).

-type consumer() ::
    fun((Msg :: any() | {'$end_of_file', string(), Res ::
                            ok | {error|throw|exit, Reason::any(), Stacktrace::list()}},
         Pos::integer(), State::any()) -> NewState::any()).
-type options()  :: [
    {pos,       StartPos::integer()}           |
    {end_pos,   ReadUntilPos::integer() | eof} |
    {max_size,  MaxReadSize::integer()  | eof} |
    {timeout,   MSec::integer()}               |
    {retry_sec, Sec::integer()}                |
    {parser, fun((Data::binary(), ParserState::any()) ->
                {ok, Msg::any(), Tail::binary(), NewParserState::any()} |
                {incomplete, NewParserState::any()} |
                {skip, Tail::binary(), NewParserState::any()}) |
             {Mod::atom(), Fun::atom()}}       |
    {pstate, fun((File::string(), consumer(), Options::list()) -> any()) | any()} |
    {pstate_update, fun((Option::atom(), Value::any(), PState::any()) ->
                        {ok, NewPState::any()} | {error, any()})}].
%% Details:
%% <dl>
%% <dt>pos</dt>
%%   <dd>Start reading from this position (default: 0)</dd>
%% <dt>end_pos</dt>
%%   <dd>Read until this position and stop. If provided and file position reaches
%%       `end_pos', the consumer() callback given to the reader will be called as:
%%       ``Consumer({'$end_of_file', Filename::string(), Result}, Pos::integer(), State)''
%%       where `Result' is `ok' or `{error|exit|exception, Error::any(), StackTrace}'
%%       if an error occured.
%%   </dd>
%% <dt>max_size</dt>
%%   <dd>Maximum chunk size to read from file in a single pass (default: 32M).</dd>
%% <dt>timeout</dt>
%%   <dd>Number of milliseconds between successive file scanning (default: 1000)</dd>
%% <dt>retry_sec</dt>
%%   <dd>Number of seconds between successive retries upon failure to open the
%%       market data file passed to one of the `start*/{3,4}' functions (default: 15).
%%       The value of 0 means that the file must exist or else the process won't
%%       start.</dd>
%% <dt>parser</dt>
%%   <dd>Is the function to be called when the next chunk is read from file. The
%%       function must return:
%%       <dl>
%%       <dt>`{ok, Msg, Tail, State}'</dt>
%%          <dd>invoke `Consumer' callback passing it the parsed message `Msg',
%%              and continue parsing the `Tail' binary</dd>
%%       <dt>`{incomplete, State}'</dt>
%%          <dd>the data contains no complete messages - wait until there's more</dd>
%%       <dt>`{skip, Tail, State}'</dt>
%%          <dd>disregard input and continue parsing `Tail' without calling
%%              `Consumer' callback</dd>
%%       </dl>
%%   </dd>
%% <dt>pstate</dt>
%%   <dd>Initial value of the parser state or a functor `fun((File::string()
%%       Consumer::consumer(), Options::options()) -> PState::any())'</dd>
%% <dt>pstate_update</dt>
%%   <dd>Update function of the parser state.  Called when the user invokes
%%       `update_pstate/3'</dd>
%% </dl>

-export_type([consumer/0, options/0]).

-record(state, {
      consumer      :: consumer()
    , tref          :: reference()
    , fd            :: port()
    , file          :: string()
    , pos = 0       :: integer() | eof
    , end_pos       :: integer() | eof | undefined  % Read until this pos (inclusive)
    , max_size      :: integer()
    , timeout       :: integer()
    , parser        :: {atom(), atom()} |
                       fun((binary(), any()) -> {any(), binary(), any()})
    , pstate        :: any()
    , pstate_update :: fun((any()) -> any())
    , part_size = 0 :: integer()         % Sz of incomple unparsed chunk at eof
    , done = false  :: false | ok |      % Processing completion indicator
                       {error|exception|exit, Reason::any(), Stacktrace::any()}
    , incompl_count :: integer()         % Counter of incomplete message reads
}).

%%%----------------------------------------------------------------------------
%%% External API
%%%----------------------------------------------------------------------------

%%-----------------------------------------------------------------------------
%% @doc To be called by the supervisor in order to start the server.
%%      If init/1 fails with Reason, the function returns `{error,Reason}'.
%%      If init/1 returns `{stop,Reason}' or ignore, the process is
%%      terminated and the function returns `{error,Reason}' or ignore,
%%      respectively.
%% @see start_link/3
%% @end
%%-----------------------------------------------------------------------------
-spec start_link(atom(), string(), consumer(), options()) ->
    {ok, pid()} | ignore | {error, any()}.
start_link(RegName, File, Consumer, Options)
    when is_atom(RegName), is_list(File), is_function(Consumer,3), is_list(Options) ->
    gen_server:start_link({local, RegName}, ?MODULE, [File, Consumer, Options], []).

%%-----------------------------------------------------------------------------
%% @doc Process `File' by calling `Consumer' callback on every delimited
%%      message. Message delimination is handled by the `{parser, Parser}'
%%      option. `Consumer' function gets called iteratively with the following
%%      arguments:
%%      <dl>
%%          <dt>(Msg, Pos::integer(), State)</dt>
%%          <dd>`Msg' is what the parser function returned. `Pos' is current
%%              file position following the `Msg'. `State' is current value
%%              of parser state that is initialized by `{pstate, PState}'
%%              option given to the `start_link/{3,4}' function</dd>
%%          <dt>({'$end_of_file', Filename::string(), Result}, Pos::integer(), PState)</dt>
%%          <dd>This call happens when end of file condition is reached (see
%%              definition of `consumer()' type)</dd>
%%      </dl>
%%      `Consumer' can end processing normally without reaching the end
%%      of file by throwing `{eof, PState}' exception.
%% @end
%%-----------------------------------------------------------------------------
-spec start_link(string(), consumer(), options()) ->
    {ok, pid()} | ignore | {error, any()}.
start_link(File, Consumer, Options)
    when is_list(File), is_function(Consumer,3), is_list(Options) ->
    gen_server:start_link(?MODULE, [File, Consumer, Options], []).

%%-----------------------------------------------------------------------------
%% @doc Start the server outside of supervision tree.
%% @end
%%-----------------------------------------------------------------------------
-spec start(atom(), string(), consumer(), options()) ->
    {ok, pid()} | {error, any()}.
start(RegName, File, Consumer, Options)
    when is_atom(RegName), is_list(File), is_function(Consumer,3), is_list(Options) ->
    gen_server:start({local, RegName}, ?MODULE, [File, Consumer, Options], []).

-spec start(string(), consumer(), options()) ->
    {ok, pid()} | {error, any()}.
start(File, Consumer, Options)
    when is_list(File), is_function(Consumer,3), is_list(Options) ->
    gen_server:start(?MODULE, [File, Consumer, Options], []).

%%-----------------------------------------------------------------------------
%% @doc Report last processed file position/size.
%% @end
%%-----------------------------------------------------------------------------
-spec position(pid() | atom()) -> {ok, Position::integer()}.
position(Pid) ->
    gen_server:call(Pid, position).

%%-----------------------------------------------------------------------------
%% @doc Return current parser state (`{pstate, any()}' initialization option).
%% @end
%%-----------------------------------------------------------------------------
-spec pstate(pid() | atom()) -> {ok, any()}.
pstate(Pid) ->
    gen_server:call(Pid, pstate).

%%-----------------------------------------------------------------------------
%% @doc Update parser state.
%% @end
%%-----------------------------------------------------------------------------
-spec update_pstate(pid(), Option::atom(), Value::any()) -> {ok, State::any()}.
update_pstate(Pid, Option, Value) when is_atom(Option) ->
    gen_server:call(Pid, {update_pstate, Option, Value}).

%%-----------------------------------------------------------------------------
%% @doc Stop the server.
%% @end
%%-----------------------------------------------------------------------------
-spec stop(pid() | atom()) -> ok.
stop(Pid) ->
    gen_server:call(Pid, stop).

%%-----------------------------------------------------------------------------
%% @doc When using file processor without gen_server, use this function to
%%      initialize the state, and then call run/1.
%% @end
%%-----------------------------------------------------------------------------
-spec init(string(), consumer(), options()) -> {ok, #state{}}.
init(File, Consumer, Options) when is_list(File), is_list(Options) ->
    try
        Offset  = proplists:get_value(pos, Options, 0),
        EndPos  = case proplists:get_value(end_pos,  Options) of
                  M when is_integer(M) -> M;
                  eof                  -> eof;
                  undefined            -> undefined;
                  Other2               -> throw({invalid_option, {end_pos, Other2}})
                  end,
        MaxSize = proplists:get_value(max_size,     Options, ?MAX_READ_SIZE),
        Parser  = proplists:get_value(parser,       Options),
        RetryS  = proplists:get_value(retry_sec,    Options, 15),
        Timeout = proplists:get_value(timeout,      Options, 1000),
        PStUpd  = proplists:get_value(pstate_update,Options),

        PStUpd =:= undefined orelse is_function(PStUpd,3)
                  orelse throw({badarg, pstate_update}),

        State   = #state{file=File,  consumer=Consumer, parser=Parser,
                         pos=Offset, end_pos=EndPos, max_size=MaxSize,
                         timeout=Timeout, incompl_count=0, pstate_update=PStUpd},
        {ok, S} = try_open_file(1, RetryS, File, State),
        PState  = case proplists:get_value(pstate, Options) of
                  F when is_function(F, 3) ->
                      F(File, Consumer, Options);
                  Other ->
                      Other
                  end,
        {ok, S#state{pstate=PState}}
    catch
        throw:ignore ->
            ignore;
        T:E:STrace ->
            {stop, {T, E, STrace}}
    end.

%%-----------------------------------------------------------------------------
%% @doc Process file from given position `Pos' to `EndPos' (or `eof').
%% @end
%%-----------------------------------------------------------------------------
-spec run(#state{}) -> #state{}.
run(#state{fd=FD, pos=Pos, end_pos=EndPos, max_size=MaxChunkSz, done=false} = S) ->
    MaxPos  = Pos+MaxChunkSz,
    NextPos = if is_integer(EndPos) -> min(MaxPos, EndPos); true -> MaxPos end,
    Size    = NextPos - Pos,
    if Size > 0 ->
        case file:pread(FD, Pos, Size) of
        {ok, Data} when byte_size(Data) > S#state.part_size ->
            try
                S1 = process_chunk(Data, S),
                run(S1)
            catch
                %% This is a special case - the consumer callback may throw
                %% an `eof' exception to force the normal end of processing:
                throw:{eof, PState} when EndPos =:= eof ->
                    end_of_file(ok, S#state{end_pos=Pos, pstate=PState});
                throw:{eof, PState} ->
                    end_of_file(ok, S#state{pstate=PState});
                %% Some other error occured - abort:
                E:W:STrace ->
                    end_of_file({E, W, STrace}, S)
            end;
        {ok, _Data} ->
            schedule_timer(S);
        eof when EndPos =:= eof ->
            % Reached the requested end of input - notify consumer:
            end_of_file(ok, S#state{end_pos=Pos});
        eof ->
            % No more data is currently available - wait until there's more.
            schedule_timer(S)
        end;
    true ->
        % Reached the requested end of input - notify consumer:
        end_of_file(ok, S#state{end_pos=Pos})
    end;
run(#state{} = S) ->
    S.
            
end_of_file(Result, #state{pos=Pos, consumer=Callback, file=F, pstate=PState} = S) ->
    PS1 = Callback({'$end_of_file', F, Result}, Pos, PState),
    S#state{pstate = PS1, part_size=0, done=Result}.

schedule_timer(#state{timeout=Timeout, fd=FD} = S) ->
    TRef = erlang:send_after(Timeout, self(), {check_md_files_timer, FD}),
    S#state{tref=TRef}.

%%-----------------------------------------------------------------------------
%% @doc Close file processor (use this method when not using gen_server)
%% @end
%%-----------------------------------------------------------------------------
close(#state{fd=FD} = State) ->
    file:close(FD),
    State#state{fd=undefined}.

%%%----------------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------------

%%-----------------------------------------------------------------------------
%% @private
%% @doc Initiates the server
%% @end
%%-----------------------------------------------------------------------------
-spec init(list()) ->
    {ok, #state{}} |
    {ok, #state{}, Timeout :: integer() | hibernate} | ignore | {stop, any()}.
init([File, Consumer, Options]) ->
    try
        init(File, Consumer, Options)
    catch _:What ->
        {stop, What}
    end.

%%-----------------------------------------------------------------------------
%% @private
%% @doc Handling call messages
%% @end
%%-----------------------------------------------------------------------------
-spec handle_call(any(), From::tuple(), #state{}) ->
    {reply, Reply::any(), #state{}} |
    {reply, Reply::any(), #state{}, Timeout::integer() | hibernate} |
    {noreply, #state{}} |
    {noreply, #state{}, Timeout::integer() | hibernate} |
    {stop, Reason::any(), Reply::any(), #state{}} |
    {stop, Reason::any(), #state{}}.
handle_call(position, _From, #state{pos = Pos} = State) ->
    {reply, {ok, Pos}, State};
handle_call(pstate, _From, #state{pstate = PState} = State) ->
    {reply, {ok, PState}, State};
handle_call({update_pstate, Option, Value}, _From, #state{} = State) ->
    #state{pstate_update=Fun, pstate=PState} = State,
    if is_function(Fun, 3) ->
        case Fun(Option, Value, PState) of
        {ok, NewPState} ->
            {reply, {ok, NewPState}, State#state{pstate=NewPState}};
        {error, Reason} ->
            {reply, {error, Reason}, State}
        end;
    true ->
        {reply, {error, pstate_update_not_implemented}, State}
    end;
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

handle_call(Request, _From, State) ->
    {stop, {unknown_call, Request}, State}.

%%-----------------------------------------------------------------------------
%% @private
%% @doc Handling cast messages
%% @end
%%-----------------------------------------------------------------------------
-spec handle_cast(any(), #state{}) ->
    {noreply, #state{}} | {noreply, #state{}, Timeout::integer() | hibernate} |
    {stop, Reason::any(), #state{}}.
handle_cast(Msg, State) ->
    {stop, {unknown_cast, Msg}, State}.

%%-----------------------------------------------------------------------------
%% @private
%% @doc Handling all non call/cast messages
%% @end
%%-----------------------------------------------------------------------------
-spec handle_info(any(), #state{}) ->
    {noreply, #state{}} | {noreply, #state{}, Timeout::integer() | hibernate} |
    {stop, Reason::any(), #state{}}.
handle_info({check_md_files_timer, _FD}, State) ->
    S = #state{done=Done} = run(State),
    case Done of
    false ->
        {noreply, S, hibernate};
    ok ->
        {stop, normal, S};
    {E, Reason, Stacktrace} when E=:=error; E=:=exception; E=:=exit ->
        erlang:raise(E, Reason, Stacktrace)
    end;
handle_info({open_file_timer, Attempt, RetrySec, File}, State) ->
    {ok, State1} = try_open_file(Attempt, RetrySec, File, State),
    {noreply, State1};
handle_info(_Info, State) ->
    {noreply, State}.

%%-----------------------------------------------------------------------------
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% @end
%%-----------------------------------------------------------------------------
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, #state{}) ->
    ok.

%%-----------------------------------------------------------------------------
%% @private
%% @doc Convert process state when code is changed
%% @end
%%-----------------------------------------------------------------------------
-spec code_change(any(), #state{}, any()) -> #state{}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------------

try_open_file(Attempt, RetrySec, File, #state{} = State) when is_list(File) ->
    case file:open(File, [read,raw,binary,read_ahead]) of
    {ok,FD} ->
        {ok,End} = file:position(FD, eof),
        Pos      = case State#state.pos of
                      N when is_integer(N) -> N;
                      eof                  -> End;
                      Other1               -> throw({invalid_option, {pos, Other1}})
                   end,
        case Attempt of
        1 -> ok;
        _ -> error_logger:info_msg
                ("~w: file ~s was opened successfully\n", [?MODULE, File])
        end,
        {ok, schedule_timer(State#state{fd=FD, pos=Pos})};
    {error, Reason} when Attempt =:= 1, RetrySec > 0 ->
        error_logger:warning_msg(
            "~w: failed to open file ~s (will retry in ~ws): ~s",
            [?MODULE, File, RetrySec, file:format_error(Reason)]),
        Msg = {open_file_timer, Attempt+1, RetrySec, File},
        erlang:send_after(RetrySec*1000, self(), Msg),
        {ok, State};
    {error, _Reason} when RetrySec > 0 ->
        Msg = {open_file_timer, Attempt+1, RetrySec, File},
        erlang:send_after(RetrySec*1000, self(), Msg),
        {ok, State};
    {error, Reason} ->
        throw({cannot_open_file, File, Reason})
    end.

process_chunk(<<>>, S) ->
    S;
process_chunk(Data, #state{part_size=N} = S) when N > 0, byte_size(Data) =< N ->
    %% Last message in the file was found to be incomplete on the
    %% previous processing iteration.  Current read attempt didn't read any more
    %% of new bytes, so do nothing and wait until there's more data available
    S#state{incompl_count=S#state.incompl_count+1};
process_chunk(Data, #state{pos=Pos, consumer=OnMsg, parser=P, pstate=PS} = S) ->
    case parse(P, Data, PS) of
    {ok, Msg, <<>>, PS1} ->
        %% The Data was fully parsed, invoke the callback and try to see if
        %% more data was appended to the file
        NPos = Pos + byte_size(Data),
        PS2  = OnMsg(Msg, NPos, PS1),
        run(S#state{pos=NPos, pstate=PS2, part_size=0});
    {ok, Msg, Tail, PS1} when is_binary(Tail) ->
        %% A chunk of Data up to the Tail was fully parsed, invoke the callback
        %% and continue parsing the Tail
        NPos = Pos + (byte_size(Data) - byte_size(Tail)),
        PS2  = OnMsg(Msg, NPos, PS1),
        process_chunk(Tail, S#state{pos=NPos, pstate=PS2, part_size=0});
    {incomplete, PS1} ->
        I = S#state.incompl_count,
        S#state{pstate=PS1, part_size=byte_size(Data), incompl_count=I+1};
    {skip, Tail, PS1} when is_binary(Tail) ->
        N = byte_size(Data) - byte_size(Tail),
        process_chunk(Tail, S#state{pos=Pos + N, pstate=PS1, part_size=0})
    end.

parse(Fun, Data, State) when is_function(Fun, 2) ->
    Fun(Data, State);
parse({M,F}, Data, State) ->
    M:F(Data, State);
parse(undefined, Data, State) ->
    {ok, Data, <<>>, State}.

%%%----------------------------------------------------------------------------
%%% Test functions
%%%----------------------------------------------------------------------------

-ifdef(EUNIT).

async_read_file_test() ->
    File  = "/tmp/test-file-reader.log",
    Self  = self(),
    N     = 10,

    %% Producer of data
    begin_produce(File, N, false),

    %% Consumer of data
    User = spawn_link(fun() -> ?assertEqual(ok, wait(N, 1)), Self ! done end),

    Prsr = fun(Bin, PState) ->
        case binary:split(Bin, <<"\n">>) of
        [<<"abc5">>, Tail] ->
            {skip, Tail, PState+1};
        [<<"abc1">>=_M,_Tail] when PState =:= 9 ->
            {incomplete, PState+1};
        [Msg, Tail] ->
            {ok, Msg, Tail, PState+1}
        end
    end,
    Opts      = [{timeout, 10}, {parser, Prsr}, {pstate, 0}],
    Consume   = fun(Msg, _Pos, State) -> User ! Msg, State end,
    {ok, Pid} = start_link(File, Consume, Opts),

    ?assertEqual(done, receive done -> done after 1000 -> timeout end),
    {ok, PState} = pstate(Pid),
    ?assertEqual(9,  PState),
    ?assertEqual(ok, stop(Pid)),
    file:delete(File).

premature_end_test() ->
    %% This test illustrates how to abort parsing gracefully in the middle
    %% of the input file processing.
    File  = "/tmp/test-file-reader.log",
    Self  = self(),
    N     = 10,

    %% Producer of data
    begin_produce(File, N, false),

    %% Consumer of data
    User = spawn_link(fun() -> ?assertEqual(ok, wait(N, 6)), Self ! done end),

    erase(eof_called),

    Prsr = fun(Bin, PState) ->
        [Msg, Tail] = binary:split(Bin, <<"\n">>),
        {ok, Msg, Tail, PState+1}
    end,
    Opts      = [{timeout, 10}, {parser, Prsr}, {pstate, 0}],
    Consume   = fun (<<"abc5">>, _Pos, State) ->
                        %%?debugFmt("Msg <<abc5>> state ~w", [State]),
                        ?assertEqual(6, State),
                        throw({eof, State});
                    ({'$end_of_file', _File, Reason}, _Pos, State) ->
                        ?assertEqual(6,    State),
                        ?assertEqual(File, _File),
                        ?assertEqual(ok,  Reason),
                        Self ! eof_called,
                        State;
                    (Msg, _Pos, State) ->
                        %%?debugFmt("Msg ~p state ~w", [Msg, State]),
                        User ! Msg,
                        State
                end,
    {ok, Pid} = start_link(File, Consume, Opts),
    monitor(process, Pid),

    ?assertEqual(ok,
         receive eof_called -> put(eof_called, true), ok after 1000 -> timeout end),
    ?assertEqual(done, receive done -> done after 1000 -> timeout end),
    ?assertEqual(ok,
         receive {'DOWN', _Ref, process, Pid, normal} -> ok after 1000 -> timeout end),
    ?assertEqual(true, get(eof_called)),
    erase(eof_called),
    file:delete(File).

begin_produce(File, N, Wait) ->
    file:delete(File),
    Self = self(),

    %% Produce data
    spawn_link(fun() ->
        Res = file:open(File, [write,raw,binary]),
        Self ! continue,
        ?assertMatch({ok,_}, Res),
        {ok, F} = Res,

        (fun
            Loop(0,_FF) -> ok;
            Loop(I, FF) ->
                Bin = <<"abc", (integer_to_binary(I))/binary, $\n>>,
                ok  = file:write(FF, Bin),
                timer:sleep(50),
                Loop(I-1, FF)
         end)(N, F),

        if Wait -> Self ! end_producer; true -> ok end
    end),

    receive continue -> ok end,
    if
        Wait -> receive end_producer -> ok end;
        true -> ok
    end.

wait(I, I) ->
    % Got all messages
    ok;
wait(5, I) ->
    % <<"abc5">> is skipped by the parser
    wait(4, I);
wait(I, End) ->
    Expect = <<"abc", (integer_to_binary(I))/binary>>,
    receive
        Expect ->
            wait(I-1, End);
        Other ->
            ?debugFmt("Unexpected message: ~p", [Other]),
            %{unexpected_msg, Other}
            wait(I-1, End)
    %after 1000 ->
    %    timeout
    end.

consume({'$end_of_file', _F, ok}, Pos, PState) when is_integer(Pos), is_integer(PState) ->
    put(count_end, get(count_end)+1),
    put(end_pos, Pos),
    PState;
consume(Msg, _Pos, PState) when is_binary(Msg), is_integer(PState) ->
    Bin = <<"abc", (integer_to_binary(PState))/binary>>,
    ?assertEqual(Msg, Bin),
    PState-1.

sync_read_file_test() ->
    File  = "/tmp/test-file-reader.log",
    N     = 10,
    file:delete(File),

    %% Produce data
    begin_produce(File, N, true),

    %% Consume data
    Prsr = fun(Bin, PState) ->
        [Msg, Tail] = binary:split(Bin, <<"\n">>),
        {ok, Msg, Tail, PState}
    end,

    put(count_end, 0),

    %% (1) Initialize file reader
    (fun() ->
        {ok, State} = init(File, fun consume/3, [{parser, Prsr}, {pstate, N}]),
        %% Execute file reader synchronously
        State1 = run(State),
        ?assertEqual(0, State1#state.pstate),
        %% Close file reader
        close(State1)
    end)(),

    ?assertEqual(0,  get(count_end)),

    %% (2) Now test reading from the end of file
    (fun() ->
        {ok, State} = init(File, fun consume/3, [{pos, eof}, {parser, Prsr}, {pstate, N}]),
        %% Execute file reader synchronously
        State1 = run(State),
        ?assertEqual(10, State1#state.pstate),
        %% Close file reader
        close(State1)
    end)(),

    ?assertEqual(0,  get(count_end)),

    %% (3) Now test reading from the 6th position
    (fun() ->
        {ok, State} = init(File, fun consume/3, [{pos, 6}, {parser, Prsr}, {pstate, N-1}]),
        %% Execute file reader synchronously
        State1 = run(State),
        ?assertEqual(0, State1#state.pstate),
        %% Close file reader
        close(State1)
    end)(),

    ?assertEqual(0,  get(count_end)),

    %% (4) Now test reading only 11 bytes: <<"abc10\nabc9\n">>
    (fun() ->
        {ok, State} = init(File, fun consume/3, [{end_pos, 11}, {parser, Prsr}, {pstate, N}]),
        %% Execute file reader synchronously
        State1 = run(State),
        ?assertEqual(8, State1#state.pstate),
        %% Close file reader
        close(State1)
    end)(),

    ?assertEqual(1,  get(count_end)),
    ?assertEqual(11, get(end_pos)),

    %% (5) Now test reading from the end of file till the end of file
    (fun() ->
        {ok, State} = init(File, fun consume/3, [{pos, eof}, {end_pos, eof}, {parser, Prsr}, {pstate, N}]),
        %% Execute file reader synchronously
        State1 = run(State),
        ?assertEqual(51, State1#state.pos),
        ?assertEqual(51, State1#state.end_pos),
        ?assertEqual(N,  State1#state.pstate),
        %% Close file reader
        close(State1)
    end)(),

    ?assertEqual(2,  get(count_end)),
    ?assertEqual(51, get(end_pos)),

    erase(count_end),
    erase(end_pos),
    file:delete(File).
       
-endif.

%%%.
%%% vim: set filetype=erlang tabstop=4 foldmarker=%%%',%%%. foldmethod=marker: