lib/lastfm_archive.ex

defmodule LastfmArchive do
  @moduledoc """
  `lastfm_archive` is a tool for extracting and archiving Last.fm music listening data - scrobbles.

  Current usage:
  - `sync/0`, `sync/2`: create and sync Lastfm scrobble data to local file archives
  - `transform/0`, `transform/2`: transform downloaded raw data in file archives into columnar and other formats, e.g. CSV, Apache Parquet, Arrow
  - `read/2`: load daily, monthly, yearly and entire dataset from file archives into data frames
  - `load_archive/2`: load all CSV data from the archive into Solr

  """

  alias LastfmArchive.Archive.Metadata
  alias LastfmArchive.Behaviour.Archive
  alias LastfmArchive.LastfmClient.Impl, as: LastfmClient
  alias LastfmArchive.LastfmClient.LastfmApi

  import LastfmArchive.Archive.Transformers.Transformer, only: [transformer: 1]

  # Module used for filesystem wildcard lookups (see `ls_archive_files/2`).
  # Resolved at compile time from the `:path_io` key of the `:lastfm_archive`
  # application config, falling back to Elixir's `Path`.
  @path_io Application.compile_env(:lastfm_archive, :path_io, Elixir.Path)

  # Archive metadata, as defined by `LastfmArchive.Archive.Metadata`.
  @type metadata :: Metadata.t()
  @type time_range :: {integer, integer}
  # Solr endpoint: either an atom (looked up in the `:hui` application
  # config by `load_archive/2`) or a `Hui.URL` struct.
  @type solr_url :: atom | Hui.URL.t()

  # Archiving options, as declared by the archive behaviour.
  @type options :: LastfmArchive.Behaviour.Archive.options()

  @doc """
  Returns the total playcount and the registered (i.e. earliest scrobble) time for a user.

  Delegates to `LastfmArchive.LastfmClient.Impl.info/0`.
  """
  defdelegate info(), to: LastfmClient

  @doc """
  Returns the default configured Lastfm user.

  Delegates to `LastfmArchive.LastfmClient.Impl.default_user/0`.
  """
  defdelegate default_user(), to: LastfmClient

  @doc """
  Sync scrobbles for a Lastfm user.

  ### Example

  ```
    LastfmArchive.sync("a_lastfm_user")
  ```

  You can also specify a default user in configuration,
  for example `user_a` in `config/config.exs`:

  ```
    config :lastfm_archive,
      user: "user_a",
      ... # other archiving options
  ```

  And run:

  ```
    LastfmArchive.sync
  ```

  The first sync downloads all daily scrobbles in 200-track (gzip compressed)
  chunks that are written into a local file archive. Subsequent syncs extract
  further scrobbles starting from the date of latest downloaded scrobbles.

  The data is currently in raw Lastfm `recenttracks` JSON format, chunked into
  200-track (max) `gzip` compressed pages and stored within directories corresponding
  to the days when tracks were scrobbled.

  Options:

  - `:data_dir` - default `lastfm_data`, file archives are stored within this directory

  - `:date` - archive scrobbles from this date only

  - `:interval` - default `1000`(ms), the duration between successive Lastfm API requests.
  This default interval ensures a safe request rate that is
  within Lastfm's terms of service: no more than 5 requests per second

  - `:overwrite` - default `false`, if set to true
  the tool will (re)fetch and overwrite any previously downloaded
  data. Use this option to refresh the file archive. Otherwise (false),
  the system will not be making calls to Lastfm to re-fetch data
  if existing data chunks / pages are found.

  - `:per_page` - default `200`, number of scrobbles per file. The default is the
  max number of tracks per request permissible by Lastfm

  - `:year` - archive scrobbles from this year only

  `:interval`, `:per_page` and `:data_dir` options can be configured in `config/config.exs`:

  ```
    config :lastfm_archive,
      ...
      data_dir: "./lastfm_data/"
  ```

  Returns the result of the archive implementation's `archive/3` callback.
  If the archive cannot be described, the non-`{:ok, metadata}` result of
  `describe/2` is returned as-is instead of raising.
  """
  @spec sync(binary, keyword) :: {:ok, metadata()} | {:error, :file.posix()}
  def sync(user \\ default_user(), options \\ []) do
    archive = impl()

    # `with` lets a failed `describe/2` fall through to the caller rather
    # than crashing on an unmatched anonymous-function clause.
    with {:ok, metadata} <- archive.describe(user, options) do
      archive.archive(metadata, options, LastfmApi.new())
    end
  end

  @doc """
  Read from an archive of a Lastfm user.

  This returns scrobbles for a single day or month period
  in a lazy Explorer.DataFrame for further data manipulation
  and visualisation.

  ### Example
  ```
    # read a single-day scrobbles from the configured
    # archive (FileArchive) and default user
    LastfmArchive.read(day: ~D[2022-12-31])

    # read a single-month scrobbles for a user
    LastfmArchive.read("a_lastfm_user",  month: ~D[2022-12-31])
  ```

  Options:
  - `:day` - read scrobbles for this particular date (`Date.t()`)
  - `:month` - read scrobbles for this particular month (`Date.t()`)

  This function can also return a lazy data frame from a derived archive,
  i.e. CSV, Parquet archives created via `transform/2`. The derived archive
  is used whenever the `:format` option is given.

  ### Example
  ```
    # read a single year of scrobbles for a user from Parquet archive
    LastfmArchive.read("a_lastfm_user", format: :parquet, year: 2023)

    # read everything for a user from Parquet archive
    LastfmArchive.read("a_lastfm_user", format: :parquet)
  ```

  Options:
  - `:format` (required) - derived archive format: `:csv`, `:parquet`, `:ipc`, `:ipc_stream`
  - `:facet` - type of archive: `:scrobbles` (default), `:albums`, `:artists` or `:tracks`
  - `:year` - only read scrobbles for this particular year
  - `:columns` - an atom list for retrieving only a columns subset, available columns:
  #{%LastfmArchive.Archive.Scrobble{} |> Map.keys() |> List.delete(:__struct__) |> Enum.map_join(", ", &(("`:" <> Atom.to_string(&1)) <> "`"))}

  If the archive cannot be described, the non-`{:ok, metadata}` result of
  `describe/2` is returned as-is instead of raising.
  """
  @spec read(binary, keyword()) :: {:ok, Explorer.DataFrame.t()} | {:error, term()}
  def read(user \\ default_user(), options) do
    archive = impl(options)

    # `with` propagates a failed `describe/2` to the caller, matching the
    # `{:error, term()}` branch of the spec.
    with {:ok, metadata} <- archive.describe(user, options) do
      archive.read(metadata, options)
    end
  end

  @doc """
  Transform downloaded file archive into CSV, Apache Parquet or other columnar formats for a Lastfm user.

  ### Example

  ```
    LastfmArchive.transform("a_lastfm_user", format: :csv)

    # transform archive of the default user into the default `:ipc_stream` format
    LastfmArchive.transform()
  ```

  The function only transforms downloaded archive data on local filesystem. It does not fetch data from Lastfm,
  which can be done via `sync/2`.

  The transformed files are created on a yearly basis and stored in `gzip` compressed format.
  They are stored in a `csv` or `parquet` directory within either the default `./lastfm_data/`
  or the directory specified in config/config.exs (`:lastfm_archive, :data_dir`).

  Options:
  - `:format` - format into which file archive is transformed: `:csv`, `:parquet`, `:ipc`, `:ipc_stream` (default)
  - `:facet` - type of archive: `:scrobbles` (default), `:albums`, `:artists` or `:tracks`
  - `:overwrite` existing data, default: false
  - `:year` - optionally transform data from this particular year

  Returns the result of the archive implementation's `update_metadata/2`
  callback. If describing or transforming the archive does not yield
  `{:ok, metadata}`, that result is returned as-is instead of raising.
  """
  @spec transform(binary, options) :: any
  def transform(user \\ default_user(), options \\ [format: :ipc_stream])

  def transform(user, options) when is_binary(user) do
    facet = Keyword.get(options, :facet, :scrobbles)

    # Apply the documented `:ipc_stream` default even when other options are
    # given without `:format`; otherwise `impl/1` would select the raw file
    # archive instead of the derived one.
    options =
      options
      |> Keyword.put_new(:format, :ipc_stream)
      |> Keyword.merge(facet: facet)

    archive = impl(options)

    with {:ok, metadata} <- archive.describe(user, options),
         {:ok, metadata} <- archive.post_archive(metadata, transformer(facet), options) do
      archive.update_metadata(metadata, options)
    end
  end

  # Picks the archive implementation: the derived archive when a `:format`
  # option is present, the raw file archive otherwise.
  defp impl(options \\ []) do
    archive_type =
      if Keyword.has_key?(options, :format), do: :derived_archive, else: :file_archive

    Archive.impl(archive_type)
  end

  # Lists every `.gz` file found (recursively) under the user's archive
  # directory. Each path is returned as the portion that follows the first
  # "<user>/" occurrence in the full path.
  defp ls_archive_files(user, options \\ []) do
    pattern =
      user
      |> LastfmArchive.Utils.Archive.user_dir(options)
      |> Path.join("**/*.gz")

    separator = user <> "/"

    for path <- @path_io.wildcard(pattern, []) do
      path
      |> to_string()
      |> String.split(separator)
      |> tl()
      |> hd()
    end
  end

  @doc """
  Load all CSV data from the archive into Solr for a Lastfm user.

  The function finds CSV files from the archive and sends them to
  Solr for ingestion one at a time. It uses `Hui` client to interact
  with Solr and the `t:Hui.URL.t/0` struct
  for Solr endpoint specification.

  ### Example

  ```
    # define a Solr endpoint with %Hui.URL{} struct
    headers = [{"Content-type", "application/json"}]
    url = %Hui.URL{url: "http://localhost:8983/solr/lastfm_archive", handler: "update", headers: headers}

    LastfmArchive.load_archive("a_lastfm_user", url)
  ```

  The endpoint may also be given as an atom, in which case it is looked up
  in the `:hui` application configuration and converted into a `Hui.URL` struct.

  CSV files must be pre-created before the loading - see
  `transform/2`.

  Returns:
  - `:ok` once all CSV files have been sent to Solr
  - `{:error, %Hui.Error{reason: :ehostunreach}}` if the Solr ping or the
    schema check does not succeed
  - `{:error, %Hui.Error{reason: :einval}}` if the endpoint is not a usable
    atom key or a map with a `:url` entry
  """
  @spec load_archive(binary, solr_url) :: :ok | {:error, Hui.Error.t()}
  def load_archive(user, url) when is_atom(url) and url != nil do
    url_config = Application.get_env(:hui, url)
    # A missing config entry becomes `nil`, which the catch-all clause
    # below reports as `:einval`.
    url_struct = if url_config, do: struct(Hui.URL, url_config), else: nil
    load_archive(user, url_struct)
  end

  def load_archive(user, %{url: endpoint} = url) do
    # Stop at the first failing check: there is no point checking the schema
    # of a Solr instance that did not answer the ping.
    with {:ok, _} <- LastfmArchive.Load.ping_solr(endpoint),
         {:ok, _} <- LastfmArchive.Load.check_solr_schema(endpoint) do
      _load_archive(user, url)
    else
      _ -> {:error, %Hui.Error{reason: :ehostunreach}}
    end
  end

  def load_archive(_, _), do: {:error, %Hui.Error{reason: :einval}}

  # Sends each archive file whose relative path starts with "csv" to Solr,
  # one at a time, printing the status of every load. Individual load
  # statuses are only printed, never returned: the result is always `:ok`.
  defp _load_archive(user, url) do
    user
    |> ls_archive_files()
    |> Enum.each(fn csv_file ->
      if csv_file =~ ~r/^csv/ do
        IO.puts("Loading #{csv_file} into Solr")
        {status, _resp} = LastfmArchive.Load.load_solr(url, user, csv_file)
        IO.puts("#{status}\n")
      end
    end)
  end
end