# GenTask

[![Deps Status](https://beta.hexfaktor.org/badge/all/github/Nebo15/gen_task.svg)](https://beta.hexfaktor.org/github/Nebo15/gen_task) [![Hex.pm Downloads](https://img.shields.io/hexpm/dw/gen_task.svg?maxAge=3600)](https://hex.pm/packages/gen_task) [![Latest Version](https://img.shields.io/hexpm/v/gen_task.svg?maxAge=3600)](https://hex.pm/packages/gen_task) [![License](https://img.shields.io/hexpm/l/gen_task.svg?maxAge=3600)](https://hex.pm/packages/gen_task) [![Build Status](https://travis-ci.org/Nebo15/gen_task.svg?branch=master)](https://travis-ci.org/Nebo15/gen_task) [![Coverage Status](https://coveralls.io/repos/github/Nebo15/gen_task/badge.svg?branch=master)](https://coveralls.io/github/Nebo15/gen_task?branch=master)

Generic Task behavior that helps encapsulate worker errors and recover from them in classic GenStages.

## Motivation

Whenever you use RabbitMQ or a similar tool for background job processing, you may want to leverage its acknowledgment mechanism.

For example, we spawn a supervised GenServer worker each time we receive a job from RabbitMQ. The job `payload` comes with a `tag` that must be used to send an acknowledgment (ack) or a negative acknowledgment (nack) when the job is finished. All nack'ed jobs are rescheduled and retried, which gives us "at-least-once" job processing. RabbitMQ also remembers which tasks were sent to a connection and will nack all unacknowledged tasks when that connection dies. After a task is processed, we send an ack or nack (depending on business logic) and exit with the `:normal` reason (this is why our supervisor restart strategy is `:transient`).
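A minimal sketch of such a per-job worker, assuming Elixir 1.7+ (`DynamicSupervisor` and `handle_continue/2`). `JobWorker` and the `reply_to` field are hypothetical: instead of calling a real RabbitMQ client, the worker sends `{:ack, tag}` to a given process. With `restart: :transient`, the supervisor restarts the worker only when it exits abnormally, so a worker that stops with `:normal` after acking is simply removed.

```elixir
defmodule JobWorker do
  # Hypothetical per-job worker. :transient means "restart only if the exit
  # reason is not :normal, :shutdown or {:shutdown, _}".
  use GenServer, restart: :transient

  def start_link(job), do: GenServer.start_link(__MODULE__, job)

  @impl true
  def init(job) do
    # Do the work after init/1 returns, so start_link/1 does not block.
    {:ok, job, {:continue, :process}}
  end

  @impl true
  def handle_continue(:process, %{tag: tag, reply_to: reply_to} = job) do
    # Stand-in for a real ack call such as MyQueue.ack(tag).
    send(reply_to, {:ack, tag})
    # A :normal exit tells a :transient supervisor not to restart us.
    {:stop, :normal, job}
  end
end
```

Each job would be started with something like `DynamicSupervisor.start_child(sup, {JobWorker, %{tag: tag, reply_to: self()}})`; once the ack is sent, the supervisor no longer has an active child for it.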

Job intensity is limited by the `prefetch_count` option, which caps the number of unacknowledged jobs that may be processed on a single node at any given moment.

In real life, however, jobs can fail because of bugs or third-party service outages, and in that case the GenServer dies. The supervisor will try to restart it, but during a third-party outage it usually reaches its maximum restart intensity within seconds and dies, taking all active jobs with it.

The supervisor gets restarted, but it **won't receive any jobs**, leaving a zombie background-processing node. This happens because the connection is not linked to individual jobs or their supervisors and stays alive after the supervisor restarts, so RabbitMQ thinks the node "is working on all jobs at max capacity" (because of `prefetch_count`) and will not send it any additional jobs. Additionally, we lose all tags and can't nack the jobs of dead processes on that node.
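The failure mode described above can be reproduced locally with a minimal sketch, assuming Elixir 1.7+ and `DynamicSupervisor`'s defaults (at most 3 restarts within 5 seconds). `FlakyJob` is hypothetical: it raises right after every start, standing in for a job whose third-party dependency is down. An `Agent` holding a tag stands in for another, healthy in-flight job.

```elixir
defmodule FlakyJob do
  # Hypothetical worker whose third-party dependency is down: it crashes
  # immediately after every (re)start. `use GenServer` defaults to :permanent.
  use GenServer

  def start_link(tag), do: GenServer.start_link(__MODULE__, tag)

  @impl true
  def init(tag), do: {:ok, tag, {:continue, :call_third_party}}

  @impl true
  def handle_continue(:call_third_party, _tag) do
    raise "third-party service unavailable"
  end
end
```

Once the crash loop exceeds the restart intensity, the supervisor shuts down every child, including the healthy one, and exits with `:shutdown`; the tag kept in the healthy job's state is gone with it.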

### Possible solutions

  1. Leverage the [GenServer `terminate/2`](https://hexdocs.pm/elixir/GenServer.html#c:terminate/2) callback.

     This option is not safe by default: a process that does not trap exits will not have this callback invoked when its supervisor sends it an exit signal (for example, when the supervisor itself is being restarted).

  2. Link the RabbitMQ client library's channel or connection processes to the workers.

     This may be a bad solution: a single failing job takes the channel or connection down with it, so RabbitMQ reschedules all unacknowledged jobs, resulting in many duplicate-processed jobs.

  3. Store tags in a separate process which monitors the supervisor and its workers.

  4. Keep storing tags and job payloads within the GenStage state, but wrap any unsafe code in a [`Task`](https://hexdocs.pm/elixir/Task.html). [[1]](https://github.com/elixir-lang/gen_stage/issues/131#issuecomment-265758380)

     Internally this is similar to option 3, but doesn't require us to reinvent supervisor behavior.
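Option 1 hinges on exit trapping. The minimal sketch below, assuming Elixir 1.6+ (`DynamicSupervisor`), makes that visible; `NackOnShutdown` and its `trap_exit`/`reply_to` fields are hypothetical, and the nack is simulated by sending `{:nack, tag, reason}` to a given process. Even a trapping process gets no `terminate/2` call when it is killed with `:kill`, and the callback is cut short if it outlives the supervisor's shutdown timeout, which is part of why this option is fragile.

```elixir
defmodule NackOnShutdown do
  # Hypothetical worker that tries to nack its job from terminate/2.
  use GenServer

  def start_link(job), do: GenServer.start_link(__MODULE__, job)

  @impl true
  def init(%{trap_exit: trap_exit?} = job) do
    # Only a process that traps exits turns the supervisor's :shutdown
    # signal into a terminate/2 call; otherwise it is killed outright.
    Process.flag(:trap_exit, trap_exit?)
    {:ok, job}
  end

  @impl true
  def terminate(reason, %{tag: tag, reply_to: reply_to}) do
    # Stand-in for a real nack call such as MyQueue.nack(tag).
    send(reply_to, {:nack, tag, reason})
  end
end
```

Terminating a trapping worker via `DynamicSupervisor.terminate_child/2` produces `{:nack, tag, :shutdown}`; the same call on a non-trapping worker produces no message at all.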

## Chosen solution

Each time a job is started, we spawn a "sentinel" GenServer process that stores the job to be processed. The job is then started under a `Task.Supervisor` via `async_nolink/2`, which runs it asynchronously without linking it to the caller process.

To receive the job status, the sentinel process calls `Task.yield/1`, which blocks the current process until the task completes or the yield timeout expires (so the sentinel spends no reductions while waiting).
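The sentinel design described above can be sketched as follows. This is a minimal illustration assuming Elixir 1.7+, not GenTask's actual implementation: `Sentinel`, the `{task_sup, job, fun}` start argument and the `reply_to` field are hypothetical, and ack/nack are simulated by sending messages. It uses the documented `Task.yield(task, timeout) || Task.shutdown(task)` idiom, so a timeout and a crash both end up as a nack while the sentinel itself never crashes.

```elixir
defmodule Sentinel do
  # Hypothetical sentinel: keeps the job (and its tag) in its own state while
  # an unlinked task does the risky work, so a task crash cannot lose the tag.
  use GenServer, restart: :transient

  def start_link({task_sup, job, fun}),
    do: GenServer.start_link(__MODULE__, {task_sup, job, fun})

  @impl true
  def init({task_sup, job, fun}) do
    {:ok, %{task_sup: task_sup, job: job, fun: fun}, {:continue, :run}}
  end

  @impl true
  def handle_continue(:run, %{task_sup: task_sup, job: job, fun: fun} = state) do
    # Supervised by task_sup, but NOT linked to this process.
    task = Task.Supervisor.async_nolink(task_sup, fn -> fun.(job) end)

    # Block until the task replies or crashes; on timeout, shut it down.
    case Task.yield(task, 5_000) || Task.shutdown(task) do
      {:ok, result} -> report(job, {:ack, result})
      {:exit, reason} -> report(job, {:nack, reason})
      nil -> report(job, {:nack, :timeout})
    end

    {:stop, :normal, state}
  end

  # Stand-in for MyQueue.ack/nack: notify the process named in the job.
  defp report(%{tag: tag, reply_to: reply_to}, {kind, detail}) do
    send(reply_to, {kind, tag, detail})
  end
end
```

Because the sentinel is blocked in `Task.yield/2`, it handles no other messages until the task finishes; that is acceptable here since each sentinel owns exactly one job.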

## Installation and usage

It's [available in Hex](https://hex.pm/packages/gen_task), and the package can be installed as follows:

  1. Add `gen_task` to your list of dependencies in `mix.exs`:

     ```elixir
     def deps do
       [{:gen_task, "~> 0.1.1"}]
     end
     ```

  2. Ensure `gen_task` is started before your application:

     ```elixir
     def application do
       [applications: [:gen_task]]
     end
     ```

  3. Define your business logic and result handling:

     ```elixir
     defmodule MyWorker do
       use GenTask
       require Logger

       # Define business logic
       def run(%{payload: _payload, tag: tag}) do
         # Simulate random failures to exercise error handling
         if :rand.uniform(2) == 1 do
           raise "Error!"
         end

         Logger.info("Processed job ##{tag}")
         Process.sleep(100)
         :ok
       end

       # Handle task statuses
       def handle_result(:ok, _result, state) do
         # MyQueue.ack(state.tag)
         {:stop, :normal, state}
       end

       def handle_result(:exit, reason, %{tag: tag} = state) do
         Logger.error("Task with tag #{inspect tag} terminated with reason: #{inspect reason}")
         # MyQueue.nack(tag)
         {:stop, :normal, state}
       end

       def handle_result(:timeout, task, state) do
         # Shut down the task on yield timeout, then treat the job as failed
         Task.shutdown(task)
         handle_result(:exit, :timeout, state)
       end
     end
     ```

The docs can be found at [https://hexdocs.pm/gen_task](https://hexdocs.pm/gen_task).