defmodule OddJob do
# The module documentation is built at compile time from the project README.
# Registering the file as an external resource makes the compiler rebuild this
# module whenever the README changes.
@external_resource readme = Path.join([__DIR__, "../README.md"])
# Only the README segment at index 1 of the `<!-- MDOC -->` split is used, i.e. the
# text following the first marker (up to the next marker, if any). `Enum.fetch!/2`
# raises at compile time when the marker is missing.
# The `#usage` and `#supervising-job-pools` anchors are rewritten with a `module-`
# prefix — presumably so the links resolve against the ids generated for moduledoc
# headings; TODO confirm against the rendered docs.
@moduledoc readme
|> File.read!()
|> String.split("<!-- MDOC -->")
|> Enum.fetch!(1)
|> String.replace("#usage", "#module-usage")
|> String.replace("#supervising-job-pools", "#module-supervising-job-pools")
@moduledoc since: "0.1.0"
alias OddJob.{Async, Job, Queue, Registry, Scheduler}
# Public type aliases referenced by the specs in this module.
# The name of a job pool.
@type pool :: atom
# Returned when the process backing a pool cannot be located.
@type not_found :: {:error, :not_found}
# A non-negative delay; `perform_after/3` documents it as milliseconds.
@type timer :: non_neg_integer
@type date_time :: DateTime.t()
@type job :: Job.t()
@type queue :: Queue.t()
# `Registry` here is the `OddJob.Registry` alias declared above, not Elixir's `Registry`.
@type name :: Registry.name()
# Pool start options and child specs are defined by `OddJob.Pool`.
@type options :: OddJob.Pool.options()
@type start_option :: OddJob.Pool.start_option()
@type child_spec :: OddJob.Pool.child_spec()
@doc false
@doc since: "0.4.0"
# Accepts lists and any map. Structs are maps too, so values such as ranges pass.
defguard is_enumerable(term)
         when is_map(term) or is_list(term)
@doc false
@doc since: "0.4.0"
# A timer is any non-negative integer.
defguard is_timer(timer)
         when is_integer(timer) and timer >= 0
@doc """
A macro for creating jobs with an expressive DSL.

`perform_this/2` works like `perform/2` except it accepts a `do` block instead of an
anonymous function.

When a single scheduling option is combined with the keyword form of the block, e.g.
`perform_this Job, after: 5000, do: work()`, the call parses as two arguments and is
forwarded to `perform_this/3`.

## Examples

You must `import` or `require` `OddJob` to use macros:

    import OddJob
    alias MyApp.Job

    perform_this Job do
      some_work()
      some_other_work()
    end

    perform_this Job, do: something_hard()
"""
@doc since: "0.3.0"
defmacro perform_this(pool, contents)

defmacro perform_this(pool, do: block) do
  quote do
    OddJob.perform(unquote(pool), fn -> unquote(block) end)
  end
end

defmacro perform_this(pool, [{key, val}, {:do, block}]) do
  # The forwarded call is fully qualified on purpose. An unqualified `perform_this/3`
  # would be looked up in the caller's context, which only works when the caller has
  # `import`ed OddJob; callers that merely `require` it would fail to compile.
  quote do
    OddJob.perform_this(unquote(pool), [{unquote(key), unquote(val)}], do: unquote(block))
  end
end
@doc """
A macro for creating jobs with an expressive DSL.

`perform_this/3` accepts a single configuration option as the second argument that will
control execution of the job. The available options provide the functionality of
`async_perform/2`, `perform_at/3`, and `perform_after/3`.

## Options

  * `:async` - Passing the atom `:async` as the second argument before the `do` block
    creates an async job that can be awaited on. See `async_perform/2`.

  * `at: time` - Use this option to schedule the job for a specific `time` in the future.
    `time` must be a `DateTime` struct. See `perform_at/3`.

  * `after: timer` - Use this option to schedule the job to perform after the given
    `timer` has elapsed. `timer` must be in milliseconds. See `perform_after/3`.

Any other keyword option is rejected at compile time.

## Examples

    import OddJob
    alias MyApp.Job

    time = DateTime.utc_now() |> DateTime.add(600, :second)

    perform_this Job, at: time do
      scheduled_work()
    end

    perform_this Job, after: 5000, do: something_important()

    perform_this Job, :async do
      get_data()
    end
    |> await()

    iex> (perform_this OddJob.Pool, :async, do: 10 ** 2) |> await()
    100
"""
@doc since: "0.3.0"
defmacro perform_this(pool, option, contents)

