lib/bzip2.ex

defmodule Bzip2 do
  @moduledoc """
  Streaming bzip2 compression and decompression on top of `Bzip2.Driver`.
  """

  # Size, in bytes, of the slices a binary argument is cut into before it is
  # fed through the streaming API.
  # FIXME: this block size is arbitrary
  @input_block_size 900 * 1024

  @doc """
  Compress a stream or a string.

  Every chunk of an enumerable input is compressed into its own complete bzip2
  stream, and the output is those streams concatenated in order (which
  `decompress!/1` reads back). A string is cut into fixed-size chunks first and
  the result is returned as a single binary.

  An input with no chunks at all (`""` or an empty enumerable) produces one
  empty bzip2 stream rather than an empty binary.

  Compressing a stream,
  ```elixir
  File.stream!("foo.txt", [], 100 * 1024)
  |> Bzip2.compress!()
  |> Stream.into(File.stream!("foo.txt.bz2"))
  |> Stream.run()
  ```

  Compressing a string,
  ```elixir
  Bzip2.compress!("foo")
  |> Bzip2.decompress!()
  # "foo"
  ```
  """
  @spec compress!(binary() | Enumerable.t()) :: binary() | Enumerable.t()
  def compress!(input) when is_binary(input) do
    input
    |> binary_chunks(@input_block_size)
    |> compress!()
    |> Enum.into("")
  end

  def compress!(input) do
    Stream.transform(
      input,
      # `:empty` until the first chunk arrives; `compress_block!/2` always
      # hands back `:start` afterwards.
      fn -> :empty end,
      fn chunk, _acc -> compress_block!(:start, chunk) end,
      fn
        # No chunk was ever seen: still emit one (empty) bzip2 stream so the
        # output is valid compressed data.
        :empty -> compress_block!(:start, "")
        acc -> {[], acc}
      end,
      fn _acc -> nil end
    )
  end

  # FIXME: clean up state vs status types
  @type compression_state :: {Bzip2.Driver.compression_status(), reference()}

  # Compresses one input chunk with a fresh compressor, looping on the driver
  # until it reports `:stream_end`. Returns the produced binaries in order and
  # `:start`, ready for the next chunk.
  @spec compress_block!(:start | compression_state(), binary(), [binary()]) ::
          {[binary()], :start}
  defp compress_block!(state, input, acc \\ [])

  # Initialize a new stream.
  defp compress_block!(:start, input, acc) do
    state = Bzip2.Driver.compress_init!()
    compress_block!({:cont, state}, input, acc)
  end

  # Consume input until finished. Output is accumulated in reverse (O(1)
  # prepend) and put back in order once the stream ends.
  defp compress_block!({:cont, state}, input, acc) do
    {:ok, next, next_output} = Bzip2.Driver.compress_block!(state, input)
    # Input is fully cached, so don't carry it on.
    compress_block!({next, state}, "", [next_output | acc])
  end

  # Finished this input, close the stream and wait for another chunk.
  defp compress_block!({:stream_end, state}, _input, acc) do
    Bzip2.Driver.compress_end!(state)
    {Enum.reverse(acc), :start}
  end

  @doc """
  Decompress a stream or a string.

  Concatenated bzip2 streams are decoded one after another. Decoding stops
  quietly at non-bzip2 data that follows a complete stream (currently it raises
  instead if that data begins exactly at a chunk boundary).

  Raises `RuntimeError` if the input contains no bzip2 data, or if it ends in
  the middle of a bzip2 stream.

  Decompressing a stream,
  ```elixir
  File.stream!("foo.txt.bz2", [], 100 * 1024)
  |> Bzip2.decompress!()
  |> Stream.into(File.stream!("foo.txt"))
  |> Stream.run()
  ```

  Decompressing a string,
  ```elixir
  Bzip2.compress!("foo")
  |> Bzip2.decompress!()
  # "foo"
  ```
  """
  @spec decompress!(binary() | Enumerable.t()) :: binary() | Enumerable.t()
  def decompress!(input) when is_binary(input) do
    input
    |> binary_chunks(@input_block_size)
    |> decompress!()
    |> Enum.into("")
  end

  def decompress!(input) do
    # TODO: transform/5 is almost perfect for simplifying NIF init/end, but
    # can't restart for the next good block?
    Stream.transform(
      input,
      fn -> :start end,
      fn
        # Empty chunks carry no data. At `:start`/`:restart` they would only
        # initialise a decoder with nothing to decode (presumably leaving it
        # waiting for input), so skip them.
        "", acc -> {[], acc}
        chunk, acc -> decompress_block!(acc, chunk)
      end,
      fn
        :start ->
          raise "No valid bzip2 data in stream."

        # The decoder was still waiting for input when the input ran out.
        {:cont, state} ->
          Bzip2.Driver.decompress_end!(state)
          raise "Unexpected end of bzip2 data in stream."

        _ ->
          {:halt, nil}
      end,
      fn _ -> nil end
    )
  end

  # FIXME: clean up state vs status types
  @type decompression_state :: {Bzip2.Driver.decompression_status(), reference()}

  # Accumulator carried between chunks by `decompress!/1`:
  #   :start         - no stream started yet
  #   :restart       - the previous stream ended exactly at a chunk boundary
  #   {:cont, ref}   - a decoder is mid-stream, waiting for more input
  #   :halted        - trailing non-bzip2 data was seen; ignore the rest
  @typep decompress_acc :: :start | :restart | :halted | {:cont, reference()}

  @spec decompress_block!(decompress_acc() | decompression_state(), binary(), [binary()]) ::
          {[binary()], decompress_acc()} | {:halt, :halted}
  defp decompress_block!(state, input, acc \\ [])

  # Trailing garbage was already seen: stop consuming the input.
  defp decompress_block!(:halted, _input, _acc), do: {:halt, :halted}

  # Initialize a decoder for the first stream, or for the next concatenated one.
  defp decompress_block!(status, input, acc) when status in [:start, :restart] do
    state = Bzip2.Driver.decompress_init!()
    decompress_block!({:cont, state}, input, acc)
  end

  # Output is accumulated in reverse (O(1) prepend) and put back in order
  # whenever a result is handed to `Stream.transform/5`.
  defp decompress_block!({:cont, state}, input, acc) do
    {:ok, next, next_output} = Bzip2.Driver.decompress_block!(state, input)
    # FIXME: looks like `state` should be included in the NIF response.
    decompress_block!({next, state}, input, [next_output | acc])
  end

  # The chunk is used up mid-stream: emit what was decoded, keep the decoder.
  defp decompress_block!({:next_input, state}, _input, acc) do
    {Enum.reverse(acc), {:cont, state}}
  end

  # The stream ended exactly at the end of this chunk.
  defp decompress_block!({{:stream_end, 0}, state}, _input, acc) do
    Bzip2.Driver.decompress_end!(state)
    {Enum.reverse(acc), :restart}
  end

  # The stream ended with `remaining` bytes of this chunk unconsumed. They may
  # start another concatenated stream, so decode them with a fresh decoder.
  defp decompress_block!({{:stream_end, remaining}, state}, input, acc) do
    Bzip2.Driver.decompress_end!(state)
    rest = binary_part(input, byte_size(input) - remaining, remaining)
    decompress_block!(:restart, rest, acc)
  end

  # TODO: state machine to detect bad start. The `[""]` match means "this call
  # produced nothing before the bad magic", so garbage that starts a new chunk
  # after a stream which ended exactly on the previous chunk boundary also
  # raises here instead of being ignored.
  defp decompress_block!({:bad_magic, state}, _input, [""]) do
    Bzip2.Driver.decompress_end!(state)
    raise RuntimeError, "No valid bzip2 data found in stream"
  end

  # Non-bzip2 data after decoded output: emit the output, then halt.
  defp decompress_block!({:bad_magic, state}, _input, acc) do
    Bzip2.Driver.decompress_end!(state)
    {Enum.reverse(acc), :halted}
  end

  # Lazily splits `binary` into consecutive slices of at most `chunk_size`
  # bytes (the last one may be shorter). Unlike `StringIO` + `IO.binstream/2`,
  # this starts no process, so nothing is left to close.
  @spec binary_chunks(binary(), pos_integer()) :: Enumerable.t()
  defp binary_chunks(binary, chunk_size) do
    Stream.unfold(binary, fn
      "" ->
        nil

      rest when byte_size(rest) <= chunk_size ->
        {rest, ""}

      rest ->
        {binary_part(rest, 0, chunk_size),
         binary_part(rest, chunk_size, byte_size(rest) - chunk_size)}
    end)
  end
end