defmodule WeChat.Refresher.Default do
@moduledoc """
AccessToken 刷新器 - 用于定时刷新 AccessToken
[官方说明](https://developers.weixin.qq.com/doc/offiaccount/Getting_Started/Getting_Started_Guide.html#_1-5-%E9%87%8D%E8%A6%81%E4%BA%8B%E6%83%85%E6%8F%90%E5%89%8D%E4%BA%A4%E4%BB%A3)
本模块为默认的刷新器
需要修改为自定义的刷新器,可以这样配置:
config :wechat, :refresher, YourRefresher
修改刷新器的配置,支持多种配置方式:
### 方式1
config :wechat, :refresh_settings, [ClientA, ClientB, ClientC]
以上配置会自动为三个 `Client` 定时刷新 `AccessToken` ,
默认会在 `AccessToken` 过期前 `30` 分钟刷新,
`AccessToken` 刷新失败的重试间隔为 `1` 分钟,
可以通过接口 获取默认的 `AccessToken` 刷新列表:`WeChat.Refresher.DefaultSettings.get_refresh_options_by_client/1`
### 方式2
config :wechat, :refresh_settings, [{ClientA, client_setting}, ClientB, ClientC]
# or
config :wechat, :refresh_settings, %{ClientA => client_setting, ClientB => client_setting, ClientC => client_setting}
`client_setting` 配置说明见:`t:client_setting/0`
为了适应 `Storage` 在 `Refresher` 启动之后才启动,可以开启延时启动刷新:
config :wechat, #{inspect(__MODULE__)}, wait_for_signal: true
当所有的 `Storage` 都已经完成,可以即可通过 `#{inspect(__MODULE__)}.start_monitor/0` 方法刷新 `AccessToken`
不配置默认为立即启动刷新
"""
use GenServer
require Logger
alias WeChat.Storage.Adapter, as: StorageAdapter
alias WeChat.{Utils, TokenChecker, Storage.Cache, Refresher.DefaultSettings}
# 过期前 30 分钟刷新
@refresh_before_expired 30 * 60
# 刷新失败重试时间间隔 1 分钟
@refresh_retry_interval 60
@default_state %{wait_for_signal: false, components: %{}}
@typedoc """
在 `AccessToken` 超时前多少秒刷新,单位:秒
如果`server_role` = `hub`, `hub server` 的值请大于 `hub client`
"""
@type refresh_before_expired :: non_neg_integer
@typedoc "刷新 `AccessToken` 失败的重试间隔,单位:秒"
@type refresh_retry_interval :: non_neg_integer
@typedoc """
option
- `:refresh_before_expired`: 在 `AccessToken` 超时前多少秒刷新,单位:秒,可选,
为保证 `hub` & `hub_client` 刷新正常,请保持两者的时间一致;
`server_role=hub_client` 时, 默认值:`#{@refresh_before_expired} + 30` 秒;
其余角色默认值:`#{@refresh_before_expired}` 秒
- `:refresh_retry_interval`: 刷新 `AccessToken` 失败的重试间隔,单位:秒,可选,默认值:`#{@refresh_retry_interval * 1000}` 秒
- `:refresh_options`: 刷新 `AccessToken` 配置,可选,默认值:`WeChat.Refresher.DefaultSettings.get_refresh_options_by_client/1` 的输出结果
"""
@type client_setting ::
%{
optional(:refresh_before_expired) => refresh_before_expired,
optional(:refresh_retry_interval) => refresh_retry_interval,
optional(:refresh_options) => DefaultSettings.refresh_options()
}
@type client_settings :: [WeChat.client()] | %{WeChat.client() => client_setting}
@type state :: %{
:wait_for_signal => boolean,
:clients => [WeChat.client()],
WeChat.client() => client_setting
}
@spec start_monitor() :: :ok
def start_monitor do
GenServer.call(__MODULE__, :start_monitor)
end
@spec add(WeChat.client(), client_setting) :: :ok
def add(client, opts \\ %{}) do
GenServer.call(__MODULE__, {:add, client, Map.new(opts)})
end
@spec append_work_agent(WeChat.client(), WeChat.Work.Agent.t()) :: :ok | :client_not_in
def append_work_agent(client, agent) do
GenServer.call(__MODULE__, {:append_work_agent, client, agent})
end
@spec remove(WeChat.client()) :: :ok | :not_found
def remove(client) do
GenServer.call(__MODULE__, {:remove, client})
end
@spec refresh(WeChat.client()) :: :ok | :not_found
def refresh(client) do
GenServer.call(__MODULE__, {:refresh, client})
end
@spec refresh(WeChat.client(), StorageAdapter.store_id(), StorageAdapter.store_key()) ::
:ok | :not_found
def refresh(client, store_id, store_key) do
GenServer.call(__MODULE__, {:refresh, client, store_id, store_key})
end
@spec refresh_component(WeChat.component_appid(), StorageAdapter.store_key()) ::
:ok | :not_found
def refresh_component(component_appid, store_key) do
GenServer.call(__MODULE__, {:refresh_component, component_appid, store_key})
end
@spec refresh_key(
WeChat.client(),
StorageAdapter.store_id(),
StorageAdapter.store_key(),
StorageAdapter.value(),
expires :: integer()
) :: :ok
def refresh_key(client, store_id, store_key, value, expires) do
GenServer.cast(__MODULE__, {:refresh_key, client, store_id, store_key, value, expires})
end
@spec client_options(WeChat.client()) :: client_setting | nil
def client_options(client), do: GenServer.call(__MODULE__, {:client_options, client})
@spec clients() :: [WeChat.client()]
def clients, do: GenServer.call(__MODULE__, :clients)
@spec components() :: %{
WeChat.component_appid() => %{
keys: [StorageAdapter.store_key()],
clients: [WeChat.client()]
}
}
def components, do: GenServer.call(__MODULE__, :components)
@spec start_link(client_settings) :: GenServer.on_start()
def start_link(client_settings \\ %{}) do
GenServer.start_link(__MODULE__, client_settings, name: __MODULE__)
end
@impl true
def init(client_settings) do
state =
Map.new(client_settings, fn
client when is_atom(client) ->
{client, init_client_options(client, %{})}
{client, opts} ->
{client, init_client_options(client, opts)}
end)
clients = Map.keys(state)
options =
:wechat
|> Application.get_env(__MODULE__, %{})
|> Map.new()
state =
@default_state
|> Map.merge(options)
|> Map.merge(state)
|> Map.put(:clients, clients)
continue =
if state.wait_for_signal do
:wait_for_signal
else
:start_monitor
end
{:ok, state, {:continue, continue}}
end
@impl true
def handle_continue(:wait_for_signal, state) do
{:noreply, state}
end
def handle_continue(:start_monitor, state) do
state = Enum.reduce(state.clients, state, &start_monitor_client/2)
{:noreply, %{state | wait_for_signal: false}}
end
@impl true
def handle_call(:start_monitor, _from, state) do
{:reply, :ok, state, {:continue, :start_monitor}}
end
def handle_call({:add, client, options}, _from, state) do
state =
if client in state.clients do
remove_client(state, client)
else
state
end
|> add_client(client, options)
{:reply, :ok, state}
end
def handle_call({:append_work_agent, client, agent}, _from, state) do
if client in state.clients do
state = append_client_agent(state, client, agent)
{:reply, :ok, state}
else
{:reply, :client_not_in, state}
end
end
def handle_call({:remove, client}, _from, state) do
if client in state.clients do
state = remove_client(state, client)
{:reply, :ok, state}
else
{:reply, :not_found, state}
end
end
def handle_call({:refresh, client}, _from, state) do
with opts when is_map(opts) <- Map.get(state, client) do
opts = do_refresh(client, opts)
state = Map.put(state, client, opts)
{:reply, :ok, state}
else
_ ->
{:reply, :not_found, state}
end
end
def handle_call({:refresh, client, store_id, store_key}, _from, state) do
with opts when is_map(opts) <- Map.get(state, client) do
opts = do_refresh(client, store_id, store_key, opts)
state = Map.put(state, client, opts)
{:reply, :ok, state}
else
_ ->
{:reply, :not_found, state}
end
end
def handle_call({:refresh_component, component_appid, store_key}, _from, state) do
with component when is_map(component) <- Map.get(state.components, component_appid),
true <- store_key in component.keys do
client =
Enum.find(component.clients, fn c ->
Map.get(state, c)
|> Map.get(:refresh_options)
|> Enum.find(&match?({{^component_appid, ^store_key}, _, _}, &1))
end)
opts = do_refresh(client, component_appid, store_key, state[client])
state = Map.put(state, client, opts)
{:reply, :ok, state}
else
_ -> {:reply, :not_found, state}
end
end
def handle_call({:client_options, client}, _from, state) do
{:reply, Map.get(state, client), state}
end
def handle_call(:clients, _from, state) do
{:reply, state.clients, state}
end
def handle_call(:components, _from, state) do
{:reply, state.components, state}
end
@impl true
def handle_cast({:refresh_key, client, store_id, store_key, value, expires}, state) do
cache_and_store(store_id, store_key, value, expires, client)
{:ok, state}
end
@impl true
def handle_info(
{:timeout, _timer,
{:refresh_token, %{store_id: store_id, store_key: store_key, client: client}}},
state
) do
case Map.get(state, client) do
opts when opts != nil ->
key = {store_id, store_key}
{{_key, fun, _timer}, refresh_options} = List.keytake(opts.refresh_options, key, 0)
timer = refresh_token(store_id, store_key, fun, client, opts)
state =
Map.put(state, client, %{opts | refresh_options: [{key, fun, timer} | refresh_options]})
{:noreply, state}
_ ->
{:noreply, state}
end
end
defp cache_and_store(store_id, store_key, value, expires, client) do
store_map = %{"value" => value, "expired_time" => expires}
Cache.put_cache(store_id, store_key, value)
Cache.put_cache({:store_map, store_id}, store_key, store_map)
with storage when storage != nil <- client.storage(),
# 因为 hub_client 是从 storage 中读取 token 的,因此不需要再做写入操作
true <- client.server_role() != :hub_client do
result = storage.store(store_id, store_key, store_map)
Logger.info(
"Call #{inspect(storage)}.store(#{store_id}, #{store_key}, #{inspect(store_map)}) => #{inspect(result)}."
)
end
end
defp restore_and_cache(store_id, store_key, client) do
with storage when storage != nil <- client.storage(),
{:ok, %{"value" => value, "expired_time" => expires} = store_map} <-
storage.restore(store_id, store_key) do
diff = expires - Utils.now_unix()
if diff > 0 do
Cache.put_cache(store_id, store_key, value)
Cache.put_cache({:store_map, store_id}, store_key, store_map)
Logger.info(
"Call #{inspect(storage)}.restore(#{store_id}, #{store_key}) succeed, the expires_in is: #{diff}s."
)
{true, diff}
else
Logger.info(
"Call #{inspect(storage)}.restore(#{store_id}, #{store_key}) succeed, but the token expired."
)
false
end
else
nil ->
false
error ->
Logger.warning(
"Call #{inspect(client.storage())}.restore(#{store_id}, #{store_key}) error: #{inspect(error)}."
)
false
end
end
defp init_client_options(client, options) do
Logger.info(
"Initialize WeChat Client: #{inspect(client)} by AppType: #{client.app_type()}, Storage: #{inspect(client.storage())}."
)
default_refresh_before_expired =
if match?(:hub_client, client.server_role()) do
@refresh_before_expired + 30
else
@refresh_before_expired
end
options =
Map.merge(options, %{
refresh_before_expired:
Map.get(options, :refresh_before_expired, default_refresh_before_expired),
refresh_retry_interval:
Map.get(options, :refresh_retry_interval, @refresh_retry_interval) * 1000
})
Cache.set_client(client)
refresh_options = init_refresh_options(client, options)
Map.put(options, :refresh_options, refresh_options)
end
defp start_monitor_client(client, state) do
Logger.info("Monitoring WeChat Client: #{inspect(client)}.")
options = Map.get(state, client)
{refresh_options, state} = filter_duplicate_component(client, options.refresh_options, state)
refresh_options = start_refresh_timers(client, options, refresh_options)
options = %{options | refresh_options: refresh_options}
TokenChecker.maybe_add_client(client, refresh_options)
Map.put(state, client, options)
end
defp filter_duplicate_component(client, refresh_options, state) do
if client.by_component?() do
component_appid = client.component_appid()
{refresh_options, components} =
Enum.reduce(refresh_options, {[], state.components}, fn
{{^component_appid, _store_key}, _fun, _timer} = option, {acc, components} ->
update_component(option, client, acc, components)
record, {acc, components} ->
{[record | acc], components}
end)
{Enum.reverse(refresh_options), %{state | components: components}}
else
{refresh_options, state}
end
end
defp update_component(
option = {{component_appid, store_key}, _fun, _timer},
client,
refresh_options,
components
) do
{refresh_options, component} =
if component = Map.get(components, component_appid) do
if store_key in component.keys do
Logger.info(
"Ignore refresh_option: #{inspect(option)} for #{inspect(client)}, because duplicated."
)
{refresh_options,
%{component | clients: Utils.uniq_and_sort([client | component.clients])}}
else
{[option | refresh_options],
%{
component
| keys: Utils.uniq_and_sort([store_key | component.keys]),
clients: Utils.uniq_and_sort([client | component.clients])
}}
end
else
{[option | refresh_options], %{keys: [store_key], clients: [client]}}
end
{refresh_options, Map.put(components, component_appid, component)}
end
defp do_refresh(client, %{refresh_options: refresh_options} = opts) do
Logger.info(
"Refreshing WeChat Client: #{inspect(client)} with list: #{inspect(refresh_options)}."
)
refresh_options =
for {{store_id, store_key}, fun, timer} <- refresh_options do
cancel_timer(timer)
timer = refresh_token(store_id, store_key, fun, client, opts)
{{store_id, store_key}, fun, timer}
end
%{opts | refresh_options: refresh_options}
end
defp do_refresh(client, store_id, store_key, %{refresh_options: refresh_options} = opts) do
Logger.info(
"Refreshing WeChat Client: #{inspect(client)} with list: #{inspect(refresh_options)}."
)
refresh_options =
for {{id, key}, fun, timer} = option <- refresh_options do
case {id, key} do
{^store_id, ^store_key} ->
cancel_timer(timer)
timer = refresh_token(store_id, store_key, fun, client, opts)
{{store_id, store_key}, fun, timer}
_ ->
option
end
end
%{opts | refresh_options: refresh_options}
end
defp cancel_timer(nil), do: :ignore
defp cancel_timer(timer), do: :erlang.cancel_timer(timer)
defp refresh_token(store_id, store_key, fun, client, options) do
case fun.(client) do
{:ok, list, expires_in} when is_list(list) ->
now = Utils.now_unix()
Enum.each(list, fn {key, token, expires_in} ->
expires = now + expires_in
cache_and_store(store_id, key, token, expires, client)
end)
Logger.info(
"Refresh appid: #{store_id}, key: #{store_key} succeed, get expires_in: #{expires_in}s."
)
((expires_in - options.refresh_before_expired) * 1000)
|> max(options.refresh_retry_interval)
{:ok, token, expires_in} ->
expires = Utils.now_unix() + expires_in
cache_and_store(store_id, store_key, token, expires, client)
Logger.info(
"Refresh appid: #{store_id}, key: #{store_key} succeed, get expires_in: #{expires_in}s."
)
((expires_in - options.refresh_before_expired) * 1000)
|> max(options.refresh_retry_interval)
error ->
refresh_retry_interval = options.refresh_retry_interval
Logger.warning(
"Refresh appid: #{store_id}, key: #{store_key} error: #{inspect(error)}, will be retry again #{refresh_retry_interval}s later."
)
refresh_retry_interval
end
|> start_refresh_token_timer(store_id, store_key, client)
end
defp start_refresh_timers(client, options, refresh_options) do
for {{store_id, store_key}, fun, _timer} <- refresh_options do
timer =
case restore_and_cache(store_id, store_key, client) do
false ->
refresh_token(store_id, store_key, fun, client, options)
{true, expires_in} ->
((expires_in - options.refresh_before_expired) * 1000)
|> max(0)
|> start_refresh_token_timer(store_id, store_key, client)
end
{{store_id, store_key}, fun, timer}
end
end
defp start_refresh_token_timer(time, store_id, store_key, client) do
Logger.info("Start Refresh Timer for appid: #{store_id}, key: #{store_key}, time: #{time}ms.")
info = %{store_id: store_id, store_key: store_key, client: client}
:erlang.start_timer(time, self(), {:refresh_token, info})
end
defp init_refresh_options(client, opts) do
refresh_options =
case Map.get(opts, :refresh_options) do
fun when is_function(fun, 0) ->
fun.()
fun when is_function(fun, 1) ->
fun.(client)
refresh_options when is_list(refresh_options) ->
refresh_options
nil ->
DefaultSettings.get_refresh_options_by_client(client)
end
for {store_id, store_key, fun} <- refresh_options do
{{store_id, store_key}, fun, nil}
end
end
defp add_client(state, client, options) do
options = init_client_options(client, options)
state =
state
|> Map.put(client, options)
|> Map.put(:clients, [client | state.clients])
if state.wait_for_signal do
state
else
start_monitor_client(client, state)
end
end
defp remove_client(state, client) do
Logger.info("Removing WeChat Client: #{inspect(client)}.")
{get, state} = Map.pop(state, client)
clients = List.delete(state.clients, client)
with {_client, opts} <- get do
Enum.each(opts.refresh_options, fn {_key, _fun, timer} ->
cancel_timer(timer)
end)
end
%{state | clients: clients}
end
defp append_client_agent(state, client, agent) do
options = Map.fetch!(state, client)
Cache.set_work_agent(client, agent)
refresh_options =
DefaultSettings.work_refresh_options(client, agent)
|> Enum.map(fn {store_id, store_key, fun} ->
{{store_id, store_key}, fun, nil}
end)
if state.wait_for_signal do
Map.put(options, :refresh_options, options.refresh_options ++ refresh_options)
|> then(&Map.put(state, client, &1))
else
Logger.info("Monitoring WeChat work agent: #{agent.name} for Client: #{inspect(client)}.")
refresh_options = start_refresh_timers(client, options, refresh_options)
TokenChecker.maybe_add_client(client, refresh_options)
Map.put(options, :refresh_options, options.refresh_options ++ refresh_options)
|> then(&Map.put(state, client, &1))
end
end
end