# IdempotencyKit

<p align="left">
  <img src="https://raw.githubusercontent.com/metacircu1ar/idempotency_kit/main/logo.png" alt="IdempotencyKit logo" width="180">
</p>

`IdempotencyKit` helps you make mutation endpoints safe to retry.

It is built for Plug/Phoenix apps with Ecto/Postgres.

## Installation

```elixir
def deps do
  [
    {:idempotency_kit, "~> 0.1.0"}
  ]
end
```

## What it does

For the same `(user_id, scope, idempotency_key)`:

1. first request claims execution
2. duplicate while first is running returns `processing`
3. after completion, duplicate replays stored response
4. same key + different payload returns `payload_mismatch`

This prevents accidental duplicate creates when clients retry after lost responses.

## How it works

- payload is hashed deterministically (`sha256`)
- claim row is stored in DB with unique key `(user_id, scope, idempotency_key)`
- action executes once
- final HTTP status/body is persisted
- later duplicates replay that persisted response
- stale `processing` rows can be reclaimed after a timeout
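The lifecycle above can be sketched with a plain in-memory map. This is an illustration only, not the library's implementation: `ClaimSketch`, its row fields, the `now`/`stale_after` arguments (in seconds), and the order in which the checks run are all assumptions made for the example.

```elixir
defmodule ClaimSketch do
  @moduledoc "In-memory illustration of the claim lifecycle; not IdempotencyKit code."

  # `store` is a map keyed by {user_id, scope, idempotency_key}.
  # `now` and `stale_after` are plain integers (seconds); both are hypothetical.
  def claim(store, key, hash, now, stale_after \\ 60) do
    case Map.get(store, key) do
      nil ->
        # First request for this key: claim it.
        row = %{hash: hash, status: "processing", claimed_at: now, response: nil}
        {{:execute, row}, Map.put(store, key, row)}

      %{hash: stored} when stored != hash ->
        # Same key, different payload.
        {{:error, :payload_mismatch}, store}

      %{status: "processing", claimed_at: at} = row when now - at >= stale_after ->
        # The earlier attempt looks abandoned, so this caller takes it over.
        row = %{row | claimed_at: now}
        {{:execute, row}, Map.put(store, key, row)}

      %{status: "processing"} = row ->
        {{:processing, row}, store}

      row ->
        # Completed earlier: hand back the stored response.
        {{:replay, row}, store}
    end
  end

  # Persist the terminal status and response for a claimed key.
  def complete(store, key, status, response_status, response_body) do
    Map.update!(store, key, fn row ->
      %{row | status: status, response: {response_status, response_body}}
    end)
  end
end
```

A real store has to make the lookup and the insert atomic; in Postgres the unique index on `(user_id, scope, idempotency_key)` is what arbitrates between concurrent first requests.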

Payload hashing caveat:

- hashing is deterministic for the exact Elixir term shape
- `"1"` and `1` hash differently
- `%{"a" => 1}` and `%{a: 1}` hash differently
- normalize payloads before hashing if your client shapes vary
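A minimal sketch of the caveat, using a stand-in hash. `HashSketch.hash/1` hashes the Erlang external term format with sha256 purely for illustration; the library's `request_hash/1` may encode payloads differently. `normalize/1` is a hypothetical helper that only stringifies map keys.

```elixir
defmodule HashSketch do
  # Stand-in hash for illustration only. It is shape-sensitive in the same way
  # the caveat describes, but it is not the library's hashing code.
  def hash(term) do
    :sha256
    |> :crypto.hash(:erlang.term_to_binary(term))
    |> Base.encode16(case: :lower)
  end

  # Hypothetical normalizer: stringify map keys recursively so atom-keyed and
  # string-keyed payloads end up with the same shape. Values are left as-is.
  def normalize(%{} = map) when not is_struct(map) do
    Map.new(map, fn {k, v} -> {to_string(k), normalize(v)} end)
  end

  def normalize(list) when is_list(list), do: Enum.map(list, &normalize/1)
  def normalize(other), do: other
end
```

With this sketch, `%{a: 1}` and `%{"a" => 1}` hash differently until both are passed through `normalize/1`. It does not reconcile `"1"` with `1`; if clients send both, coerce values in your own normalization step before hashing.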

## Important caveat (read this)

This is a **toolkit**, not a full drop-in service.

You still need to provide:

1. your own Ecto schema + migration for idempotency rows
2. your own Repo module
3. a small app adapter module implementing `IdempotencyKit.Store`
4. a scheduled cleanup job for old rows
5. client-side key generation and retry behavior

The library itself is app-independent, but the host app must wire up the infrastructure around it.

## Real integration examples (YourApp)

Below are concrete examples using a placeholder app name (`YourApp`). They map
directly to the five host responsibilities listed above.

### 1) Ecto schema + migration

Paths in YourApp:

- `server/lib/your_app/idempotency/request.ex`
- `server/priv/repo/migrations/<timestamp>_create_idempotency_requests.exs`

```elixir
# server/lib/your_app/idempotency/request.ex
defmodule YourApp.Idempotency.Request do
  use Ecto.Schema
  import Ecto.Changeset

  @all_statuses ["processing", "succeeded", "failed"]
  @completed_statuses ["succeeded", "failed"]

  schema "idempotency_requests" do
    field :scope, :string
    field :idempotency_key, :string
    field :request_hash, :string
    field :status, :string, default: "processing"
    field :response_status, :integer
    field :response_body, :map
    field :completed_at, :utc_datetime
    belongs_to :user, YourApp.Accounts.User
    timestamps()
  end

  def create_changeset(request, attrs) do
    request
    |> cast(attrs, [:user_id, :scope, :idempotency_key, :request_hash, :status])
    |> validate_required([:user_id, :scope, :idempotency_key, :request_hash, :status])
    |> validate_length(:scope, min: 1, max: 120)
    |> validate_length(:idempotency_key, min: 1, max: 255)
    |> validate_length(:request_hash, is: 64)
    |> validate_inclusion(:status, @all_statuses)
    |> unique_constraint(:idempotency_key, name: :idempotency_requests_user_scope_key_idx)
  end

  def completion_changeset(request, attrs) do
    request
    |> cast(attrs, [:status, :response_status, :response_body, :completed_at])
    |> validate_required([:status, :response_status, :response_body, :completed_at])
    |> validate_inclusion(:status, @completed_statuses)
    |> validate_number(:response_status, greater_than_or_equal_to: 100, less_than: 600)
  end
end
```

```elixir
# server/priv/repo/migrations/20260517162721_create_idempotency_requests.exs
defmodule YourApp.Repo.Migrations.CreateIdempotencyRequests do
  use Ecto.Migration

  def change do
    create table(:idempotency_requests) do
      add :user_id, references(:users, on_delete: :delete_all), null: false
      add :scope, :string, null: false
      add :idempotency_key, :string, null: false
      add :request_hash, :string, null: false
      add :status, :string, null: false, default: "processing"
      add :response_status, :integer
      add :response_body, :map
      add :completed_at, :utc_datetime
      timestamps()
    end

    # Keep the status column in sync with the statuses the schema validates.
    create constraint(:idempotency_requests, :idempotency_requests_status_check,
             check: "status IN ('processing', 'succeeded', 'failed')"
           )

    # One row per (user, scope, key); this is what makes claims exclusive.
    create unique_index(:idempotency_requests, [:user_id, :scope, :idempotency_key],
             name: :idempotency_requests_user_scope_key_idx
           )

    # Supports the retention cleanup job.
    create index(:idempotency_requests, [:inserted_at],
             name: :idempotency_requests_inserted_at_idx
           )
  end
end
```

### 2) Repo module

Path in YourApp:

- `server/lib/your_app/repo.ex`

```elixir
defmodule YourApp.Repo do
  use Ecto.Repo,
    otp_app: :your_app,
    adapter: Ecto.Adapters.Postgres
end
```

### 3) App adapter module implementing `IdempotencyKit.Store`

Paths in YourApp:

- `server/lib/your_app/idempotency/store/ecto.ex`
- `server/lib/your_app/idempotency.ex`

```elixir
# server/lib/your_app/idempotency/store/ecto.ex
defmodule YourApp.Idempotency.Store.Ecto do
  @behaviour IdempotencyKit.Store

  alias YourApp.Idempotency.Request
  alias YourApp.Repo
  alias IdempotencyKit.Store.Ecto, as: KitEctoStore

  # Deterministically hash payloads before claim/mismatch checks.
  defdelegate request_hash(payload), to: KitEctoStore

  # Optional read-only exact-retry pre-check. Useful for app policy, such as
  # avoiding a rate-limit debit before the real claim path runs.
  def replay_candidate?(user_id, scope, idempotency_key, request_payload) do
    KitEctoStore.replay_candidate?(
      Repo,
      Request,
      user_id,
      scope,
      idempotency_key,
      request_payload
    )
  end

  # Main claim state machine for execute/processing/replay/mismatch outcomes.
  def claim_request(user_id, scope, idempotency_key, request_hash) do
    KitEctoStore.claim_request(
      Repo,
      Request,
      user_id,
      scope,
      idempotency_key,
      request_hash,
      Application.get_env(:your_app, :idempotency, [])
    )
  end

  # Persist the terminal response used for future idempotent replays.
  def complete_request(request, status, response_status, response_body) do
    KitEctoStore.complete_request(
      Repo,
      Request,
      request,
      status,
      response_status,
      response_body
    )
  end

  # Retention cleanup for old idempotency records.
  def purge_stale_requests do
    KitEctoStore.purge_stale_requests(
      Repo,
      Request,
      Application.get_env(:your_app, :idempotency, [])
    )
  end
end

# server/lib/your_app/idempotency.ex
defmodule YourApp.Idempotency do
  alias YourApp.Idempotency.Store.Ecto, as: EctoStore

  defdelegate request_hash(payload), to: EctoStore

  # Expose the optional exact-retry pre-check for controllers/plugs that need it.
  defdelegate replay_candidate?(user_id, scope, idempotency_key, request_payload),
    to: EctoStore

  defdelegate claim_request(user_id, scope, idempotency_key, request_hash),
    to: EctoStore

  defdelegate complete_request(request, status, response_status, response_body),
    to: EctoStore

  defdelegate purge_stale_requests(), to: EctoStore
end
```

### 4) Scheduled cleanup job

Paths in YourApp:

- `server/lib/your_app/scheduler.ex`
- `server/config/config.exs`

```elixir
# server/lib/your_app/scheduler.ex
defmodule YourApp.Scheduler do
  use Quantum, otp_app: :your_app
end
```

```elixir
# server/config/config.exs
config :your_app, YourApp.Scheduler,
  timezone: "Etc/UTC",
  jobs: [
    cleanup_idempotency_requests: [
      schedule: "@daily",
      task: {YourApp.Idempotency, :purge_stale_requests, []}
    ]
  ]
```
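A Quantum scheduler only runs its jobs once it is started under a supervisor. A minimal sketch of that wiring, assuming a conventional `YourApp.Application` module; the file path, the supervisor name, and the child list are assumptions rather than part of the example app:

```elixir
# server/lib/your_app/application.ex (assumed path)
defmodule YourApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      YourApp.Repo,
      # Without this entry the cleanup job configured above never fires.
      YourApp.Scheduler
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: YourApp.Supervisor)
  end
end
```

The `:quantum` package also has to be listed in the host app's `deps`.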

### 5) Client-side key generation and retry behavior

Path in YourApp:

- `client/api.ts`

```ts
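// `request`, `ApiRequestError`, `createRandomId`, and `wait` are app-level
// helpers defined elsewhere in the client; they are not shown here.
function createIdempotencyKey(prefix: string): string {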
  const normalizedPrefix = prefix.trim() || "request";
  const randomPart = createRandomId();
  return `${normalizedPrefix}-${randomPart}`;
}

async function requestWithIdempotentProcessingRetry<T>(
  path: string,
  options: RequestInit,
  retryOptions: {
    keyPrefix?: string;
    processingErrorCode: string;
    maxPollAttempts?: number;
    pollDelayMs?: number;
    networkRetryAttempts?: number;
  }
): Promise<T> {
  // Keep one key for one logical submission and all immediate retries.
  const idempotencyKey = createIdempotencyKey(retryOptions.keyPrefix || "request");
  const maxPollAttempts = retryOptions.maxPollAttempts ?? 48;
  const pollDelayMs = retryOptions.pollDelayMs ?? 2500;
  const networkRetryAttempts = retryOptions.networkRetryAttempts ?? 3;
  let attempt = 0;

  while (true) {
    try {
      return await request<T>(
        path,
        {
          ...options,
          headers: {
            ...(options.headers as Record<string, string> | undefined),
            "Idempotency-Key": idempotencyKey
          }
        },
        { networkRetryAttempts }
      );
    } catch (error) {
      const isProcessingError =
        error instanceof ApiRequestError &&
        error.status === 409 &&
        error.code === retryOptions.processingErrorCode;

      if (!isProcessingError || attempt >= maxPollAttempts) {
        throw error;
      }

      attempt += 1;
      await wait(pollDelayMs);
    }
  }
}
```

## Main modules

- `IdempotencyKit.Core` - orchestration helpers
- `IdempotencyKit.Store` - store behaviour
- `IdempotencyKit.Store.Ecto` - generic Ecto/Postgres implementation helpers
- `IdempotencyKit.Phoenix.Action` - controller adapter

## Required storage fields

Your idempotency table should have:

- `user_id`
- `scope`
- `idempotency_key`
- `request_hash` (64-char sha256 hex)
- `status` (`processing|succeeded|failed`)
- `response_status`
- `response_body`
- `completed_at`
- timestamps

Recommended indexes/constraints:

- unique index on `(user_id, scope, idempotency_key)`
- status check constraint
- index on `inserted_at` for cleanup

Schema integration note:

- by default, `IdempotencyKit.Store.Ecto` will call
  `YourSchema.create_changeset(struct(YourSchema), attrs)` if it exists
- if your schema does not expose `create_changeset/2`, pass
  `create_changeset_fun: fn schema_module, attrs -> ... end` in store options
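A sketch of the second case, for a schema that does not define `create_changeset/2`. It assumes the function is expected to return an `Ecto.Changeset` and that "store options" means the keyword list passed as the last argument to the `IdempotencyKit.Store.Ecto` helpers; the variable name `store_opts` is hypothetical.

```elixir
# Build the insert changeset inline instead of relying on the schema module.
store_opts = [
  create_changeset_fun: fn schema_module, attrs ->
    schema_module
    |> struct()
    |> Ecto.Changeset.cast(attrs, [:user_id, :scope, :idempotency_key, :request_hash, :status])
    |> Ecto.Changeset.validate_required([:user_id, :scope, :idempotency_key, :request_hash])
    # Assumes the unique index uses the name from the recommended migration.
    |> Ecto.Changeset.unique_constraint(:idempotency_key,
      name: :idempotency_requests_user_scope_key_idx
    )
  end
]
```

Keeping the `unique_constraint/3` call matters here: without it, a concurrent duplicate insert would raise instead of returning a changeset error.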

## Copy-paste schema and migration

Example schema:

```elixir
defmodule MyApp.Idempotency.Request do
  use Ecto.Schema
  import Ecto.Changeset

  schema "idempotency_requests" do
    field :scope, :string
    field :idempotency_key, :string
    field :request_hash, :string
    field :status, :string, default: "processing"
    field :response_status, :integer
    field :response_body, :map
    field :completed_at, :utc_datetime

    belongs_to :user, MyApp.Accounts.User

    timestamps()
  end

  def create_changeset(request, attrs) do
    request
    |> cast(attrs, [:user_id, :scope, :idempotency_key, :request_hash, :status])
    |> validate_required([:user_id, :scope, :idempotency_key, :request_hash, :status])
    |> validate_inclusion(:status, ["processing", "succeeded", "failed"])
    |> unique_constraint(:idempotency_key, name: :idempotency_requests_user_scope_key_idx)
  end
end
```

Example migration:

```elixir
defmodule MyApp.Repo.Migrations.CreateIdempotencyRequests do
  use Ecto.Migration

  def change do
    create table(:idempotency_requests) do
      add :user_id, references(:users, on_delete: :delete_all), null: false
      add :scope, :string, null: false
      add :idempotency_key, :string, null: false
      add :request_hash, :string, null: false
      add :status, :string, null: false, default: "processing"
      add :response_status, :integer
      add :response_body, :map
      add :completed_at, :utc_datetime

      timestamps()
    end

    create unique_index(
             :idempotency_requests,
             [:user_id, :scope, :idempotency_key],
             name: :idempotency_requests_user_scope_key_idx
           )

    create constraint(
             :idempotency_requests,
             :idempotency_requests_status_check,
             check: "status IN ('processing', 'succeeded', 'failed')"
           )

    # Supports the retention cleanup job.
    create index(:idempotency_requests, [:inserted_at])
  end
end
```

## Cleanup

The library does not schedule cleanup for you.

Run a periodic job (for example daily) that calls your store purge function.
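If you would rather not add a scheduler dependency, a small supervised process can do the same job. The sketch below is one possible shape, not something the library ships; `PurgeTimerSketch` and its `:purge_fun` and `:interval_ms` options are hypothetical names.

```elixir
defmodule PurgeTimerSketch do
  use GenServer

  # opts: :purge_fun (zero-arity function, required) and :interval_ms
  # (defaults to 24 hours). Both option names are made up for this sketch.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      purge_fun: Keyword.fetch!(opts, :purge_fun),
      interval_ms: Keyword.get(opts, :interval_ms, :timer.hours(24))
    }

    schedule(state.interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_info(:purge, state) do
    # Run the purge, then arm the timer for the next round.
    state.purge_fun.()
    schedule(state.interval_ms)
    {:noreply, state}
  end

  defp schedule(ms), do: Process.send_after(self(), :purge, ms)
end
```

It could be added to a supervision tree as `{PurgeTimerSketch, purge_fun: &MyApp.Idempotency.purge_stale_requests/0}`. Note that every node running this process purges independently, and the first purge happens one full interval after start.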

## Controller usage example

```elixir
defmodule MyAppWeb.EntityController do
  use MyAppWeb, :controller

  alias IdempotencyKit.Phoenix.Action, as: IdempotentAction

  @idempotency_scope "entity_create"

  def create(
        conn,   # Plug.Conn for the incoming HTTP request.
        params  # Request payload; this is hashed for idempotency matching.
      ) do
    user = conn.assigns.current_user

    execute_fun = fn idempotent_conn ->
      # Runs only when claim result is :execute.
      create_entity(idempotent_conn, user, params)
    end

    case IdempotentAction.maybe_run_for_user(
           conn,                       # incoming conn
           user.id,                    # unique partition by user
           @idempotency_scope,         # action scope
           params,                     # payload used to compute request hash
           execute_fun,                # mutation closure
           idempotency_opts(user.id)   # adapter options
         ) do
      {:handled, handled_conn} ->
        # Already handled by idempotency adapter (execute/replay/error).
        handled_conn

      {:no_key, plain_conn} ->
        # No key header: fallback non-idempotent path.
        create_entity(plain_conn, user, params)
    end
  end

  defp idempotency_opts(user_id) do
    [
      # Required: module exporting request_hash/1, claim_request/4, complete_request/4.
      idempotency_module: MyApp.Idempotency,

      # Optional: your standardized API error renderer.
      render_error_fun: &MyAppWeb.ApiHelpers.render_error/4,

      # Optional: log prefix.
      log_context: "Entity controller user=#{user_id}"
    ]
  end
end
```

## `Idempotency-Key` header

The default header name is `Idempotency-Key`. Plug exposes request header names in lowercase, so it is read as `idempotency-key`.

You can override the header per endpoint with `opts[:header]`, but using the
default is recommended.
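To illustrate the lowercase lookup, here is a plain-Elixir sketch over a list shaped like Plug's `conn.req_headers`. `HeaderSketch` is hypothetical, and treating a blank or repeated header as "no key" is a policy chosen for this example, not necessarily what the Phoenix adapter does.

```elixir
defmodule HeaderSketch do
  # `req_headers` is a list of {name, value} tuples with lowercase names,
  # the same shape as `conn.req_headers` in Plug.
  def idempotency_key(req_headers, header \\ "idempotency-key") do
    # Downcase the configured name so "X-Request-Key" still matches.
    wanted = String.downcase(header)
    values = for {^wanted, value} <- req_headers, do: String.trim(value)

    case values do
      [key] when key != "" -> {:ok, key}
      _ -> :no_key
    end
  end
end
```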

## Typical client behavior

For mutation endpoints:

1. generate a new key per user action
2. send it in `Idempotency-Key`
3. retry network failures with the same key
4. if you retry with an edited payload, use a new key

## Package integration tests (Postgres)

The package includes Postgres-backed integration tests for the real Ecto
lifecycle semantics:

- claim -> processing -> complete -> replay
- payload mismatch conflicts
- stale processing reclaim
- retention purge
- concurrent identical claims

These tests are excluded by default.

To run them, include the `integration` tag and set:

- `IDEMPOTENCY_KIT_TEST_DATABASE_URL`

Example:

```bash
IDEMPOTENCY_KIT_TEST_DATABASE_URL=postgres://postgres:postgres@localhost:5432/idempotency_kit_test \
  mix test --include integration test/idempotency_kit/store_ecto_integration_test.exs
```

## Replacing DB storage with Redis

You can use Redis instead of the Ecto/Postgres helper.

As of v0.1.0, the callback interface is:

- `request_hash/1`
- `replay_candidate?/4`
- `claim_request/4`
- `complete_request/4`
- `purge_stale_requests/0`

To do this, implement your own store module with `@behaviour IdempotencyKit.Store`:

```elixir
defmodule MyApp.Idempotency.RedisStore do
  @behaviour IdempotencyKit.Store

  # Deterministically hash payload. Same logical payload must hash the same way.
  def request_hash(payload), do: ...

  # Optional read-only pre-check for callers that want to detect an exact retry
  # before claim. Return true only when key + payload hash matches an existing
  # idempotency record. Still call claim_request/4 for the authoritative result.
  def replay_candidate?(user_id, scope, idempotency_key, request_payload), do: ...

  # Main claim state machine:
  # - first request => {:execute, request}
  # - duplicate in-flight => {:processing, request}
  # - completed request with same hash => {:replay, request}
  # - same key + different hash => {:error, :payload_mismatch}
  def claim_request(user_id, scope, idempotency_key, request_hash), do: ...

  # Persist terminal outcome for a claimed request.
  # `request` should be what you returned from claim_request/4.
  # Replayed records must include response_status + response_body.
  def complete_request(request, status, response_status, response_body), do: ...

  # Retention cleanup for old rows/keys.
  def purge_stale_requests(), do: ...
end
```

Then pass your Redis-backed module as `idempotency_module` in controller options.

Note: `replay_candidate?/4` is part of the `IdempotencyKit.Store` behaviour,
but the Phoenix adapter itself only requires:
`request_hash/1`, `claim_request/4`, and `complete_request/4`.

Important requirements for Redis:

1. `claim_request/4` must be atomic (a Lua script is recommended).
2. Keep the same lifecycle semantics:
   - first request => `{:execute, request}`
   - duplicate in-flight => `{:processing, request}`
   - same key + different payload => `{:error, :payload_mismatch}`
   - completed request => `{:replay, request}`
   - replay request should include `response_status` and `response_body`
     (atom or string keys) so the Phoenix adapter can render it
3. Support stale processing reclaim (or enforce a bounded processing TTL).
4. Set retention/TTL for old completed requests.
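The atomicity requirement is the one that is easiest to get wrong. The sketch below uses an `Agent` as a stand-in for Redis to show the property a Lua script would need to provide: the lookup and the write happen as one step, so concurrent identical claims yield exactly one `:execute`. `AtomicClaimSketch` and its row shape are hypothetical, and the sketch leaves out stale reclaim and TTLs.

```elixir
defmodule AtomicClaimSketch do
  # Start an empty in-memory store (a stand-in for Redis in this sketch).
  def start_link, do: Agent.start_link(fn -> %{} end)

  # The whole read-check-write runs inside one Agent callback, so no other
  # caller can observe the key between the lookup and the insert.
  def claim(store, key, hash) do
    Agent.get_and_update(store, fn rows ->
      case rows do
        %{^key => %{hash: ^hash, status: "processing"} = row} ->
          {{:processing, row}, rows}

        %{^key => %{hash: ^hash} = row} ->
          {{:replay, row}, rows}

        %{^key => _different_hash} ->
          {{:error, :payload_mismatch}, rows}

        _ ->
          row = %{hash: hash, status: "processing"}
          {{:execute, row}, Map.put(rows, key, row)}
      end
    end)
  end
end
```

A check-then-set implemented as two separate Redis commands does not have this property: two clients can both see "no key" and both execute.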

Note: Redis can work very well here, but make sure your durability settings
(AOF/RDB/replication) match your reliability expectations.