src/csv.erl

%%% vim:ts=2:sw=2:et
%%%-----------------------------------------------------------------------------
%%% @doc CSV file parsing functions
%%%
%%% @author Serge Aleynikov <saleyn@gmail.com>
%%% @copyright 2021 Serge Aleynikov
%%% @end
%%%-----------------------------------------------------------------------------
%%% Created 2021-06-01
%%%-----------------------------------------------------------------------------
-module(csv).
-author('saleyn@gmail.com').

-export([parse/1, parse/2, parse_line/1]).
-export([max_field_lengths/2, guess_data_types/2, guess_data_type/1, load_to_mysql/4]).

-type load_options() ::
  [{load_type,        recreate|replace|ignore_dups|update_dups}|
   {col_types,        #{binary() => ColType::atom() | {ColType::atom(), ColLen::integer()}}}|
   {batch_size,       integer()}|
   {blob_size,        integer()}|
   {create_table,     boolean()}|
   {save_create_sql_to_file, string()}|
   {transforms,       #{binary() => fun((term()) -> term())}}|
   {guess_types,      boolean()}|
   {guess_limit_rows, integer()}|
   {max_nulls_pcnt,   float()}|
   {primary_key,      PKColumns::binary()|[binary()|list()]}|
   {drop_temp_table,  boolean()}|
   {encoding,    string()|atom()}|
   {verbose,     boolean()|integer()}].
%% Options for loading data to a database.
%% <dl>
%% <dt>{load_type, Type}</dt>
%%   <dd>Type of loading to perform. `recreate' will replace the table by atomically
%%       dropping the old one, creating/loading the new one, and replacing the table.
%%       `replace' will do an insert by using `REPLACE INTO' statement. `ignore_dups'
%%       will use `INSERT IGNORE INTO' statement to ignore records with duplicate keys.
%%       `update_dups' will do an `INSERT INTO' and `ON DUPLICATE KEY UPDATE', so that
%%       the old records are updated and the new ones are inserted.</dd>
%% <dt>{create_table, Allow}</dt>
%%   <dd>Allow to create a table if it doesn't exist</dd>
%% <dt>{col_types, Map}</dt>
%%   <dd>Types of data for all or some columens. The Map is in the format:
%%       `ColName::binary() => ColInfo', where
%%       ColInfo is `ColType | {ColType, ColLen::integer()}', and `ColType' is:
%%       `date | datetime | integer | float | blob | number'.
%%       </dd>
%% <dt>{transforms, Map}</dt>
%%   <dd>Value transformation function for columns. The Map is in the format:
%%       `ColName::binary() => fun((Value::term()) -> term())'.</dd>
%% <dt>{batch_size, Size}</dt>
%%   <dd>Number of records per SQL insert/update/replace call</dd>
%% <dt>{blob_size, Size}</dt>
%%   <dd>Threshold in number of bytes at which a VARCHAR field is defined as BLOB</dd>
%% <dt>{save_create_sql_to_file, Filename::string()}</dt>
%%   <dd>Save CREATE TABLE sql statement to a file</dd>
%% <dt>guess_types</dt>
%%   <dd>When specified, the function will try to guess the type of data in columns
%%       instead of treating all data as string fields. The possible data typed guessed:
%%       integer, float, date, datetime, number, string</dd>
%% <dt>{guess_limit_rows, Limit}</dt>
%%   <dd>Limit the number of rows for guessing the column data types</dd>
%% <dt>{max_nulls_pcnt, Percent}</dt>
%%   <dd>A percentage threshold of permissible NULLs in a column (0-100), above which
%%       the column data type is forced to be treated as `string'</dd>
%% <dt>{primary_key, Fields}</dt>
%%   <dd>Names of primary key fields in the created table</dd>
%% <dt>{drop_temp_table, boolean()}</dt>
%%   <dd>When true (default), temp table is dropped.</dd>
%% <dt>{encoding, Encoding}</dt>
%%   <dd>The name of the encoding to use for storing data. For the list of permissible
%%       values [see this
%%       link](https://dev.mysql.com/doc/refman/8.0/en/charset-unicode-sets.html)</dd>
%% <dt>verbose</dt>
%%   <dd>Print additional details to stdout</dd>
%% </dl>
-export_type([load_options/0]).

-type parse_options() ::
  [fix_lengths            |
   binary                 |
   list                   |
   {open,       list()}   |
   {columns,    [binary() |string()]}|
   {converters, [{binary()|string()|all, fun((binary(), binary()) -> binary())|
                                         {rex, binary(), binary()}}]}
  ].
%% CSV Parsing Options.
%% <dl>
%%   <dt>fix_lengths</dt><dd>if a record has a column count
%%       greater than what's found in the header row, those extra columns will be
%%       dropped, and if a row has fewer columns, empty columns will be added.</dd>
%%   <dt>binary</dt><dd>Return fields as binaries (default)</dd>
%%   <dt>list</dt><dd>Return fields as lists</dd>
%%   <dt>{open, list()}</dt><dd>Options given to file:open/2</dd>
%%   <dt>{columns, Names}</dt><dd>Return data only in given columns</dd>
%%   <dt>{converters, [{Col, fun((ColName, Value) -> NewValue)|
%%                           {rex, RegEx, Replace}]}</dt>
%%     <dd>Data format converter.
%%       If `Col' is `all', the same converting function is used for all
%%       columns. If the converter is a `{rex, RegEx, Replace}' then
%%       the regular expression replacement will be run on a value in the
%%       requested column.</dd>
%% </dl>
-export_type([parse_options/0]).

%%------------------------------------------------------------------------------
%% CSV parsing
%%------------------------------------------------------------------------------
%% @doc Parse a CSV file using default options.
-spec parse(string()) -> [[binary()]].
parse(File) when is_list(File); is_binary(File) ->
  parse(File, []).

%%------------------------------------------------------------------------------
%% @doc Parse a given CSV file.
%% @end
%%------------------------------------------------------------------------------
-spec parse(binary()|string(), parse_options()) -> [[string()]].
parse(File, Opts) when is_binary(File) ->
  parse(binary_to_list(File), Opts);
parse(File, Opts) when is_list(File), is_list(Opts) ->
  FileOpts = proplists:get_value(open,   Opts, []),
  Mode     = case proplists:get_value(binary, Opts,  true) andalso
              not proplists:get_value(list,   Opts, false)
             of
               true  -> binary;
               false -> list
             end,
  Columns  = case proplists:get_value(columns, Opts, all) of
               all   -> all;
               L0    -> [to_binary(I) || I <- L0]
             end,
  {ok, F}  = file:open(File, [read, raw, binary]++FileOpts),
  FstLine  = case file:read_line(F) of
               {ok, <<I/utf8, Line/binary>>} when I == 65279 -> %% Utf-8  <<239,187,191,...>>
                 {ok, Line};
               Other ->
                 Other
             end,
  FixLen   = proplists:get_value(fix_lengths, Opts, false),
  CSV      = case parse_csv_file(F, 1, FstLine, []) of
               [] ->
                 [];
               LL when FixLen ->
                 HLen = length(hd(LL)),
                 [fix_length(HLen, length(R), R) || R <- LL];
               LL ->
                 LL
             end,
  % Get converters, which are in the form {ColName|all, fun((V) -> NewV)}.
  Converts = 
    case {CSV, proplists:get_value(converters, Opts, none)} of
      {[], _} ->
        [];
      {[Headers|_], none} ->
        list_to_tuple(lists:duplicate(length(Headers), undefined));
      {[Headers|_], L1} when is_list(L1) ->
        L2 = [{if H==all -> all; true -> to_binary(H) end,
               if is_function(Fun,2) ->
                    Fun;
                  element(1,Fun)==rex ->
                    Fun;
                  true ->
                    throw("Invalid converter type for column: " ++ to_string(H))
               end} || {H,Fun} <- L1],
        list_to_tuple([case proplists:get_value(I, L2, proplists:get_value(all, L2)) of
                         Fun when is_function(Fun,2); element(1,Fun)==rex ->
                           Fun;
                         undefined ->
                           undefined
                       end || I <- Headers])
    end,

  case CSV of
    [] ->
      [];
    [HD|Rows] ->
      BitMask = list_to_tuple([Columns==all orelse lists:member(I, Columns) || I <- HD]),
      %% Get resulting column headers
      H = foldr(fun(I, Col, S) ->
                  case element(I, BitMask) of
                    true  -> [if Mode==list -> binary_to_list(Col); true -> Col end | S];
                    false -> S
                  end
                end, [], HD),
      %% Get resulting data rows
      {D,_} = lists:mapfoldl(
            fun(Row, RowNum) ->
              NewRow = foldr(fun(I, V, A) ->
                               case element(I,BitMask) of
                                 true  -> [convert(Converts, RowNum, I, V, Mode)|A];
                                 false -> A
                               end
                             end, [], Row),
              {NewRow, RowNum+1}
            end,
            1,
            Rows),
      [H | D]
  end.

convert(CvtMask, RowNum, I, V, Mode) ->
  case element(I, CvtMask) of
    undefined ->
      if Mode == list -> binary_to_list(V);
         true         -> V
      end;
    Fun when is_function(Fun, 2) ->
      if Mode == list ->
        binary_to_list(Fun(V, RowNum));
      true ->
        Fun(V, RowNum)
      end;
    {rex, RegEx, Replace} ->
      re:replace(V, RegEx, Replace, [{return,Mode}]);
    {rex, RegEx, Replace, Options} ->
      re:replace(V, RegEx, Replace, [{return,Mode}|Options])
  end.

parse_csv_file(F, _, eof, Done) ->
  file:close(F),
  lists:reverse(Done);

parse_csv_file(F, LineNo, {ok, Line}, Done) ->
  parse_csv_file(F, LineNo+1, file:read_line(F), [parse_line(LineNo, Line)|Done]);

parse_csv_file(_F, LineNo, {error, Reason}, _) ->
  throw({error, [{line, LineNo}, {reason, file:format_error(Reason)}]}).

trim_eol(Line) ->
  trim_eol(0, Line).

trim_eol(N, Line) when N < byte_size(Line) ->
  Z = byte_size(Line),
  M = N+1,
  case binary:at(Line, Z-M) of
    C when C == $\r; C == $\n; C == $  ->
      trim_eol(M, Line);
    _ ->
      Z-N
  end;
trim_eol(N, Line) ->
  byte_size(Line) - N.

%% @doc Parse a CSV line
-spec parse_line(binary()) -> list().
parse_line(Line) ->
  parse_line(1, Line).

parse_line(LineNo, Line) ->
  try
    parse_csv_field(0, trim_eol(Line), Line, 0,0, [], false)
  catch E:R:S ->
    erlang:raise(E, {line_parse_error, LineNo, Line, R}, S)
  end.

field(Line, Pos, Len, true = _HasEscDblQuote) ->
  binary:replace(binary:part(Line, Pos, Len), <<"\"\"">>, <<"\"">>, [global]);
field(Line, Pos, Len, _) ->
  binary:part(Line, Pos, Len).

trim(_Line, I, I, _Inc) ->
  I;
trim(Line, I, End, Inc) ->
  case binary:at(Line, I) of
    $  ->
      trim(Line, I+Inc, End, Inc);
    _ ->
      I
  end.

trim(Line, Pos, End) when Pos < End ->
  Front = trim(Line, Pos, End,    1),
  Back  = trim(Line, End, Front, -1),
  {Front, Back+1};
trim(_Line, Pos, End) ->
  {Pos, End+1}.

parse_csv_field(From, To, Line, Pos,Len, Fields, HasEscDblQuote) when From >= To ->
  {Start, End} = trim(Line, Pos, Pos+Len-1),
  lists:reverse([field(Line, Start, End-Start, HasEscDblQuote) | Fields]);
parse_csv_field(From, To, Line, Pos,Len, Fields, HasEscDblQuote) ->
  case Line of
    <<_:From/binary, "\"", _/binary>> ->
      parse_csv_field_q(From+1, To, Line, From+1, 0, Fields, HasEscDblQuote);
    <<_:From/binary, ",", _/binary>> ->
      {Start, End} = trim(Line, Pos, Pos+Len-1),
      parse_csv_field(From+1,To,Line, From+1, 0, [field(Line,Start,End-Start,HasEscDblQuote)|Fields], false);
    _ ->
      parse_csv_field(From+1,To,Line, Pos,Len+1, Fields, HasEscDblQuote)
  end.

parse_csv_field_q(From, To, Line, Pos,Len, Fields, HasEscDblQuote) when From < To ->
  case Line of
    <<_:From/binary, "\"\"", _/binary>> ->
      parse_csv_field_q(From+2, To, Line, Pos, Len+2, Fields, true);
    <<_:From/binary, "\\",   _/binary>> ->
      parse_csv_field_q(From+1, To, Line, Pos, Len+1, Fields, HasEscDblQuote);
    <<_:From/binary, "\"",   _/binary>> ->
      parse_csv_field  (From+1, To, Line, Pos, Len,   Fields, HasEscDblQuote);
    _ ->
      parse_csv_field_q(From+1, To, Line, Pos, Len+1, Fields, HasEscDblQuote)
  end;
parse_csv_field_q(From, To, Line, Pos,Len, Fields, HasEscDblQuote) ->
  parse_csv_field(From, To, Line, Pos,Len, Fields, HasEscDblQuote).

fix_length(HLen, HLen, Data) -> Data;
fix_length(HLen, DLen, Data) when HLen < DLen ->
  {RR, _} = lists:split(HLen, Data),
  RR;
fix_length(HLen, DLen, Data) when HLen > DLen ->
  Value = if is_binary(hd(Data)) -> <<"">>; true -> "" end,
  RR    = lists:duplicate(HLen-DLen, Value),
  Data ++ RR.

%%------------------------------------------------------------------------------
%% @doc Get max field lengths for a list obtained by parsing a CSV file with
%%      `parse_csv_file(File,[fix_lengths])'.
%% @end
%%------------------------------------------------------------------------------
-spec max_field_lengths(HasHeaderRow::boolean(), Rows::[Fields::list()]) -> [Len::integer()].
max_field_lengths(true = _HasHeaders, [_Headers | Rows ] = _CSV) ->
  max_field_lengths(false, Rows);
max_field_lengths(false = _HasHeaders, CsvRows) ->
  % Calculate length of a field in a row:
  %   CSV -> [[R1FieldLen0, ..., R1FieldLenN], ...,
  %           [RnFieldLen0, RnFieldLenN]]
  F     = fun(Row) -> [if is_binary(I) -> byte_size(I); true -> length(I) end || I <- Row] end,
  CsvL  = [F(Row) || Row <- CsvRows],
  MLens =
    fun
      G([[] | _], Acc) ->
        lists:reverse(Acc);
      G(RRows, Acc) ->
        {MaxLen, F1Nrows} = lists:foldl(fun([H|T], {A,L}) -> {max(H,A),[T|L]} end, {0,[]}, RRows),
        G(F1Nrows, [MaxLen | Acc])
    end,
  MLens(CsvL, []).

%%------------------------------------------------------------------------------
%% @doc Guess data types of fields in the given CSV list of rows obtained by
%%      parsing a CSV file with `parse(File,[fix_lengths])'.
%%      The function returns a list of tuples `{Type, MaxFieldLen, NumOfNulls}',
%%      where the `Type' is a field type, `MaxFieldLen' is the max length of
%%      data in this column, and `NumOfNulls' is the number of rows with empty
%%      values in this column.
%% @end
%%------------------------------------------------------------------------------
-spec guess_data_types(HasHeaderRow::boolean(), Rows::[Fields::[binary()]]) ->
        {Type::string|integer|number|float|date|datetime,
         MaxFieldLen::integer(),
         NumOfNulls::integer()}.
guess_data_types(HasHeaders, CSV) when is_boolean(HasHeaders), is_list(CSV) ->
  guess_data_types(HasHeaders, CSV, 20.0, 1_000_000).

-spec guess_data_types(HasHeaderRow::boolean(), Rows::[Fields::list()], number(), integer()) ->
        {[{FieldType::string|date|datetime|integer|float|number,
           MaxFieldLength::integer(),
           NullsCount::integer()}],
         [Row::[term()]]}.
guess_data_types(true = _HasHeaders, [Headers | Rows ] = _CSV, NullsMaxPcnt, SniffRows) ->
  {TpLenNulls, DataRows} = guess_data_types(false, Rows, NullsMaxPcnt, SniffRows),
  {TpLenNulls, [Headers | DataRows]};
guess_data_types(false = _HasHeaders, CsvRows, NullsMaxPcnt, SniffRows)
    when is_number(NullsMaxPcnt), NullsMaxPcnt >= 0.0, NullsMaxPcnt =< 100.0
       , is_integer(SniffRows) ->
  %   CSV -> [[R1Field0, ..., R1FieldN], ..., [RnField0, RnFieldN]]
  NRows = length(CsvRows),
  F     = fun(UseString, Row) ->
            [guess_data_type2(I,UseString) || I <- Row]
          end,
  CsvL  = [F(I > SniffRows, Row) || {I, Row} <- lists:zip(lists:seq(1,NRows), CsvRows)],
  Res   = scan_column(CsvL, NullsMaxPcnt, NRows),
  Data  = [ [V || {_,V,_} <- R] || R <- CsvL],
  {Res, Data}.

sort_fun(A,A)    -> true;
sort_fun(null,_) -> true;
sort_fun(_,null) -> false;
sort_fun(A,B)    -> A =< B.

scan_column([], _NullsMaxPcnt, _NRows) ->
  [];
scan_column(L, NullsMaxPcnt, NRows) when is_number(NullsMaxPcnt) ->
  scan_column(L, NRows, NullsMaxPcnt, []).

scan_column([[]|_], _NRows, _NullsMaxPcnt, Acc) ->
  lists:reverse(Acc);
scan_column(L, NRows, NullsMaxPcnt, Acc) ->
  {_Type, _MLen, _Nulls} = Res = scan_mlt(L, NRows, NullsMaxPcnt),
  Acc1 = [Res | Acc],
  case L of
    [[_|_]|_] ->
      scan_column([tl(I) || I <- L], NRows, NullsMaxPcnt, Acc1);
    _ ->
      scan_column([[]], NRows, NullsMaxPcnt, Acc1)
  end.

scan_mlt(L, NRows, NullsMaxPcnt) when is_number(NullsMaxPcnt) ->
  {CTypes, MLen, Nulls, _} = scan_mlt2(L, {#{}, 0, 0, false}),
  PcntNulls = if NRows == 0 -> 100; true -> Nulls / NRows * 100 end,
  Fun = fun
          ([T]) when T /= null -> T;
          ([date,datetime])    -> datetime; 
          ([float,integer])    -> number;
          (_)                  -> string
        end,
  Type =
    case lists:sort(fun sort_fun/2, maps:keys(CTypes)) of
      [null|T] when PcntNulls =< NullsMaxPcnt ->
        Fun(T);
      T ->
        Fun(T)
    end,
  {Type, MLen, Nulls}.

scan_mlt2([[{I,_,V} | _] | T], {ATypes, MLen, Nulls, HasStrings}) ->
  case I of
    null ->
      scan_mlt2(T, {ATypes#{null => 1}, MLen, Nulls+1, HasStrings});
    string when HasStrings ->
      scan_mlt2(T, {ATypes, max(byte_size(V),MLen), Nulls, true});
    string ->
      scan_mlt2(T, {ATypes#{string => 1}, max(byte_size(V),MLen), Nulls, true});
    Type ->
      scan_mlt2(T, {ATypes#{Type => 1}, max(byte_size(V),MLen), Nulls, HasStrings})
  end;
scan_mlt2([[]|_], Acc) ->
  Acc;
scan_mlt2([], Acc) ->
  Acc.

%%------------------------------------------------------------------------------
%% @doc Load CSV data from a `File' to a `MySQL' database.
%% `Tab' is the name of a table where to load data.  `MySqlPid' is the pid of a
%% MySQL database connection returned by `mysql:start_link/1'.
%% The data in the table is replaced according to `{import_type, Type}':
%% <ul>
%% <li>`recreate' - The table is entirely replaced with the data from file.
%% The data from the file is loaded atomically - i.e. either
%% the whole file loading succeeds or fails.  This is accomplished by first
%% loading data to a temporary table, and then using the database's ACID
%% properties to replace the target table with the temporary table.</li>
%% <li>`replace'     - Use "REPLACE INTO" instead of "INSERT INTO" existing
%% table</li>
%% <li>`ignore_dups' - The insert in the existing table is performed and the
%% records with duplicate keys are ignored</li>
%% <li>`update_dups' - The insert in the existing table is performed and the
%% records with duplicate keys are updated</li>
%% <li>`upsert'      - The insert/update in the existing table is performed
%% without creating a temporary table</li>
%% </ul>
%%
%% NOTE: this function requires [https://github.com/mysql-otp/mysql-otp.git]
%% @end
%%------------------------------------------------------------------------------
-spec load_to_mysql(File::string(), Tab::string(),
                    MySqlPid::pid(), Opts::load_options()) ->
        {Columns::list(), AffectedCount::integer(), RecCount::integer()}.
load_to_mysql(File, Tab, MySqlPid, Opts)
    when is_list(File), is_list(Tab), is_pid(MySqlPid), is_list(Opts) ->
  try
    BatSz  = proplists:get_value(batch_size,  Opts, 100), % Insert this many records per call
    BlobSz = proplists:get_value(blob_size,   Opts, 1000),% Length at which VARCHAR becomes BLOB
    Create = proplists:get_value(create_table,Opts, true),% Allow to create table
    Types  = proplists:get_value(col_types,   Opts, #{}), % Column types
    Trans0 = proplists:get_value(transforms,  Opts, #{}), % Column transforms
    Enc    = encoding(proplists:get_value(encoding, Opts, undefined)),
    Verbose= case proplists:get_value(verbose,     Opts, 0) of
               true  -> 1;
               false -> 0;
               _I when is_integer(_I) -> _I
             end,
    SaveSQL= proplists:get_value(save_create_sql_to_file, Opts, false),
    LoadTp = proplists:get_value(load_type,   Opts, recreate),
             lists:member(LoadTp,[recreate, replace, ignore_dups, update_dups, upsert])
               orelse throw({badarg, {load_type, LoadTp}}),
    Drop   = proplists:get_value(drop_temp_table, Opts, true),
    PKey   = case proplists:get_value(primary_key,  Opts, undefined) of
               K when is_binary(K) ->
                 [K];
               [H|_] = K when is_binary(H); is_list(H) ->
                 [to_binary(I) || I <- K];
               undefined ->
                 [];
               [] ->
                 [];
               Other ->
                 throw({badarg, {primary_key, Other}})
             end,
    CSV0   = parse(File, [fix_lengths, binary]),
    ColCnt = length(hd(CSV0)),
    CSV1   = [[list_to_binary(cleanup_header(to_string(S))) || S <- hd(CSV0)] | tl(CSV0)],
    Heads  = hd(CSV1),
    %% If no records to insert, then just short-circuit the processing
    tl(CSV1) == [] andalso
      throw({ignore, {Heads, 0, 0}}),
    PKey  /= [] andalso
      lists:foreach(fun(K) ->
                      lists:member(K, Heads) orelse throw({primary_key_not_found, K, Heads})
                    end, PKey),
    Merge  = fun M([],     [])                        -> [];
                 M([H|T1], [{T,I,J}|T2])              -> [{H,T,I,J}|M(T1,T2)];
                 M([H|T1], [I|T2]) when is_integer(I) -> [{H,string,I,0}|M(T1,T2)]
             end,
    {ColNmTpMxLens, CSV} =
      case proplists:get_value(guess_types, Opts, false) of
        true ->
          SniffRows    = proplists:get_value(guess_limit_rows, Opts, 1_000_000),
          MaxNullsPcnt = proplists:get_value(max_nulls_pcnt,   Opts, 40.0),
          {TLN, CSV2}  = guess_data_types(true, CSV1, MaxNullsPcnt, SniffRows),
          {Merge(Heads, TLN), CSV2};
        false ->
          {Merge(Heads, max_field_lengths(true, CSV1)), CSV1}
      end,
    Verbose > 0 andalso
      io:format(standard_error,
                "LoadType: ~p\n"
                "Columns:\n  ~p\n", [LoadTp, ColNmTpMxLens]),
    %% Check if the table exists
    Exists = case mysql:query(MySqlPid, "SELECT count(*) as a\n"
                                        "  FROM information_schema.tables\n"
                                        " WHERE table_schema=database() AND\n"
                                        "       table_name='" ++ Tab ++ "';\n")
             of
               {ok,_,[[N]]} ->
                 N > 0;
               {error, {Code01, _, Msg01}} ->
                 throw({error_checking_if_table_exists, Code01, binary_to_list(Msg01), Tab})
             end,
    TmpTab = case LoadTp of
               upsert -> Tab;
               _      -> Tab ++ "_tmp"
             end,
    CrTab  = lists:flatten([
               if LoadTp == upsert -> [];
                  true             -> ["DROP TABLE IF EXISTS `", TmpTab, "`;\n"]
               end,
               case Exists of
                 true when LoadTp==replace
                         ; LoadTp==update_dups 
                         ; LoadTp==ignore_dups
                         ;(LoadTp==recreate andalso not Create) ->
                   ["CREATE TABLE `", TmpTab, "` AS SELECT * FROM `", Tab, "` where false;\n"];
                 true when LoadTp /= upsert
                         , LoadTp /= recreate ->
                   throw({invalid_load_type, LoadTp});
                 false when not Create
                          , LoadTp /= recreate ->
                   throw("Table "++Tab++" doesn't exist and creation not allowed!");
                 _ ->
                   ["CREATE TABLE `", TmpTab, "` (",
                     string:join(
                       [
                        begin
                          {CTp, I} =
                            case maps:get(S, Types, T) of
                              CA when is_atom(CA) ->
                                {CA, CLen};
                              {CA, CLen1} ->
                                {CA, CLen1}
                            end,
                          case CTp of
                            blob              -> io_lib:format("`~s` BLOB",       [to_string(S)]);
                            _ when I > BlobSz -> io_lib:format("`~s` BLOB",       [to_string(S)]);
                            date              -> io_lib:format("`~s` DATE",       [to_string(S)]);
                            datetime          -> io_lib:format("`~s` DATETIME",   [to_string(S)]);
                            integer           -> io_lib:format("`~s` BIGINT",     [to_string(S)]);
                            float             -> io_lib:format("`~s` DOUBLE",     [to_string(S)]);
                            number            -> io_lib:format("`~s` DOUBLE",     [to_string(S)]);
                            _                 -> io_lib:format("`~s` VARCHAR(~w)",[to_string(S),I])
                          end
                        end || {S,T,CLen,_} <- ColNmTpMxLens], ","),
                   ");\n"]
               end,
               %% Add primary key
               case PKey of
                 [] -> "";
                 _  -> SS = string:join(["`"++binary_to_list(S)++"`" || S <- PKey], ","),
                       io_lib:format("ALTER TABLE `~s` ADD PRIMARY KEY (~s);\n", [TmpTab, SS])
               end]),

    case SaveSQL of
      false ->
        ok;
      true ->
        SQLF  = filename:join(filename:dirname(File), filename:basename(File, ".csv") ++ ".sql"),
        ok    = file:write_file(SQLF, CrTab);
      SQLF when is_list(SQLF) ->
        ok    = file:write_file(SQLF, CrTab)
    end,

    if CrTab == [] ->
      ok;
    true ->
      Verbose > 0 andalso io:format(standard_error, "SQL:\n====\n~s\n", [CrTab]),

      case mysql:query(MySqlPid, CrTab) of
        ok -> ok;
        {error, {Code, _, Msg}} ->
          throw({error_creating_table, Code, binary_to_list(Msg), CrTab})
      end
    end,

    %% Remove NOT NULL constraints from all columnts
    if Exists, LoadTp == update_dups ->
      {ok, _, NullCols} =
        mysql:query(MySqlPid,
                    "select column_name,column_type,collation_name,column_default,extra "
                      "from information_schema.columns "
                     "where table_schema=database() AND table_name='"++Tab++"' "
                       "and IS_NULLABLE='NO' and COLUMN_KEY!='PRI';"),
      NotNullSQL = lists:flatten(
                     [["ALTER TABLE `",TmpTab,"` MODIFY `",cc(C),"` ",cc(Tp)," NULL",
                       cc("COLLATE",CN),cc("DEFAULT",CD),cc(EXT),";\n"]
                      || [C,Tp,CN,CD,EXT] <- NullCols]),
      Verbose > 0 andalso io:format(standard_error, "Replacing NULL columns:\n  ~p\n", [NotNullSQL]),
      ok = mysql:query(MySqlPid, NotNullSQL);
    true ->
      ok
    end,

    [HD|Rows] = CSV,
    Trans     = lists:foldl(fun(H,{I,A}) ->
                              {I+1, setelement(I, A, maps:get(H, Trans0, undefined))}
                            end, {1, list_to_tuple(lists:duplicate(length(HD), undefined))}, HD),
    Transform =
      fun
        G([V|Rest], I) ->
          case element(I, Trans) of
            F when is_function(1, F) -> [check_null(F(V)) | G(Rest, I+1)];
            undefined                -> [check_null(V)    | G(Rest, I+1)]
          end;
        G([], _) ->
          []
      end,

    [_|QQQ0s] = string:copies(",?", ColCnt),
    HHHs      = string:join(["`"++binary_to_list(S)++"`" || S <- HD], ","),
    QQQs      = lists:append([",(", QQQ0s, ")"]),
    ModRows   = case Trans0 of
                  #{} -> [[check_null(B) || B <- Row] || Row <- Rows];
                  _   -> [Transform(Row, 1)           || Row <- Rows]
                end,
    BatchRows = stringx:batch_split(BatSz, ModRows),

    FstBatLen = length(hd(BatchRows)),
    if Enc   /= [] ->
      Verbose > 0 andalso io:format(standard_error, "SQL:\n====\n~s\n", [Enc]),
      ok      = mysql:query(MySqlPid, Enc);
    true ->
      ok
    end,

    %% Insert data to the temp table
    
    PfxSQL    = lists:append(["INSERT INTO `", TmpTab, "` (", HHHs, ") VALUES "]),
    [_|SfxSQL]= string:copies(QQQs, FstBatLen),
    TailSQL   = case LoadTp of
                  upsert -> on_duplicate_key_update(Tab, HD, PKey);
                  _      -> []
                end,
    Verbose > 0 andalso
      io:format(standard_error, "SQL:\n====\n~s~s~s\n", [PfxSQL, tl(QQQs), TailSQL]),
    {ok,Ref}  = mysql:prepare(MySqlPid, PfxSQL++SfxSQL++TailSQL),

    NNN = lists:foldl(fun(Batch, I) ->
      NRows = length(Batch),
      {SqlRef, Unprepare} =
        if NRows == FstBatLen ->
          {Ref, false};
        true ->
          [_|Sfx2] = string:copies(QQQs, NRows),
          {ok,R}   = mysql:prepare(MySqlPid, PfxSQL++Sfx2++TailSQL),
          {R, true}
        end,
      Row = lists:append(Batch),
      Verbose > 1 andalso io:format(standard_error, "Inserting: ~p\n", [Row]),
      Res = mysql:execute(MySqlPid, SqlRef, Row),
      Unprepare andalso mysql:unprepare(MySqlPid, SqlRef),
      case Res of
        ok ->
          I + NRows;
        {error, {_Code, _, _Msg}} ->
          SQL2 = PfxSQL++tl(QQQs),
          try
            lists:foldl(fun(R, J) ->
              case mysql:query(MySqlPid, SQL2, R) of
                ok ->
                  J+1;
                {error, {Code1, _, Msg1}} ->
                  throw({error_inserting_records, Code1, binary_to_list(Msg1), {row, J, R}})
              end
            end, I, Batch)
          catch _:E ->
            mysql:unprepare(MySqlPid, Ref),
            throw(E)
          end
      end
    end, 1, BatchRows),

    mysql:unprepare(MySqlPid, Ref),

    %% Now the temp table is populated - update the actual table
    SQL = lists:flatten(
      case LoadTp of
        upsert ->
          [];
        recreate ->
          OldTab = Tab ++ "_OLD",
          %% Rename the actual table X to X_OLD, and the tmp table T to X, and drop X_OLD
          %% in one transaction:
          ["DROP TABLE IF EXISTS `", OldTab, "`;\n"
           "-- Check if the real prod table exists\n"
           "SELECT count(*) INTO @exists\n"
           "  FROM information_schema.tables\n"
           " WHERE table_schema=database() AND\n"
           "       table_name='", Tab, "';\n"
           "-- If so, atomically rename it to another table,\n"
           "-- rename temp table to prod table, and drop the old table\n"
           "-- Otherwise, just atomically rename the temp table into prod\n"
           "SELECT count(*) from `", TmpTab, "` INTO @cnt;\n"
           "SET @query = IF(@exists>0,\n"
           "  'RENAME TABLE `", Tab,    "` TO `", OldTab, "`, `", TmpTab, "` TO `", Tab, "`',\n"
           "  'RENAME TABLE `", TmpTab, "` TO `", Tab,    "`');\n"
           "PREPARE stmt FROM @query;\n"
           "EXECUTE stmt;\n"
           "DEALLOCATE PREPARE stmt;\n"
           "DROP TABLE IF EXISTS `", OldTab, "`;\n"
           "SELECT @cnt,@cnt;\n"];
        replace ->
          ["REPLACE INTO `", Tab, "` (",HHHs,") SELECT ",HHHs," FROM `", TmpTab, "`;\n"
           "SELECT ROW_COUNT(),FOUND_ROWS();\n"];
        ignore_dups ->
          ["INSERT IGNORE INTO `", Tab, "` (",HHHs,") SELECT ",HHHs," FROM `", TmpTab, "`;\n"
           "SELECT ROW_COUNT(),FOUND_ROWS();\n"];
        update_dups ->
          ["INSERT INTO `", Tab, "` (",HHHs,") SELECT ",HHHs," FROM `", TmpTab, "`\n",
           on_duplicate_key_update(Tab, HD, PKey), ";\n",
           "SELECT ROW_COUNT(),FOUND_ROWS();\n"]
      end),

    case SQL of
      [] ->
        {HD, NNN, NNN};
      _ ->
        Verbose > 0 andalso io:format(standard_error, "SQL:\n====\n~s\n", [SQL]),

        case mysql:query(MySqlPid, SQL) of
          {ok, _, [[Changed,SelectedRows]]} ->
            if Drop ->
              ok = mysql:query(MySqlPid, lists:flatten(["DROP TABLE IF EXISTS `", TmpTab, "`;"]));
            true ->
              ok
            end,

            Selected = if SelectedRows < 0 -> 0; true -> SelectedRows end,
            {HD, Changed, Selected};
          {error, {Code3, _, Msg3}} ->
            throw({error_inserting_records, Code3, binary_to_list(Msg3), SQL})
        end
    end
  catch
    throw:{ignore, Result} ->
      Result
  end.

on_duplicate_key_update(Tab, HD, PKey) ->
  ["ON DUPLICATE KEY UPDATE ",
   string:join(
     [begin
        LL=binary_to_list(S),
        ["`",LL,"`=IFNULL(VALUES(",LL,"),`",Tab,"`.`",LL,"`)"]
      end || S <- HD, not lists:member(S, PKey)],
     ",")].
 
encoding(undefined) -> [];
encoding(A) when is_atom(A) -> encoding2(atom_to_list(A));
encoding(L) when is_list(L) -> encoding2(L).
encoding2(L) ->
  ["SET NAMES ", L, ";\n", encoding3(L)].
encoding3("utf8"++_) ->
  "SET CHARACTER SET utf8;\n";
encoding3(Other) ->
  ["SET CHARACTER SET ", Other].

to_string(L) when is_binary(L) -> binary_to_list(L);
to_string(L) when is_list(L)   -> L;
to_string(L) when is_atom(L)   -> atom_to_list(L).

to_binary(I) when is_binary(I) -> I;
to_binary(L) when is_list(L)   -> list_to_binary(L).

check_null(<<>>)               -> null;
check_null(B)                  -> B.

cc(null)                       -> "";
cc(B)        when is_binary(B) -> binary_to_list(B).
cc(L,null)   when is_list(L)   -> "";
cc(L,B)      when is_binary(B) -> " "++L++" "++binary_to_list(B).

cleanup_header([$ |T]) -> [$_|cleanup_header(T)];
cleanup_header([C|T]) when (C >= $a andalso C =< $z);
                           (C >= $A andalso C =< $Z);
                           (C >= $0 andalso C =< $9);
                           (C == $_)
                       -> [C|cleanup_header(T)];
cleanup_header([_|T])  -> cleanup_header(T);
cleanup_header([])     -> [].

%% @doc Guess the type of data by its value
-spec guess_data_type(binary()) ->
  {null | date | datetime | integer | float | string, term(), string()}.
guess_data_type(S) ->
  guess_data_type2(S, false).

guess_data_type2(<<"">>, _) ->
  {null, null, <<"">>};
guess_data_type2(S, true) ->
  {string, S, S};
guess_data_type2(S, _) ->
  guess_data_type3(S).

guess_data_type3(<<C, _/binary>> = S) when C >= $0, C =< $9
                                         ; C == $-; C == $+ ->
  try
    I = binary_to_integer(S),
    %case binary_to_integer(S) of
    %  I when I < -2147483648; I > 2147483647 ->
    %    {integer, I, S};
    %  I ->
    %    {integer, I, S}
    %end;
    {integer, I, S}
  catch _:_ ->
    try F = binary_to_float(S), {float, F, S} catch _:_ ->
      guess_data_type4(S)
    end
  end;
guess_data_type3(S) ->
  guess_data_type4(S).

guess_data_type4(<<Y1,Y2,Y3,Y4,$-,M1,M2,$-,D1,D2, _/binary>> = V)
  when Y1 >= $0, Y1 =< $9, Y2 >= $0, Y2 =< $9, Y3 >= $0, Y3 =< $9, Y4 >= $0, Y4 =< $9
     , M1 >= $0, M1 =< $9, M2 >= $0, M2 =< $9
     , D1 >= $0, D1 =< $9, D2 >= $0, D2 =< $9
     ->
  Y = i(Y1)*1000 + i(Y2)*100 + i(Y3)*10 + i(Y4),
  M = i(M1)*10   + i(M2),
  D = i(D1)*10   + i(D2),
  if Y < 1000; Y > 2500; M < 1; M > 12; D < 1; D > 31 ->
    def_type(V);
  true ->
    case V of
      <<_:10/binary, C,H1,H2,$:,N1,N2,$:,S1,S2, _/binary>>
        when (C == $  orelse C == $T)
           , H1 >= $0, H1 =< $9, H2 >= $0, H2 =< $9
           , N1 >= $0, N1 =< $9, N2 >= $0, N2 =< $9
           , S1 >= $0, S1 =< $9, S2 >= $0, S2 =< $9 ->
        H = i(H1)*10 + i(H2),
        N = i(N1)*10 + i(N2),
        S = i(S1)*10 + i(S2),
        if H < 0; H > 23; N < 0; N > 59; S < 0; S > 59 ->
          def_type(V);
        true ->
          {datetime, {{Y,M,D},{H,N,S}}, V}
        end;
      _ when byte_size(V) == 10 ->
        {date, {Y,M,D}, V};
      _ ->
        def_type(V)
    end
  end;
guess_data_type4(S) ->
  def_type(S).

def_type(V) ->
  {string, V, V}.

i(C) -> C - $0.

-spec foldr(fun((Position::integer(), Item::term(), Acc::term()) -> NewAcc::term()),
            Init::term(), list()) -> term().
foldr(Fun, Init, List) ->
  N = length(List),
  element(2, lists:foldr(fun(V, {I, S}) -> R = Fun(I, V, S), {I-1, R} end, {N, Init}, List)).

%%------------------------------------------------------------------------------
%% Tests
%%------------------------------------------------------------------------------
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

parse_test() ->
  ?assertEqual([<<>>],            parse_line(<<"\n">>)),
  ?assertEqual([<<>>],            parse_line(<<"\r\n">>)),
  ?assertEqual([<<>>],            parse_line(<<"\n\r">>)),
  ?assertEqual([<<>>],            parse_line(<<"\n\r\n">>)),
  ?assertEqual([<<"a">>],         parse_line(<<"a\r\n">>)),
  ?assertEqual([<<>>,<<>>],       parse_line(<<",\r\n">>)),
  ?assertEqual([<<>>,<<>>],       parse_line(<<",">>)),
  ?assertEqual([<<>>,<<"a">>],    parse_line(<<",a">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<"a , b">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<" a , b ">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<" a, b ">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<" a,b ">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<" a, \"b\" ">>)),
  ?assertEqual([<<"a">>,<<"b">>], parse_line(<<" a , b \r\n">>)),
  ?assertEqual([<<"a">>,<<"b \r\nbb">>,<<"c">>],parse_line(<<"\"a\",\"b \r\nbb\",\"c\"\r\n">>)),
  ?assertEqual([<<"a\"b">>,<<"c\"">>],          parse_line(<<"\"a\"\"b\",\"c\"\"\"">>)),
  ?assertEqual([<<"a,b">>],                     parse_line(<<"\"a\,b\"">>)),
  ?assertEqual([<<"a, b">>],                    parse_line(<<"\"a\, b\"">>)),
  Lines = [<<"a,bb,ccc">>,
           <<",b,c">>,
           <<",b,\"c,d\"">>,
           <<"\"c,d\"\r">>,
           <<"xx,yyy,zzzz">>,
           <<"xxx,yyyy,zzzzz">>,
           <<"a,b\n">>,
           <<"x,y\r">>,
           <<"z\r\n">>],
  CSV   = [parse_line(L) || L <- Lines],
  Res   = [[<<"a">>,   <<"bb">>,   <<"ccc">>],
           [<<"">>,    <<"b">>,    <<"c">>],
           [<<"">>,    <<"b">>,    <<"c,d">>],
           [<<"c,d">>],
           [<<"xx">>,  <<"yyy">>,  <<"zzzz">>],
           [<<"xxx">>, <<"yyyy">>, <<"zzzzz">>],
           [<<"a">>,<<"b">>],
           [<<"x">>,<<"y">>],
           [<<"z">>]
          ],

  ?assertEqual(Res, CSV).

parse2_test_() ->
  Name = case os:type() of
           {unix, _} -> "/tmp/csv.test.tmp";
           {win32,_} -> os:getenv("TEMP") ++ "/csv.test.tmp"
         end,
  {setup,
    %% Setup
    fun() ->
      ok = file:write_file(Name, "a,b,c\nr11,r12,r13\nr21,r22,r23\n"),
      Name
    end,
    %% Cleanup
    fun(_Name) ->
      file:delete(_Name)
    end,
    %% Tests
    [
     ?_assertEqual([[<<"a">>,<<"b">>,<<"c">>],
                    [<<"r11">>,<<"r12">>,<<"r13">>],
                    [<<"r21">>,<<"r22">>,<<"r23">>]
                   ],
                   parse(Name, [])),
     ?_assertEqual([[<<"a">>,<<"b">>],
                    [<<"r11">>,<<"r12">>],
                    [<<"r21">>,<<"r22">>]
                   ],
                   parse(Name, [{columns, ["a","b"]}])),
     ?_assertEqual([[<<"a">>,<<"c">>],
                    [<<"r11">>,<<"r13">>],
                    [<<"r21">>,<<"r23">>]
                   ],
                   parse(Name, [{columns, ["a","c"]}])),
     ?_assertEqual([[<<"b">>,<<"c">>],
                    [<<"r12">>,<<"r13">>],
                    [<<"r22">>,<<"r23">>]
                   ],
                   parse(Name, [{columns, ["b","c"]}])),
     ?_assertEqual([["a","b","c"],
                    ["r11","r12","r13"],
                    ["r21","r22","r23"]
                   ],
                   parse(Name, [list])),
     ?_assertEqual([["a","b"],
                    ["r11","r12"],
                    ["r21","r22"]
                   ],
                   parse(Name, [list, {columns, ["a","b"]}])),
     %% Test column converters based on a function and a regex:
     ?_assertEqual([["a","b"],
                    ["XX11","Row1"],
                    ["XX21","Row2"]
                   ],
                   parse(Name, [list, {columns,   ["a","b"]},
                                      {converters,[{"a", {rex, "r(\\d\\d)", "XX\\1"}},
                                                   {"b", fun(_, RowNum) ->
                                                           <<"Row", (integer_to_binary(RowNum))/binary>>
                                                         end}]}]))
    ]
  }.

max_lens_test() ->
  Lines = [[<<"a">>,   <<"bb">>,   <<"ccc">>],
           [<<"xx">>,  <<"yyy">>,  <<"zzzz">>],
           [<<"xxx">>, <<"yyyy">>, <<"zzzzz">>]],
  Res   = max_field_lengths(true, Lines),
  ?assertEqual([3,4,5], Res).

guess_type_test() ->
  ?assertEqual({integer, 1,   <<"1">>},   guess_data_type(<<"1">>)),
  ?assertEqual({float,   1.0, <<"1.0">>}, guess_data_type(<<"1.0">>)),
  ?assertEqual({date,    {2021,1,1}, <<"2021-01-01">>},                          guess_data_type(<<"2021-01-01">>)),
  ?assertEqual({datetime,{{2021,1,1},{0,0,0}}, <<"2021-01-01 00:00:00">>},       guess_data_type(<<"2021-01-01 00:00:00">>)),
  ?assertEqual({datetime,{{2021,1,1},{0,0,0}}, <<"2021-01-01 00:00:00+01:00">>}, guess_data_type(<<"2021-01-01 00:00:00+01:00">>)),
  ?assertEqual({datetime,{{2021,1,1},{0,0,0}}, <<"2021-01-01 00:00:00-01:00">>}, guess_data_type(<<"2021-01-01 00:00:00-01:00">>)),
  ?assertEqual({string,  <<"abc">>, <<"abc">>}, guess_data_type(<<"abc">>)),
  ?assertEqual({null,    null,  <<"">>},        guess_data_type(<<"">>)).

col_types_test() ->
  LineS = [["a", "b",   "c",   "d",          "e",                  "f",   "g", "h",  "i"],
           ["1", "1.0", "1.0", "2021-01-02", "2021-01-02",         "abc", "1", "1.0","2021-01-02"],
           ["1", "2.0", "3.0", "2021-03-02", "2021-03-02",         "abc", "1", "3.0","2023-01-02"],
           ["1", "2.0", "3.0", "2021-03-02", "2021-03-02",         "abc", "1", "3.0","2023-01-02"],
           ["1", "2.0", "3.0", "2021-03-02", "2021-03-02",         "abc", "1", "3.0","2023-01-02"],
           ["2", "2",   "2.0", "2021-02-03", "2021-01-02 00:01:02","efg", "",  "",   ""]],
  Lines = [[list_to_binary(I) || I <- Row] || Row <- LineS],
  Res   = guess_data_types(true, Lines),
  ?assertEqual({[{integer,  1,0},
                 {number,   3,0},
                 {float,    3,0},
                 {date,    10,0},
                 {datetime,19,0},
                 {string,   3,0},
                 {integer,  1,1},
                 {float,    3,1},
                 {date,    10,1}],
                [
                  [<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>,<<"f">>,<<"g">>,<<"h">>,<<"i">>],
                  [1,1.0,1.0,{2021,01,02},{2021,01,02},           <<"abc">>, 1, 1.0, {2021,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [2,2,  2.0,{2021,02,03},{{2021,01,02},{0,1,2}}, <<"efg">>, null, null, null]
                ]},
               Res),
  Res2 = guess_data_types(true, Lines, 10.0, 5),
  ?assertEqual({[{integer,  1,0},
                 {number,   3,0},
                 {float,    3,0},
                 {date,    10,0},
                 {datetime,19,0},
                 {string,   3,0},
                 {string,   1,1},
                 {string,   3,1},
                 {string,  10,1}],
                [
                  [<<"a">>,<<"b">>,<<"c">>,<<"d">>,<<"e">>,<<"f">>,<<"g">>,<<"h">>,<<"i">>],
                  [1,1.0,1.0,{2021,01,02},{2021,01,02},           <<"abc">>, 1, 1.0, {2021,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [1,2.0,3.0,{2021,03,02},{2021,03,02},           <<"abc">>, 1, 3.0, {2023,01,02}],
                  [2,2,  2.0,{2021,02,03},{{2021,01,02},{0,1,2}}, <<"efg">>, null, null, null]
                ]},
               Res2).

-endif.