README.md

![Build Status](https://github.com/gpedic/ex_q/actions/workflows/ci.yml/badge.svg?branch=master)
[![Coverage Status](https://coveralls.io/repos/github/gpedic/ex_q/badge.svg?branch=master)](https://coveralls.io/github/gpedic/ex_q?branch=master)
[![License](https://img.shields.io/hexpm/l/ex_q.svg)](https://github.com/gpedic/ex_q/blob/master/LICENSE.md)
[![Last Updated](https://img.shields.io/github/last-commit/gpedic/ex_q.svg)](https://github.com/gpedic/ex_q/commits/master)

# Q

Q provides powerful pipeline composition inspired by Ecto.Multi, preserving each operation's output for full visibility of your pipeline's state — including when something goes wrong.

## Why Q?

Q makes data processing pipelines simpler and safer in three key ways:
* **Full Error Context**: When something fails, you see all data from every previous step, making errors easier to understand and fix

* **Simple Function Reuse**: Use your existing functions as-is in most cases. Q allows you to choose which parts of the state are passed to function arguments at each step of the pipeline

* **Easy Composition**: Build complex pipelines by combining smaller ones, just like regular Elixir functions. Break down large operations while keeping data flow clear

## Additional benefits of Q

* **Explicit State Management**: Every step's output is preserved and labeled, making it easy to understand and audit your data's transformation journey

* **Flexible Parameter Passing**: Choose between passing the full state map or specific parameters to each function, adapting to your needs without changing the function itself

* **Early Exit Support**: Gracefully handle conditional processing with the ability to halt execution early when appropriate, while still maintaining access to all processed data

* **Visibility**: The queue structure makes it clear what operations will be performed and in what order, improving maintainability

## Example

Here's a practical example showing data validation and transformation:

```elixir
defmodule UploadProcessor do
  import Q

  def process_upload(file_data) do
    queue do
      put(:file_data, file_data)
      run(:parsed, &DataProcessor.parse_csv/1, [:file_data])
      run(:validated, fn %{parsed: rows} ->
        Enum.reduce_while(rows, {:ok, []}, fn row, {:ok, acc} ->
          case DataProcessor.validate_row(row) do
            {:ok, valid} -> {:cont, {:ok, [valid | acc]}}
            {:error, _} = err -> {:halt, err}
          end
        end)
      end)
      run(:enriched, {DataProcessor, :enrich_data, [[use_ai: true]]}, [:validated])
    end
    |> exec()
  end
end
```

## Error handling

One of the major benefits of using Q is that it provides a complete context even in case of errors - you know exactly what step failed and have access to all previous results

```elixir
def process_data(data) do
  queue do
    put(:raw_data, data)
    run(:decoded, &Jason.decode/1, [:raw_data])
    run(:validated, &validate_schema/1, [:decoded])
    run(:normalized, &normalize_data/1, [:validated])
    run(:enriched, &add_metadata/1, [:normalized])
    exec()
  end
end

# If JSON parsing fails
{:error, :decoded, %Jason.DecodeError{data: "invalid", position: 0, token: nil}, %{raw_data: "invalid"}}

# If validation fails, notice both previous processing steps results are available
{:error, :validated, :invalid_schema, %{
  raw_data: "{\"foo\": 123}",
  decoded: %{"foo" => 123}
}}

```

### Without Q
```elixir
def process_data_without_q(data) do
  with {:ok, decoded} <- Jason.decode(data),
    {:ok, validated} <- validate_data(data),
    {:ok, transformed} <- normalize_data(validated),
    {:ok, enriched} <- add_metadata(transformed) do
    {:ok, enriched}
  else
    {:error, reason} -> 
      # We know the failure reason, but not the state when it failed
      # as the data from of processing steps is not available here
      {:error, reason}
  end
end
```

## Parameter Passing

Functions in Q can receive parameters in two ways:

1. Full context - by default, functions receive the entire state as a map
2. Specific params - by providing a list of operation keys, functions receive only those values as function arguments in the order they are specified

```elixir
defmodule ParamDemo do
  import Q

  def run_stuff(data) do
    queue do
      # Store some data for subsequent steps
      put(:some_data, data)

      # Pass a single argument from the state
      run(:single_arg, &MyModule.process_single/1, [:some_data])

      # Pass the complete state
      run(:custom_extract, fn %{single_arg: val} ->
        MyModule.do_something(val)
      end)

      # Pass a multiple arguments from the state
      run(:multi_arg, &MyModule.process_multiple/1, [:custom_extract, :some_data])

      # The mfa version allows us to pass additional params directly
      # by default args will be prepended i.e.
      # MyModule.delete(single_arg, some_data, soft_delete: true)
      run(:prepend_args, {MyModule, :delete, [[soft_delete: true]]}, [:single_arg, :some_data])

      # To append the arguments instead we can define the order
      # MyModule.changeset(%MyStruct{}, single_arg, some_data)
      run(:appended_args, {MyModule, :changeset, [%MyStruct{}]}, {[:single_arg, :some_data], order: :append})
    end
    |> exec()
  end
end
```

## Halting Execution Early

Functions can return `{:halt, value}` to stop execution early with a success state. This is useful for conditional processing where early termination is a valid outcome.

```elixir
    defmodule ExpensiveProcessor do
      import Q

      def process_data(input) do
        queue do
          put(:input, input)
          
          # Check cache first
          run(:cache_check, fn %{key: key} ->
            cached = get_from_cache(key)
            if not is_nil(cached) do
              {:halt, cached}  # Skip expensive operation if cached
            else
              {:ok, :not_found}
            end
          end)

          # Only runs if not in cache
          run(:processed, &expensive_operation/1, [:input])
          run(:cache, &cache_processed_data/1, [:processed])

          exec()
        end
      end

  # halt early on cache hit
  {:ok, %{
        input: %{key: "BWBeN28Vb7cMEx7Ym8AUzs", data: %{...}},
        cache_check: %{data: %{...}}
      }

  # run full pipeline when not cached
    {:ok, %{
        input: %{key: "BWBeN28Vb7cMEx7Ym8AUzs", data: %{...}},
        cache_check: :not_found
        processed: %{data: %{...}}
        cache: :ok
      }
```

## API

Q provides both a functional API and a DSL for creating queues. Here are both approaches side by side:

```elixir
import Q

# DSL
decode_q = queue do
  put(:base64_text, "aGVsbG8=")
  run(:decoded, &Base.decode64/1, [:base64_text])
  run(:decoded_mfa, {Base, :decoded64, []}, [:base64_text])
end

exec(decode_q)

# Returns:
{:ok, %{base64_text: "aGVsbG8=", decoded: "hello", decoded_mfa: "hello"}}
```

```elixir
# Functional
decode_q = Q.new()
  |> Q.put(:base64_text, "aGVsbG8=")
  |> Q.run(:decoded, &Base.decode64/1, [:base64_text])
  |> Q.run(:decoded_mfa, {Base, :decoded64, []}, [:base64_text])


Q.exec(decode_q)

# Returns:
{:ok, %{base64_text: "aGVsbG8=", decoded: "hello", decoded_mfa: "hello"}}
```

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `ex_q` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:ex_q, "~> 1.1"}
  ]
end
```

Documentation can be found at [https://hexdocs.pm/ex_q](https://hexdocs.pm/ex_q).