# lib/avalanche/steps/decode_data.ex

defmodule Avalanche.Steps.DecodeData do
  @moduledoc """
  A custom `Req` pipeline step to decode the `body.data` returned by Snowflake.

  Each row of `body["data"]` is turned into a map keyed by column name, and each
  cell is decoded according to the matching entry of `resultSetMetaData.rowType`.
  Cell values are expected to be strings (or `nil`); a value that cannot be
  decoded is returned unchanged and a warning is logged.
  """

  require Logger

  @unix_epoch ~D[1970-01-01]

  @doc """
  Attaches the decode step to `request`.

  Decodes response `body.data` based on the `resultSetMetaData`.

  https://docs.snowflake.com/en/developer-guide/sql-api/reference.html#label-sql-api-reference-resultset-resultsetmetadata

  ## Options

    * `:downcase_column_names` - Downcase the result's column names.
      Treated as `false` when not set.
  """
  @spec attach(Req.Request.t(), keyword()) :: Req.Request.t()
  def attach(%Req.Request{} = request, options \\ []) do
    request
    |> Req.Request.register_options([:downcase_column_names])
    |> Req.Request.merge_options(options)
    |> Req.Request.append_response_steps(decode_data: &decode_data/1)
  end

  @doc """
  Response step: replaces `body["data"]` of a `200` response with decoded rows.

  Responses with an empty-string body, and anything that is not a `200`
  response, are returned untouched.

  Row types are read from `body["resultSetMetaData"]["rowType"]`; when the body
  has no `"resultSetMetaData"`, they are read from the request's private
  `:avalanche_row_types` value (defaulting to `[]`).
  """
  @spec decode_data({Req.Request.t(), term()}) :: {Req.Request.t(), term()}
  def decode_data(request_response)

  def decode_data({request, %{body: ""} = response}) do
    {request, response}
  end

  def decode_data({request, %{status: 200, body: body} = response}) do
    # `attach/2` only registers the option; it is absent from `request.options`
    # unless the caller set it, so a strict fetch would raise on every response.
    downcase_column_names = Map.get(request.options, :downcase_column_names, false)

    row_types =
      case Map.get(body, "resultSetMetaData") do
        nil -> Req.Request.get_private(request, :avalanche_row_types, [])
        metadata -> Map.fetch!(metadata, "rowType")
      end

    data = Map.fetch!(body, "data")

    decoded_data = decode_data_rows(row_types, data, downcase_column_names)

    {request, %Req.Response{response | body: Map.put(body, "data", decoded_data)}}
  end

  def decode_data(request_response), do: request_response

  # Builds one `%{column_name => decoded_value}` map per row by pairing each
  # row type with the cell at the same position. `Enum.zip_reduce/4` stops at
  # the shorter of the two lists.
  defp decode_data_rows(types, data, downcase_column_names) do
    Enum.map(data, fn row ->
      Enum.zip_reduce(types, row, %{}, fn type, value, result ->
        column_name = maybe_downcased_column_name(type, downcase_column_names)
        column_value = decode(type, value)
        Map.put(result, column_name, column_value)
      end)
    end)
  end

  # Returns the column's "name", lower-cased when the flag is `true`.
  defp maybe_downcased_column_name(type, true) do
    type |> Map.fetch!("name") |> String.downcase()
  end

  defp maybe_downcased_column_name(type, false), do: Map.fetch!(type, "name")

  # decode/2 converts a single cell according to its row type. A value that
  # cannot be decoded is logged and returned as-is (see return_raw/3).

  defp decode(_type, value) when is_nil(value), do: nil

  defp decode(%{"type" => "fixed" = type, "scale" => scale}, value) when scale > 0 do
    # `Float.parse/1` is only used as a validator here. The remainder must be
    # empty: `Decimal.new/1` raises on trailing garbage (e.g. "1.5abc"), and a
    # partial match would otherwise let such a value through and crash the step.
    case Float.parse(value) do
      {_float, ""} -> Decimal.new(value)
      _partial_or_error -> return_raw(type, value, :fixed_float_parse_error)
    end
  end

  defp decode(%{"type" => "fixed" = type}, value) do
    case Integer.parse(value) do
      {integer, _rest} -> integer
      :error -> return_raw(type, value, :integer_parse_error)
    end
  end

  defp decode(%{"type" => "float" = type}, value) do
    parse_float(type, value, :float_parse_error)
  end

  defp decode(%{"type" => "real" = type}, value) do
    parse_float(type, value, :real_parse_error)
  end

  defp decode(%{"type" => "text"}, value), do: value

  # Anything other than the exact string "true" decodes to `false`.
  defp decode(%{"type" => "boolean"}, value), do: value == "true"

  # Integer value (in a string) of the number of days since the epoch (e.g. 18262).
  defp decode(%{"type" => "date" = type}, value) do
    case Integer.parse(value) do
      {days, _rest} -> Date.add(@unix_epoch, days)
      :error -> return_raw(type, value, :date_parse_error)
    end
  end

  defp decode(%{"type" => "time" = type}, value) do
    case Time.from_iso8601(value) do
      {:ok, time} -> time
      {:error, error} -> return_raw(type, value, error)
    end
  end

  # Both offset-carrying timestamp types are parsed the same way; the parsed
  # UTC offset is discarded and only the `DateTime` is kept.
  defp decode(%{"type" => type}, value) when type in ["timestamp_ltz", "timestamp_tz"] do
    case DateTime.from_iso8601(value) do
      {:ok, datetime, _utc_offset} -> datetime
      {:error, error} -> return_raw(type, value, error)
    end
  end

  defp decode(%{"type" => "timestamp_ntz" = type}, value) do
    case NaiveDateTime.from_iso8601(value) do
      {:ok, datetime} -> datetime
      {:error, error} -> return_raw(type, value, error)
    end
  end

  # Semi-structured types are decoded as JSON. A "variant" may be JSON or
  # something else; non-JSON content falls back to the raw value.
  defp decode(%{"type" => type}, value) when type in ["object", "variant", "array"] do
    case Jason.decode(value) do
      {:ok, json} -> json
      {:error, error} -> return_raw(type, value, error)
    end
  end

  defp decode(%{"type" => type}, value) do
    Logger.warning("Failed decode of unsupported type: #{type}")
    value
  end

  # Shared float parsing for the "float" and "real" types; `error_tag` is the
  # reason reported in the warning when the value is not a float at all.
  defp parse_float(type, value, error_tag) do
    case Float.parse(value) do
      {float, _rest} -> float
      :error -> return_raw(type, value, error_tag)
    end
  end

  # Logs why `value` could not be decoded as `type` and returns it unchanged.
  # `error` is either an exception struct (its message is logged) or any other
  # term that can be interpolated into the log line.
  defp return_raw(type, value, error) do
    error_msg =
      case error do
        %{__exception__: true} = exception -> Exception.message(exception)
        _ -> error
      end

    Logger.warning("Failed decode of '#{type}' type: #{error_msg}")
    value
  end
end