# Only `:at` and `:after` map onto an existing `perform_<key>/3` function, so the key is
# checked here instead of expanding into a call to a function that does not exist.
defmacro perform_this(pool, [{key, val}], do: block) when key in [:at, :after] do
  quote do
    OddJob.unquote(:"perform_#{key}")(unquote(val), unquote(pool), fn -> unquote(block) end)
  end
end

defmacro perform_this(pool, :async, do: block) do
  quote do
    OddJob.async_perform(unquote(pool), fn -> unquote(block) end)
  end
end
@doc """
Returns a specification to start this module under a supervisor.
## Examples
iex> children = [OddJob.child_spec(name: MyApp.Media)]
[%{id: MyApp.Media, start: {OddJob.Pool, :start_link, [[name: MyApp.Media]]}, type: :supervisor}]
iex> {:ok, _pid} = Supervisor.start_link(children, strategy: :one_for_one)
iex> (MyApp.Media |> OddJob.workers() |> length()) == System.schedulers_online()
true
Normally you would start an `OddJob` pool under a supervision tree with a child specification tuple
and not call `child_spec/1` directly.
children = [{OddJob, name: MyApp.Media}]
Supervisor.start_link(children, strategy: :one_for_one)
You must pass a keyword list of options to `child_spec/1`, or as the second element of the
child specification tuple.
## Options
* `:name` - An `atom` that names the pool. This argument is required.
* `:pool_size` - A positive `integer`, the number of concurrent workers in the pool. Defaults to 5 or your application's
config value.
* `:max_restarts` - A positive `integer`, the number of worker restarts allowed in a given timeframe before all
of the workers are restarted. Set a higher number if your jobs have a high rate of expected failure.
Defaults to 5 or your application's config value.
* `:max_seconds` - A positive `integer`, the timeframe in seconds in which `max_restarts` applies. Defaults to 3
or your application's config value. See `Supervisor` for more info on restart intensity options.
See the `Supervisor` module for more about child specs.
"""
@doc since: "0.1.0"
@spec child_spec(options) :: child_spec
def child_spec(opts) when is_list(opts) do
  # Reject a missing or non-atom `:name` before asking the pool for its child spec.
  validated = validate_name!(opts)
  OddJob.Pool.child_spec(validated)
end
@doc """
Starts an `OddJob.Pool` supervision tree linked to the current process.
You can start an `OddJob` pool dynamically with `start_link/1` to start processing concurrent jobs:
iex> {:ok, _pid} = OddJob.start_link(name: MyApp.Event, pool_size: 10)
iex> OddJob.async_perform(MyApp.Event, fn -> :do_something end) |> OddJob.await()
:do_something
In most cases you would instead start your pools inside a supervision tree:
children = [{OddJob, name: MyApp.Event, pool_size: 10}]
Supervisor.start_link(children, strategy: :one_for_one)
See `OddJob.child_spec/1` for a list of available options.
See `Supervisor` for more on child specifications and supervision trees.
"""
@doc since: "0.4.0"
@spec start_link(options) :: Supervisor.on_start()
def start_link(opts) when is_list(opts) do
  # Reject a missing or non-atom `:name` before starting the pool.
  validated = validate_name!(opts)
  OddJob.Pool.start_link(validated)
end
# Returns `opts` unchanged when `:name` is an atom.
# Raises `KeyError` when `:name` is absent and `ArgumentError` when it is not an atom.
defp validate_name!(opts) do
  case Keyword.fetch!(opts, :name) do
    name when is_atom(name) ->
      opts

    other ->
      raise ArgumentError,
        message: """
        Expected `name` to be an atom. Got #{inspect(other)}
        """
  end
