lib/crawly/pipelines/write_to_file.ex

defmodule Crawly.Pipelines.WriteToFile do
  @moduledoc """
  Stores a given item into Filesystem

  Pipeline Lifecycle:
  1. When run (by `Crawly.Utils.pipe`), creates a file descriptor if not already created.
  2. Performs the write operation
  3. File descriptor is reused by passing it through the pipeline state with `:write_to_file_fd`

  > Note: `File.close` is not necessary due to the file descriptor being automatically closed upon the end of a the parent process.
  >
  > Refer to https://github.com/oltarasenko/crawly/pull/19#discussion_r350599526 for relevant discussion.

  ### Options
  In the absence of tuple-based options being passed, the pipeline will fallback onto the config of `:crawly`, `Crawly.Pipelines.WriteToFile`, for the `:folder` and `:extension` keys

  - `:folder`, optional. The folder in which the file will be created. Defaults to current project's folder.
     If provided folder does not exist it's created.
  - `:extension`, optional. The file extension in which the file will be created with. Defaults to `jl`.
  - `:include_timestamp`, boolean, optional, true by default. Allows to add timestamp to the filename.
  ### Example Declaration
  ```
  pipelines: [
    Crawly.Pipelines.JSONEncoder,
    {Crawly.Pipelines.WriteToFile, folder: "/tmp", extension: "csv"}
  ]
  ```
  ### Example Output

  ```
  iex> item = %{my: "item"}
  iex> WriteToFile.run(item, %{}, folder: "/tmp", extension: "csv")
  { %{my: "item"} , %{write_to_file_fd: #PID<0.123.0>} }
  ```

  """

  @behaviour Crawly.Pipeline

  require Logger

  @impl Crawly.Pipeline
  @spec run(
          item :: any,
          state :: %{
            optional(:write_to_file_fd) => pid | {:file_descriptor, atom, any}
          },
          opts :: [
            folder: String.t(),
            extension: String.t(),
            include_timestamp: boolean()
          ]
        ) ::
          {item :: any,
           state :: %{write_to_file_fd: pid | {:file_descriptor, atom, any}}}
  def run(item, state, opts \\ [])

  def run(item, %{write_to_file_fd: fd} = state, _opts) do
    :ok = write(fd, item)
    {item, state}
  end

  # No active FD
  def run(item, state, opts) do
    opts =
      Enum.into(opts, %{folder: nil, extension: nil, include_timestamp: true})

    folder = Map.get(opts, :folder, "./")

    :ok = maybe_create_folder(folder)

    # Use crawl_id in filename from now on to identify crawls
    crawl_id = Map.get(state, :crawl_id, "no_crawl_id")
    extension = Map.get(opts, :extension, "jl")

    filename =
      case Map.get(opts, :include_timestamp, false) do
        false ->
          "#{inspect(state.spider_name)}_#{crawl_id}.#{extension}"

        true ->
          ts_string =
            NaiveDateTime.utc_now()
            |> NaiveDateTime.to_string()
            |> String.replace(~r/( |-|:|\.)/, "_")

          "#{inspect(state.spider_name)}_#{ts_string}_#{crawl_id}.#{extension}"
      end

    fd = open_fd(folder, filename)
    :ok = write(fd, item)
    {item, Map.put(state, :write_to_file_fd, fd)}
  end

  defp open_fd(folder, filename) do
    # Open file descriptor to write items
    {:ok, io_device} =
      File.open(
        Path.join([folder, filename]),
        [:binary, :write, :delayed_write, :utf8]
      )

    io_device
  end

  # Performs the write operation
  @spec write(io, item) :: :ok
        when io: File.io_device(),
             item: any()
  defp write(io, item) do
    try do
      IO.write(io, item)
      IO.write(io, "\n")
    catch
      error, reason ->
        Logger.error(
          "Could not write item: #{inspect(item)} to io: #{inspect(io)}\n#{Exception.format(error, reason, __STACKTRACE__)}"
        )
    end
  end

  # Creates a folder if it does not exist
  defp maybe_create_folder(path) do
    case File.exists?(path) do
      false ->
        File.mkdir_p(path)

      true ->
        :ok
    end
  end
end