defmodule Bzip2 do
  @moduledoc """
  Compresses and decompresses bzip2 data, either as a whole binary or as a
  stream of binary chunks, on top of the `Bzip2.Driver` bindings.
  """

  # Size of the chunks a whole binary is split into before it is fed through
  # the streaming functions.
  # FIXME: this block size is arbitrary
  @input_block_size 900 * 1024

  @doc """
  Compresses a stream or a string.

  Given an enumerable of binaries, returns a lazy stream of compressed
  binaries. Every input chunk is compressed as its own, independent bzip2
  stream, so the output is a concatenation of bzip2 streams (which
  `decompress!/1` accepts).

  Given a binary, splits it into chunks of at most 900 KiB, compresses them
  as above and returns the result as a single binary. An empty binary
  compresses to an empty binary.

  Compressing a stream,
  ```elixir
  File.stream!("foo.txt", [], 100 * 1024)
  |> Bzip2.compress!()
  |> Stream.into(File.stream!("foo.txt.bz2"))
  |> Stream.run()
  ```
  Compressing a string,
  ```elixir
  Bzip2.compress!("foo")
  |> Bzip2.decompress!()
  # "foo"
  ```
  """
  @spec compress!(binary() | Enumerable.t()) :: binary() | Enumerable.t()
  def compress!(input) when is_binary(input) do
    # Chunk the binary directly instead of going through StringIO, which
    # spawned a process per call that was never closed.
    input
    |> binary_chunks(@input_block_size)
    |> compress!()
    |> Enum.into("")
  end

  def compress!(input) do
    Stream.transform(input, :start, fn chunk, status ->
      compress_block!(status, chunk)
    end)
  end

  # FIXME: clean up state vs status types
  @type compression_state :: {Bzip2.Driver.compression_status(), reference()}

  # Compresses one input chunk as a complete bzip2 stream. `acc` collects the
  # driver's output in reverse order. The transform state always returns to
  # :start, so the next chunk opens a fresh stream.
  @spec compress_block!(:start | compression_state(), binary(), [binary()]) ::
          {[binary()], :start}
  defp compress_block!(state, input, acc \\ [])

  # Initialize a new stream.
  defp compress_block!(:start, input, acc) do
    state = Bzip2.Driver.compress_init!()
    compress_block!({:cont, state}, input, acc)
  end

  # Keep calling the driver until it reports the end of the stream.
  defp compress_block!({:cont, state}, input, acc) do
    {:ok, next, output} = Bzip2.Driver.compress_block!(state, input)
    # Input is fully cached by the driver after the first call, so follow-up
    # calls pass an empty binary instead of feeding the same input again.
    compress_block!({next, state}, "", [output | acc])
  end

  # Finished this input: close the stream and emit the output in order.
  defp compress_block!({:stream_end, state}, _input, acc) do
    Bzip2.Driver.compress_end!(state)
    {Enum.reverse(acc), :start}
  end

  @doc """
  Decompresses a stream or a string.

  Given an enumerable of binaries, returns a lazy stream of decompressed
  binaries. Given a binary, returns the decompressed data as a single binary.
  The input may contain several concatenated bzip2 streams. An empty binary
  decompresses to an empty binary.

  Decompressing a stream,
  ```elixir
  File.stream!("foo.txt.bz2", [], 100 * 1024)
  |> Bzip2.decompress!()
  |> Stream.into(File.stream!("foo.txt"))
  |> Stream.run()
  ```
  Decompressing a string,
  ```elixir
  Bzip2.compress!("foo")
  |> Bzip2.decompress!()
  # "foo"
  ```
  """
  @spec decompress!(binary() | Enumerable.t()) :: binary() | Enumerable.t()
  def decompress!(input) when is_binary(input) do
    input
    |> binary_chunks(@input_block_size)
    |> decompress!()
    |> Enum.into("")
  end

  # NOTE(review): if `input` ends in the middle of a bzip2 stream, the final
  # transform state is {:cont, state}. It is neither reported as an error nor
  # released via Bzip2.Driver.decompress_end!/1, and the output decoded so far
  # is returned as-is.
  def decompress!(input) do
    Stream.transform(input, :start, fn chunk, status ->
      decompress_block!(status, chunk)
    end)
  end

  # FIXME: clean up state vs status types
  @type decompression_state :: {Bzip2.Driver.decompression_status(), reference()}

  # Feeds one input chunk to the decompressor. `acc` collects the driver's
  # output in reverse order.
  @spec decompress_block!(:start | decompression_state(), binary(), [binary()]) ::
          {[binary()], :start | decompression_state()}
  defp decompress_block!(state, input, acc \\ [])

  # Initialize a new stream.
  defp decompress_block!(:start, input, acc) do
    state = Bzip2.Driver.decompress_init!()
    decompress_block!({:cont, state}, input, acc)
  end

  defp decompress_block!({:cont, state}, input, acc) do
    {:ok, next, output} = Bzip2.Driver.decompress_block!(state, input)
    # FIXME: looks like `state` should be included in the NIF response.
    # NOTE(review): unlike compression, the same `input` is handed back to the
    # driver on each iteration (it is needed to locate the trailing bytes after
    # a :stream_end); presumably the driver tracks how much it already
    # consumed — confirm.
    decompress_block!({next, state}, input, [output | acc])
  end

  # The driver wants more data: emit what we have and resume on the next chunk.
  defp decompress_block!({:next_input, state}, _input, acc) do
    {Enum.reverse(acc), {:cont, state}}
  end

  # A bzip2 stream ended. Close it; any `remaining` unconsumed bytes at the end
  # of this chunk are decoded immediately as the start of the next stream.
  defp decompress_block!({{:stream_end, remaining}, state}, input, acc) do
    Bzip2.Driver.decompress_end!(state)

    case remaining do
      0 ->
        {Enum.reverse(acc), :start}

      _ ->
        rest = binary_part(input, byte_size(input) - remaining, remaining)
        decompress_block!(:start, rest, acc)
    end
  end

  # Lazily splits `binary` into consecutive chunks of at most `chunk_size`
  # bytes (the last one may be shorter). Yields nothing for an empty binary.
  @spec binary_chunks(binary(), pos_integer()) :: Enumerable.t()
  defp binary_chunks(binary, chunk_size) do
    Stream.unfold(binary, fn
      "" ->
        nil

      rest when byte_size(rest) <= chunk_size ->
        {rest, ""}

      rest ->
        <<chunk::binary-size(chunk_size), tail::binary>> = rest
        {chunk, tail}
    end)
  end
end