end
@doc """
Performs a fire and forget job.
Casts the `function` to the job `pool`, and if a worker is available it is performed
immediately. If no workers are available the job is added to the back of a FIFO queue to be
performed as soon as a worker is available.
Returns `:ok` if the pool process exists at the time of calling, or `{:error, :not_found}`
if it doesn't. The job is casted to the pool, so there can be no guarantee that the pool process
will still exist if it crashes between the beginning of the function call and when the message
is received.
Use this function for interaction with other processes or the outside world where you don't
care about the return results or whether it succeeds.
## Examples
iex> parent = self()
iex> :ok = OddJob.perform(OddJob.Pool, fn -> send(parent, :hello) end)
iex> receive do
...> msg -> msg
...> end
:hello
iex> OddJob.perform(:does_not_exist, fn -> "never called" end)
{:error, :not_found}
"""
@doc since: "0.1.0"
@spec perform(pool, function) :: :ok | not_found
def perform(pool, function) when is_atom(pool) and is_function(function, 0) do
  # `queue_name/1` doubles as the existence check for the pool's queue process.
  case queue_name(pool) do
    {:error, :not_found} = not_found -> not_found
    queue -> Queue.perform(queue, function)
  end
end
@doc """
Sends a collection of jobs to the pool.
Enumerates over the collection, using each member as the argument to an anonymous function
with an arity of `1`. The collection of jobs is sent in bulk to the pool where they will be
immediately performed by any available workers. If there are no available workers the jobs
are added to the FIFO queue. Jobs are guaranteed to always process in the same order of the
original collection, and insertion into the queue is atomic. No jobs sent from other processes
can be inserted between any member of the collection.
Returns `:ok` if the pool process exists at the time of calling, or `{:error, :not_found}`
if it doesn't. See `perform/2` for more.
## Examples
iex> caller = self()
iex> OddJob.perform_many(OddJob.Pool, 1..5, fn x -> send(caller, x) end)
iex> for x <- 1..5 do
...> receive do
...> ^x -> x
...> end
...> end
[1, 2, 3, 4, 5]
iex> OddJob.perform_many(:does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec perform_many(pool, list | map, function) :: :ok | not_found
def perform_many(pool, collection, function)
    when is_atom(pool) and is_enumerable(collection) and is_function(function, 1) do
  # The whole collection is handed to the queue in a single call.
  case queue_name(pool) do
    {:error, :not_found} = not_found -> not_found
    queue -> Queue.perform_many(queue, collection, function)
  end
end
@doc """
Performs an async job that can be awaited on for the result.
Returns the `OddJob.Job.t()` struct if the pool server exists at the time of calling, or
`{:error, :not_found}` if it doesn't.
Functions similarly to `Task.async/1` and `Task.await/2` with some tweaks. Since the workers in the
`pool` already exist when this function is called and may be busy performing other jobs, it is
unknown at the time of calling when the job will be performed and by which worker process.
An `OddJob.Async.Proxy` is dynamically created and linked and monitored by the caller. The `Proxy`
handles the transactions with the pool and is notified when a worker is assigned. The proxy
links and monitors the worker immediately before performance of the job, forwarding any exits or
crashes to the caller. The worker returns the results to the proxy, which then passes them to the
caller with a `{ref, result}` message. The proxy immediately exits with reason `:normal` after the
results are sent back to the caller.
See `await/2` and `await_many/2` for functions to retrieve the results, and the `Task` module for
more information on the `async/await` pattern and when you may want to use it.
## Examples
iex> job = OddJob.async_perform(OddJob.Pool, fn -> :math.exp(100) end)
iex> OddJob.await(job)
2.6881171418161356e43
iex> OddJob.async_perform(:does_not_exist, fn -> "never called" end)
{:error, :not_found}
"""
@doc since: "0.1.0"
@spec async_perform(pool, function) :: job | not_found
def async_perform(pool, function) when is_atom(pool) and is_function(function, 0) do
  # Both the pool name and the resolved queue name are passed on to `OddJob.Async`.
  case queue_name(pool) do
    {:error, :not_found} = not_found -> not_found
    queue -> Async.perform(pool, queue, function)
  end
end
@doc """
Sends a collection of async jobs to the pool.
Returns a list of `OddJob.Job.t()` structs if the pool server exists at the time of
calling, or `{:error, :not_found}` if it doesn't.
Enumerates over the collection, using each member as the argument to an anonymous function
with an arity of `1`. See `async_perform/2` and `perform/2` for more information.
There's a limit to the number of jobs that can be started with this function that
roughly equals the BEAM's process limit.
## Examples
iex> jobs = OddJob.async_perform_many(OddJob.Pool, 1..5, fn x -> x ** 2 end)
iex> OddJob.await_many(jobs)
[1, 4, 9, 16, 25]
iex> OddJob.async_perform_many(:does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec async_perform_many(pool, list | map, function) :: [job] | not_found
def async_perform_many(pool, collection, function)
    when is_atom(pool) and is_enumerable(collection) and is_function(function, 1) do
  # Both the pool name and the resolved queue name are passed on to `OddJob.Async`.
  case queue_name(pool) do
    {:error, :not_found} = not_found -> not_found
    queue -> Async.perform_many(pool, queue, collection, function)
  end
end
@doc """
Awaits on an async job reply and returns the results.
The current process and the worker process are linked during execution of the job by an
`OddJob.Async.Proxy`. In case the worker process dies during execution, the current process
will exit with the same reason as the worker.
A timeout, in milliseconds or `:infinity`, can be given with a default value of `5000`. If the
timeout is exceeded, then the current process will exit. If the worker process is linked to the
current process (which is the case when a job is started with `async_perform/2`), then the worker
process will also exit.
This function assumes the proxy's monitor is still active or the monitor's :DOWN message is in the
message queue. If it has been demonitored, or the message already received, this function will wait
for the duration of the timeout awaiting the message.
## Examples
iex> OddJob.async_perform(OddJob.Pool, fn -> :math.log(2.6881171418161356e43) end)
...> |> OddJob.await()
100.0
"""
@doc since: "0.1.0"
@spec await(job, timeout) :: term
def await(job, timeout \\ 5000) when is_struct(job, Job) do
  # Hand the job and the timeout to `OddJob.Async.await/2`.
  Async.await(job, timeout)
end
@doc """
Awaits replies from multiple async jobs and returns them in a list.
This function receives a list of jobs and waits for their replies in the given time interval.
It returns a list of the results, in the same order as the jobs supplied in the `jobs` input argument.
The current process and each worker process are linked during execution of the job by an
`OddJob.Async.Proxy`. If any of the worker processes dies, the caller process will exit with the same
reason as that worker.
A timeout, in milliseconds or `:infinity`, can be given with a default value of `5000`. If the timeout
is exceeded, then the caller process will exit. Any worker processes that are linked to the caller process
(which is the case when a job is started with `async_perform/2`) will also exit.
This function assumes the proxies' monitors are still active or the monitor's :DOWN message is in the
message queue. If any jobs have been demonitored, or the message already received, this function will
wait for the duration of the timeout.
## Examples
iex> job1 = OddJob.async_perform(OddJob.Pool, fn -> 2 ** 2 end)
iex> job2 = OddJob.async_perform(OddJob.Pool, fn -> 3 ** 2 end)
iex> [job1, job2] |> OddJob.await_many()
[4, 9]
"""
@doc since: "0.2.0"
@spec await_many([job], timeout) :: [term]
def await_many(jobs, timeout \\ 5000) when is_list(jobs) do
  # Hand the jobs and the timeout to `OddJob.Async.await_many/2`.
  Async.await_many(jobs, timeout)
end
@doc """
Sends a job to the `pool` after the given `timer` has elapsed.
`timer` is an integer that indicates the number of milliseconds that should elapse before
the job is sent to the pool. The timed message is executed under a separate supervised process,
so if the current process crashes the job will still be performed. A timer reference is returned,
which can be read with `Process.read_timer/1` or cancelled with `OddJob.cancel_timer/1`. Returns
`{:error, :not_found}` if the `pool` server does not exist at the time this function is
called.
## Examples
timer_ref = OddJob.perform_after(5000, OddJob.Pool, fn -> deferred_job() end)
Process.read_timer(timer_ref)
#=> 2836 # time remaining before job is sent to the pool
OddJob.cancel_timer(timer_ref)
#=> 1175 # job has been cancelled
timer_ref = OddJob.perform_after(5000, OddJob.Pool, fn -> deferred_job() end)
Process.sleep(6000)
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed to cancel the job
iex> OddJob.perform_after(5000, :does_not_exist, fn -> "never called" end)
{:error, :not_found}
"""
@doc since: "0.2.0"
@spec perform_after(timer, pool, function) :: reference | not_found
def perform_after(timer, pool, fun)
    when is_atom(pool) and is_timer(timer) and is_function(fun, 0) do
  # The queue lookup is only an existence check; the scheduler is given the pool name.
  case queue_name(pool) do
    {:error, :not_found} = not_found -> not_found
    _queue -> Scheduler.perform(timer, pool, fun)
  end
end
@doc """
Sends a collection of jobs to the `pool` after the given `timer` has elapsed.
Enumerates over the collection, using each member as the argument to an anonymous function
with an arity of `1`. Returns a single timer `reference`. The `timer` is watched by a single
process that will send the entire batch of jobs to the pool when the timer expires. See
`perform_after/3` for more information about arguments and timers.
Consider using this function instead of `perform_after/3` when scheduling a large batch of jobs.
`perform_after/3` starts a separate scheduler process per job, whereas this function starts a
single scheduler process for the whole batch.
Returns `{:error, :not_found}` if the `pool` does not exist at the time the function
is called.
## Examples
timer_ref = OddJob.perform_many_after(5000, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39156>
# fewer than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> 2554 # returns the time remaining. All jobs in the collection have been canceled
timer_ref = OddJob.perform_many_after(5000, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39178>
# more than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed, all jobs have been queued
iex> OddJob.perform_many_after(5000, :does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
"""
@doc since: "0.4.0"
# The success value is the single timer reference produced by the scheduler for the
# whole batch, not a list of jobs, so the spec mirrors `perform_many_at/4`.
@spec perform_many_after(timer, pool, list | map, function) :: reference | not_found
def perform_many_after(timer, pool, collection, function)
    when is_atom(pool) and is_timer(timer) and is_enumerable(collection) and
           is_function(function, 1) do
  # The queue lookup is only an existence check; the scheduler is given the pool name.
  case queue_name(pool) do
    {:error, :not_found} -> {:error, :not_found}
    _queue -> Scheduler.perform_many(timer, pool, collection, function)
  end
end
@doc """
Sends a job to the `pool` at the given `time`.
`time` must be a `DateTime` struct for a time in the future. The timer is executed
under a separate supervised process, so if the caller crashes the job will still be performed.
A timer reference is returned, which can be read with `Process.read_timer/1` or canceled with
`OddJob.cancel_timer/1`. Returns `{:error, :not_found}` if the `pool` server does not exist at
the time this function is called.
## Examples
time = DateTime.utc_now() |> DateTime.add(600, :second)
OddJob.perform_at(time, OddJob.Pool, fn -> scheduled_job() end)
iex> time = DateTime.utc_now() |> DateTime.add(600, :second)
iex> OddJob.perform_at(time, :does_not_exist, fn -> "never called" end)
{:error, :not_found}
"""
@doc since: "0.2.0"
@spec perform_at(date_time, pool, function) :: reference | not_found
def perform_at(date_time, pool, fun)
    when is_atom(pool) and is_struct(date_time, DateTime) and is_function(fun, 0) do
  case queue_name(pool) do
    {:error, :not_found} = not_found ->
      not_found

    _queue ->
      # The date is validated only after the pool is known to exist. It is converted
      # to a millisecond delay, which is what the scheduler receives.
      delay = validate_date_time!(date_time)
      Scheduler.perform(delay, pool, fun)
  end
end
@doc """
Sends a collection of jobs to the `pool` at the given `time`.
Enumerates over the collection, using each member as the argument to an anonymous function
with an arity of `1`. Returns a single timer `reference`. The timer is watched by a single
process that will send the entire batch of jobs to the pool when the timer expires. See
`perform_at/3` for more information about arguments and timers.
Consider using this function instead of `perform_at/3` when scheduling a large batch of jobs.
`perform_at/3` starts a separate scheduler process per job, whereas this function starts a
single scheduler process for the whole batch.
Returns `{:error, :not_found}` if the `pool` does not exist at the time the function
is called.
## Examples
time = DateTime.utc_now() |> DateTime.add(5, :second)
timer_ref = OddJob.perform_many_at(time, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39156>
# fewer than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> 2554 # returns the time remaining. All jobs in the collection have been canceled
time = DateTime.utc_now() |> DateTime.add(5, :second)
timer_ref = OddJob.perform_many_at(time, OddJob.Pool, 1..5, fn x -> x ** 2 end)
#=> #Reference<0.1431903625.286261254.39178>
# more than 5 seconds pass
OddJob.cancel_timer(timer_ref)
#=> false # too much time has passed, all jobs have been queued
iex> time = DateTime.utc_now() |> DateTime.add(5, :second)
iex> OddJob.perform_many_at(time, :does_not_exist, ["never", "called"], fn x -> x end)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec perform_many_at(date_time, pool, list | map, function) :: reference | not_found
def perform_many_at(date_time, pool, collection, function)
    when is_struct(date_time, DateTime) and is_atom(pool) and is_enumerable(collection) and
           is_function(function, 1) do
  case queue_name(pool) do
    {:error, :not_found} = not_found ->
      not_found

    _queue ->
      # The date is validated only after the pool is known to exist. It is converted
      # to a millisecond delay, which is what the scheduler receives.
      delay = validate_date_time!(date_time)
      Scheduler.perform_many(delay, pool, collection, function)
  end
end
# Converts `date_time` into the number of milliseconds between now (UTC) and that moment.
# Raises `ArgumentError` when `date_time` lies in the past.
defp validate_date_time!(date_time) do
  now = DateTime.utc_now()
  diff = DateTime.diff(date_time, now, :millisecond)

  if diff < 0 do
    raise ArgumentError,
      message: "invalid DateTime: must be after #{now}, got: #{date_time}"
  end

  diff
end
@doc """
Cancels a scheduled job.
`timer_ref` is the unique reference returned by `perform_at/3` or `perform_after/3`. This function
returns the number of milliseconds left in the timer when cancelled, or `false` if the timer already
expired. If the return is `false` you can assume that the job has already been sent to the pool
for execution.
NOTE: Cancelling the timer with this function ensures that the job is never executed *and* that
the scheduler process is exited and not left "hanging". Using `Process.cancel_timer/1` will also
cancel execution, but may leave hanging processes. A hanging scheduler process will eventually
timeout, but not until one second after the expiration of the original timer.
## Examples
iex> ref = OddJob.perform_after(500, OddJob.Pool, fn -> :never end)
iex> time = OddJob.cancel_timer(ref)
iex> is_integer(time)
true
iex> ref = OddJob.perform_after(10, OddJob.Pool, fn -> :never end)
iex> Process.sleep(11)
iex> OddJob.cancel_timer(ref)
false
"""
@doc since: "0.2.0"
@spec cancel_timer(reference) :: non_neg_integer | false
# Cancellation is carried out by `OddJob.Scheduler.cancel_timer/1`.
def cancel_timer(timer_ref) when is_reference(timer_ref),
  do: Scheduler.cancel_timer(timer_ref)
@doc """
Returns a two element tuple `{pid, state}` with the `pid` of the job `queue` as the first element.
The second element is the `state` of the queue represented by an `OddJob.Queue` struct.
The queue `state` has the following fields:
* `:workers` - A list of the `pid`s of all workers in the pool
* `:assigned` - The `pid`s of the workers currently assigned to jobs
* `:jobs` - A list of `OddJob.Job` structs in the queue awaiting execution
* `:pool` - The Elixir term naming the `pool` that the queue belongs to
Returns `{:error, :not_found}` if the `queue` server does not exist at
the time this function is called.
## Examples
iex> {pid, %OddJob.Queue{pool: pool}} = OddJob.queue(OddJob.Pool)
iex> is_pid(pid)
true
iex> pool
OddJob.Pool
iex> OddJob.queue(:does_not_exist)
{:error, :not_found}
"""
@doc since: "0.1.0"
@spec queue(pool) :: {pid, queue} | not_found
def queue(pool) when is_atom(pool) do
  name = OddJob.Utils.queue_name(pool)

  # The state is fetched by registered name, after the pid lookup has succeeded.
  case GenServer.whereis(name) do
    nil -> {:error, :not_found}
    pid -> {pid, Queue.state(name)}
  end
end
@doc """
Returns the registered name in `:via` for the `queue` process. See the
`Registry` module for more information on `:via` process name registration.
Returns `{:error, :not_found}` if the `queue` server does not exist at
the time this function is called.
## Examples
iex> OddJob.queue_name(OddJob.Pool)
{:via, Registry, {OddJob.Registry, {OddJob.Pool, :queue}}}
iex> OddJob.queue_name(:does_not_exist)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec queue_name(pool) :: name | not_found
def queue_name(pool) when is_atom(pool) do
  name = OddJob.Utils.queue_name(pool)

  # The name is only returned when a process is currently registered under it.
  if is_nil(GenServer.whereis(name)) do
    {:error, :not_found}
  else
    name
  end
end
@doc """
Returns the pid of the `OddJob.Pool.Supervisor` that supervises the job `pool`'s workers.
Returns `{:error, :not_found}` if the process does not exist at
the time this function is called.
There is no guarantee that the process identified by the `pid` will still be alive
after the results are returned, as it could exit or be killed or restarted at any time.
Use `pool_supervisor_name/1` to obtain the persistent registered name of the supervisor.
## Examples
OddJob.pool_supervisor(OddJob.Pool)
#=> #PID<0.239.0>
iex> OddJob.pool_supervisor(:does_not_exist)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec pool_supervisor(pool) :: pid | not_found
def pool_supervisor(pool) when is_atom(pool) do
  # Resolve the registered name directly. Going through `pool_supervisor_name/1` would
  # pass its `{:error, :not_found}` result to `GenServer.whereis/1`, which happens to
  # echo that tuple back because it has the shape of a `{name, node}` reference. It
  # would also look the process up twice.
  case pool |> OddJob.Utils.pool_supervisor_name() |> GenServer.whereis() do
    nil -> {:error, :not_found}
    pid -> pid
  end
end
@doc """
Returns the registered name in `:via` of the `OddJob.Pool.Supervisor` that
supervises the `pool`'s workers.
Returns `{:error, :not_found}` if the process does not exist at
the time this function is called.
## Examples
iex> OddJob.pool_supervisor_name(OddJob.Pool)
{:via, Registry, {OddJob.Registry, {OddJob.Pool, :pool_sup}}}
iex> OddJob.pool_supervisor_name(:does_not_exist)
{:error, :not_found}
"""
@doc since: "0.4.0"
@spec pool_supervisor_name(pool) :: name | not_found
def pool_supervisor_name(pool) when is_atom(pool) do
  name = OddJob.Utils.pool_supervisor_name(pool)

  # The name is only returned when a process is currently registered under it.
  if is_nil(GenServer.whereis(name)) do
    {:error, :not_found}
  else
    name
  end
end
@doc """
Returns a list of `pid`s for the specified worker pool.
There is no guarantee that the processes will still be alive after the
results are returned, as they could exit or be killed at any time.
Returns `{:error, :not_found}` if the `pool` does not exist at
the time this function is called.
## Examples
OddJob.workers(OddJob.Pool)
#=> [#PID<0.105.0>, #PID<0.106.0>, #PID<0.107.0>, #PID<0.108.0>, #PID<0.109.0>]
iex> OddJob.workers(:does_not_exist)
{:error, :not_found}
"""
@doc since: "0.1.0"
@spec workers(pool) :: [pid] | not_found
def workers(pool) do
  case pool |> OddJob.Utils.pool_supervisor_name() |> GenServer.whereis() do
    nil ->
      {:error, :not_found}

    sup ->
      # `Supervisor.which_children/1` puts `:restarting` or `:undefined` in the pid
      # position for children that are not currently running. Those entries are
      # skipped so the result only ever contains pids, as the spec promises.
      for {_id, pid, :worker, [OddJob.Pool.Worker]} <- Supervisor.which_children(sup),
          is_pid(pid),
          do: pid
  end
end
@doc """
Returns the `pid` of the top level supervisor in the `pool` supervision tree, `nil`
if a process can't be found.
## Example
iex> {:ok, pid} = OddJob.start_link(name: :whereis)
iex> OddJob.whereis(:whereis) == pid
true
iex> OddJob.whereis(:where_it_is_not) == nil
true
"""
@doc since: "0.4.0"
@spec whereis(pool) :: pid | nil
# Thin wrapper: the lookup is performed by `GenServer.whereis/1` on the pool name.
defdelegate whereis(pool), to: GenServer
end