changed
README.md
|
|
@@ -14,7 +14,7 @@ The package can be installed by adding `opus` to your list of dependencies in `m
|
|
14
14
|
|
|
15
15
|
```elixir
|
|
16
16
|
def deps do
|
|
17
|
- [{:opus, "~> 0.7"}]
|
|
17
|
+ [{:opus, "~> 0.8"}]
|
|
18
18
|
end
|
|
19
19
|
```
|
|
20
20
|
|
|
|
@@ -177,8 +177,10 @@ options:
|
|
177
177
|
* `:raise`: A list of exceptions to not rescue. Defaults to `false`
|
|
178
178
|
which converts all exceptions to `{:error, %Opus.PipelineError{}}`
|
|
179
179
|
values halting the pipeline.
|
|
180
|
- * `:error_message`: A String or Atom to replace the original error when
|
|
181
|
- a stage fails.
|
|
180
|
+ * `:error_message`: An error message to replace the original error when a
|
|
181
|
+ stage fails. It can be a String or Atom, which will be used directly in place
|
|
182
|
+ of the original message, or an anonymous function, which receives the input
|
|
183
|
+ of the failed stage and must return the error message to be used.
|
|
182
184
|
* `:retry_times`: How many times to retry a failing stage, before
|
|
183
185
|
halting the pipeline.
|
|
184
186
|
* `:retry_backoff`: A backoff function to provide delay values for
|
changed
hex_metadata.config
|
|
@@ -27,5 +27,5 @@
|
|
27
27
|
{<<"name">>,<<"telemetry">>},
|
|
28
28
|
{<<"optional">>,true},
|
|
29
29
|
{<<"repository">>,<<"hexpm">>},
|
|
30
|
- {<<"requirement">>,<<"~> 0.4">>}]]}.
|
|
31
|
- {<<"version">>,<<"0.7.0">>}.
|
|
30
|
+ {<<"requirement">>,<<"~> 0.4 or ~> 1.0">>}]]}.
|
|
31
|
+ {<<"version">>,<<"0.8.1">>}.
|
changed
lib/opus.ex
|
|
@@ -1,5 +1,7 @@
|
|
1
1
|
defmodule Opus do
|
|
2
2
|
@moduledoc """
|
|
3
3
|
Documentation for Opus.
|
|
4
|
+
|
|
5
|
+ see `Opus.Pipeline`.
|
|
4
6
|
"""
|
|
5
7
|
end
|
changed
lib/opus/pipeline.ex
|
|
@@ -3,20 +3,64 @@ defmodule Opus.Pipeline do
|
|
3
3
|
Defines a pipeline.
|
|
4
4
|
|
|
5
5
|
A pipeline defines a single entry point function to start running the defined stages.
|
|
6
|
- A sample pipeline can be:
|
|
6
|
+
|
|
7
|
+ A simple pipeline module can be defined as:
|
|
7
8
|
|
|
8
9
|
defmodule ArithmeticPipeline do
|
|
9
10
|
use Opus.Pipeline
|
|
10
11
|
|
|
11
|
- step :to_integer, &:erlang.binary_to_integer/1
|
|
12
|
+ step :to_integer, with: &:erlang.binary_to_integer/1
|
|
12
13
|
step :double, with: & &1 * 2
|
|
13
14
|
end
|
|
14
15
|
|
|
16
|
+ # Invoke it with:
|
|
17
|
+ ArithmeticPipeline.call "42"
|
|
18
|
+ # => {:ok, 84}
|
|
19
|
+
|
|
15
20
|
The pipeline can be run calling a `call/1` function which is defined by `Opus.Pipeline`.
|
|
16
21
|
Pipelines are intended to have a single parameter and always return a tagged tuple `{:ok, value} | {:error, error}`.
|
|
17
22
|
A stage returning `{:error, error}` halts the pipeline. The error value is an `Opus.PipelineError` struct which
|
|
18
23
|
contains useful information to detect where the error was caused and why.
|
|
19
24
|
|
|
25
|
+ ## Stage Definition
|
|
26
|
+
|
|
27
|
+ The following types of stages can be defined:
|
|
28
|
+
|
|
29
|
+ * `step` - see `Opus.Pipeline.Stage.Step`
|
|
30
|
+ * `tee` - see `Opus.Pipeline.Stage.Tee`
|
|
31
|
+ * `check` - see `Opus.Pipeline.Stage.Check`
|
|
32
|
+ * `link` - see `Opus.Pipeline.Stage.Link`
|
|
33
|
+ * `skip` - see `Opus.Pipeline.Stage.Skip`
|
|
34
|
+
|
|
35
|
+ ## Stage Options
|
|
36
|
+
|
|
37
|
+ * `:with`: The function to call to fulfill this stage. It can be an Atom
|
|
38
|
+ referring to a public function of the module, an anonymous function or
|
|
39
|
+ a function reference.
|
|
40
|
+ * `:if`: Makes a stage conditional, it can be either an Atom referring
|
|
41
|
+ to a public function of the module, an anonymous function or a
|
|
42
|
+ function reference. For the stage to be executed, the condition *must*
|
|
43
|
+ return `true`. When the stage is skipped, the input is forwarded to
|
|
44
|
+ the next step if there's one.
|
|
45
|
+ * `:unless`: The opposite of the `:if` option, executes the step only
|
|
46
|
+ when the callback function returns `false`.
|
|
47
|
+ * `:raise`: A list of exceptions to not rescue. Defaults to `false`
|
|
48
|
+ which converts all exceptions to `{:error, %Opus.PipelineError{}}`
|
|
49
|
+ values halting the pipeline.
|
|
50
|
+ * `:error_message`: An error message to replace the original error when a
|
|
51
|
+ stage fails. It can be a String or Atom, which will be used directly in place
|
|
52
|
+ of the original message, or an anonymous function, which receives the input
|
|
53
|
+ of the failed stage and must return the error message to be used.
|
|
54
|
+ * `:retry_times`: How many times to retry a failing stage, before
|
|
55
|
+ halting the pipeline.
|
|
56
|
+ * `:retry_backoff`: A backoff function to provide delay values for
|
|
57
|
+ retries. It can be an Atom referring to a public function in the
|
|
58
|
+ module, an anonymous function or a function reference. It must return
|
|
59
|
+ an `Enumerable.t` yielding at least as many numbers as the
|
|
60
|
+ `retry_times`.
|
|
61
|
+ * `:instrument?`: A boolean which defaults to `true`. Set to `false` to
|
|
62
|
+ skip instrumentation for a stage.
|
|
63
|
+
|
|
20
64
|
## Exception Handling
|
|
21
65
|
|
|
22
66
|
All exceptions are converted to `{:error, exception}` tuples by default.
|
changed
lib/opus/pipeline/registration.ex
|
|
@@ -47,6 +47,7 @@ defmodule Opus.Pipeline.Registration do
|
|
47
47
|
[
|
|
48
48
|
define_callback(:conditional, stage_id, name, ensure_valid_conditional!(opts)),
|
|
49
49
|
define_callback(:with, stage_id, name, Access.get(opts, :with)),
|
|
50
|
+ define_callback(:error_message, stage_id, name, Access.get(opts, :error_message)),
|
|
50
51
|
define_callback(:retry_backoff, stage_id, name, Access.get(opts, :retry_backoff))
|
|
51
52
|
]
|
|
52
53
|
end
|
changed
lib/opus/pipeline/stage.ex
|
|
@@ -1,7 +1,5 @@
|
|
1
1
|
defmodule Opus.Pipeline.Stage do
|
|
2
|
- @moduledoc """
|
|
3
|
- Specification of the stage behavior
|
|
4
|
- """
|
|
2
|
+ @moduledoc false
|
|
5
3
|
|
|
6
4
|
@callback run(
|
|
7
5
|
{
|
|
|
@@ -20,8 +18,7 @@ defmodule Opus.Pipeline.Stage do
|
|
20
18
|
{module, type, name, %{conditional: {cond_type, :anonymous}, stage_id: id} = opts},
|
|
21
19
|
input
|
|
22
20
|
) do
|
|
23
|
- callback =
|
|
24
|
- (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :conditional end)).name
|
|
21
|
+ callback = find_callback(module, :conditional, id)
|
|
25
22
|
|
|
26
23
|
maybe_run(
|
|
27
24
|
{module, type, name, %{opts | conditional: {cond_type, {module, callback, [input]}}}},
|
|
|
@@ -84,8 +81,7 @@ defmodule Opus.Pipeline.Stage do
|
|
84
81
|
do: with_retries({module, opts}, fn -> do_run(stage, input) end)
|
|
85
82
|
|
|
86
83
|
def with_retries({module, %{retry_times: times, stage_id: id, retry_backoff: :anonymous}}, fun) do
|
|
87
|
- callback =
|
|
88
|
- (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :retry_backoff end)).name
|
|
84
|
+ callback = find_callback(module, :retry_backoff, id)
|
|
89
85
|
|
|
90
86
|
with_retries(
|
|
91
87
|
{module, %{retry_times: times, stage_id: id, retry_backoff: {module, callback, []}}},
|
|
|
@@ -144,6 +140,16 @@ defmodule Opus.Pipeline.Stage do
|
|
144
140
|
end
|
|
145
141
|
end
|
|
146
142
|
|
|
143
|
+ def handle_run(:error, %{
|
|
144
|
+ stage: {module, _type, name, %{error_message: :anonymous, stage_id: id}},
|
|
145
|
+ input: input
|
|
146
|
+ }) do
|
|
147
|
+ callback = find_callback(module, :error_message, id)
|
|
148
|
+
|
|
149
|
+ message = Safe.apply({module, callback, [input]})
|
|
150
|
+ {:halt, {:error, %PipelineError{error: message, pipeline: module, stage: name, input: input}}}
|
|
151
|
+ end
|
|
152
|
+
|
|
147
153
|
def handle_run(:error, %{stage: {module, _type, name, %{error_message: message}}, input: input}) do
|
|
148
154
|
{:halt, {:error, %PipelineError{error: message, pipeline: module, stage: name, input: input}}}
|
|
149
155
|
end
|
|
|
@@ -154,6 +160,17 @@ defmodule Opus.Pipeline.Stage do
|
|
154
160
|
{:error,
|
|
155
161
|
%PipelineError{error: "stage failed", pipeline: module, stage: name, input: input}}}
|
|
156
162
|
|
|
163
|
+ def handle_run({:error, e}, %{
|
|
164
|
+ stage: {module, _type, name, %{error_message: :anonymous, stage_id: id}},
|
|
165
|
+ input: input
|
|
166
|
+ }) do
|
|
167
|
+ callback = find_callback(module, :error_message, id)
|
|
168
|
+ message = Safe.apply({module, callback, [input]})
|
|
169
|
+
|
|
170
|
+ {:halt,
|
|
171
|
+ {:error, format_error(%{e | error: message}, %{pipeline: module, stage: name, input: input})}}
|
|
172
|
+ end
|
|
173
|
+
|
|
157
174
|
def handle_run({:error, e}, %{
|
|
158
175
|
stage: {module, _type, name, %{error_message: message}},
|
|
159
176
|
input: input
|
|
|
@@ -183,7 +200,7 @@ defmodule Opus.Pipeline.Stage do
|
|
183
200
|
end
|
|
184
201
|
|
|
185
202
|
defp do_run({module, type, name, %{with: :anonymous, stage_id: id} = opts}, input) do
|
|
186
|
- callback = (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :with end)).name
|
|
203
|
+ callback = find_callback(module, :with, id)
|
|
187
204
|
do_run({module, type, name, %{opts | with: {module, callback, [input]}}}, input)
|
|
188
205
|
end
|
|
189
206
|
|
|
|
@@ -208,4 +225,8 @@ defmodule Opus.Pipeline.Stage do
|
|
208
225
|
_ -> false
|
|
209
226
|
end
|
|
210
227
|
end
|
|
228
|
+
|
|
229
|
+ defp find_callback(module, type, stage_id) do
|
|
230
|
+ (module._opus_callbacks[stage_id] |> Enum.find(fn %{type: t} -> t == type end)).name
|
|
231
|
+ end
|
|
211
232
|
end
|
changed
lib/opus/pipeline/stage/check.ex
|
|
@@ -22,6 +22,7 @@ defmodule Opus.Pipeline.Stage.Check do
|
|
22
22
|
|
|
23
23
|
@behaviour Stage
|
|
24
24
|
|
|
25
|
+ @doc false
|
|
25
26
|
def run({module, type, name, opts} = stage, input) do
|
|
26
27
|
case Stage.maybe_run(stage, input) do
|
|
27
28
|
ret when ret in [true, :stage_skipped] ->
|
changed
lib/opus/pipeline/stage/link.ex
|
|
@@ -2,7 +2,25 @@ defmodule Opus.Pipeline.Stage.Link do
|
|
2
2
|
@moduledoc ~S"""
|
|
3
3
|
The link stage calls the specified pipeline module.
|
|
4
4
|
|
|
5
|
- With a non Opus.Pipeline module, it ignores it.
|
|
5
|
+ When defined with a non `Opus.Pipeline` module, it ignores it.
|
|
6
|
+
|
|
7
|
+ ## Example
|
|
8
|
+
|
|
9
|
+ defmodule AddOnePipeline do
|
|
10
|
+ use Opus.Pipeline
|
|
11
|
+
|
|
12
|
+ step :add, with: &(&1 + 1)
|
|
13
|
+ end
|
|
14
|
+
|
|
15
|
+ defmodule MultiplicationPipeline do
|
|
16
|
+ use Opus.Pipeline
|
|
17
|
+
|
|
18
|
+ step :double, with: &(&1 * 2)
|
|
19
|
+ link AddOnePipeline
|
|
20
|
+ end
|
|
21
|
+
|
|
22
|
+ MultiplicationPipeline.call 5
|
|
23
|
+ # {:ok, 11}
|
|
6
24
|
"""
|
|
7
25
|
|
|
8
26
|
alias Opus.Pipeline.Stage
|
changed
lib/opus/pipeline/stage/skip.ex
|
|
@@ -1,20 +1,21 @@
|
|
1
1
|
defmodule Opus.Pipeline.Stage.Skip do
|
|
2
2
|
@moduledoc ~S"""
|
|
3
3
|
The skip stage is meant to halt the pipeline with no error if the given condition is true.
|
|
4
|
- This stage must be called with an `if` option, in order to decide if the pipeline is going to be
|
|
5
|
- halted or not.
|
|
6
4
|
|
|
7
|
- When the given conditional is `true`, the pipeline will return {:ok, :skipped} and all the following
|
|
5
|
+ This stage must always be defined with an `if` option, in order to decide if
|
|
6
|
+ the pipeline is going to be halted or not.
|
|
7
|
+
|
|
8
|
+ When the given conditional is `true`, the pipeline will return `{:ok, :skipped}` and all the following
|
|
8
9
|
steps will be skipped.
|
|
9
10
|
|
|
10
|
- ```
|
|
11
|
- defmodule CreateUserPipeline do
|
|
12
|
- use Opus.Pipeline
|
|
11
|
+ defmodule CreateUserPipeline do
|
|
12
|
+ use Opus.Pipeline
|
|
13
13
|
|
|
14
|
- skip :prevent_duplicates, if: :user_exists?
|
|
15
|
- step :persist_user
|
|
16
|
- end
|
|
17
|
- ```
|
|
14
|
+ skip :prevent_duplicates, if: :user_exists?
|
|
15
|
+ step :persist_user
|
|
16
|
+
|
|
17
|
+ def user_exists?(_), do: "implementation omitted"
|
|
18
|
+ end
|
|
18
19
|
|
|
19
20
|
In this example, if the `user_exists?` implementation returns `true`, then the next step `persist_user`
|
|
20
21
|
is not going to be called. If `false` or any other value, then Opus will keep following to the next stages.
|
|
|
@@ -24,6 +25,7 @@ defmodule Opus.Pipeline.Stage.Skip do
|
|
24
25
|
|
|
25
26
|
@behaviour Stage
|
|
26
27
|
|
|
28
|
+ @doc false
|
|
27
29
|
def run(stage, input) do
|
|
28
30
|
case stage |> Stage.maybe_run(input) do
|
|
29
31
|
:pipeline_skipped -> {:halt, :pipeline_skipped}
|
changed
lib/opus/pipeline/stage/step.ex
|
|
@@ -1,14 +1,52 @@
|
|
1
1
|
defmodule Opus.Pipeline.Stage.Step do
|
|
2
2
|
@moduledoc ~S"""
|
|
3
3
|
The step stage defines an operation which is considered successful unless
|
|
4
|
- it returns an error atom `:error` or tuple `{:error, _}`.
|
|
4
|
+ it returns either an error atom `:error` or tuple `{:error, _}`.
|
|
5
|
+
|
|
5
6
|
It is also considered failed and halts the pipeline when it raises an unexpected exception.
|
|
7
|
+
|
|
8
|
+ ## Example
|
|
9
|
+
|
|
10
|
+ defmodule CryptoMarkerForecastPipeline do
|
|
11
|
+ use Opus.Pipeline
|
|
12
|
+
|
|
13
|
+ step :waste_time, with: (fn _ -> Process.sleep(10) end)
|
|
14
|
+ step :calculate_lunar_phase
|
|
15
|
+ step :fetch_elonmusks_tweets
|
|
16
|
+ step :forecast
|
|
17
|
+
|
|
18
|
+ # Step definitions can either be defined inline using the `with` option
|
|
19
|
+ # or as module functions like below
|
|
20
|
+
|
|
21
|
+ # Notice that all step functions expect a single argument.
|
|
22
|
+ # The return value of a step becomes the input value of the next one.
|
|
23
|
+
|
|
24
|
+ def calculate_lunar_phase(_) do
|
|
25
|
+ ["🌑", "🌒", "🌓", "🌔", "🌖", "🌗", "🌘", "🌚", "🌜", "🌝"]
|
|
26
|
+ |> Enum.random
|
|
27
|
+ end
|
|
28
|
+
|
|
29
|
+ def fetch_elonmusks_tweets(_), do: "Baby Doge, doo, doo, doo"
|
|
30
|
+
|
|
31
|
+ def forecast(_) do
|
|
32
|
+ if :random.uniform > 0.5 do
|
|
33
|
+ :buy
|
|
34
|
+ else
|
|
35
|
+ :sell
|
|
36
|
+ end
|
|
37
|
+ end
|
|
38
|
+ end
|
|
39
|
+
|
|
40
|
+ The above pipeline module can be invoked with:
|
|
41
|
+
|
|
42
|
+ CryptoMarkerForecastPipeline.call "anything"
|
|
6
43
|
"""
|
|
7
44
|
|
|
8
45
|
alias Opus.Pipeline.Stage
|
|
9
46
|
|
|
10
47
|
@behaviour Stage
|
|
11
48
|
|
|
49
|
+ @doc false
|
|
12
50
|
def run(stage, input) do
|
|
13
51
|
stage |> Stage.maybe_run(input) |> Stage.handle_run(%{stage: stage, input: input})
|
|
14
52
|
end
|
changed
lib/opus/pipeline/stage/tee.ex
|
|
@@ -9,6 +9,7 @@ defmodule Opus.Pipeline.Stage.Tee do
|
|
9
9
|
|
|
10
10
|
@behaviour Stage
|
|
11
11
|
|
|
12
|
+ @doc false
|
|
12
13
|
def run(stage, input) do
|
|
13
14
|
Stage.maybe_run(stage, input)
|
changed
lib/opus/pipeline/stage_filter.ex
|
|
@@ -10,6 +10,7 @@ defmodule Opus.Pipeline.StageFilter do
|
|
10
10
|
|
|
11
11
|
import Enum, only: [reject: 2, filter: 2]
|
|
12
12
|
|
|
13
|
+ @doc false
|
|
13
14
|
def call(stages, opts) do
|
|
14
15
|
stages
|
|
15
16
|
|> stage_filter(:except, opts[:except])
|
changed
lib/opus/pipeline_error.ex
|
|
@@ -4,7 +4,7 @@ defmodule Opus.PipelineError do
|
|
4
4
|
"""
|
|
5
5
|
|
|
6
6
|
@type t :: %__MODULE__{
|
|
7
|
- error: struct,
|
|
7
|
+ error: any,
|
|
8
8
|
input: any,
|
|
9
9
|
pipeline: module,
|
|
10
10
|
stage: atom,
|
changed
lib/opus/telemetry.ex
|
|
@@ -1,6 +1,6 @@
|
|
1
1
|
defmodule Opus.Telemetry do
|
|
2
2
|
@moduledoc ~S"""
|
|
3
|
- Emits telemetry events
|
|
3
|
+ Emits telemetry events.
|
|
4
4
|
|
|
5
5
|
To enable this instrumentation module, update your config/config.exs file with:
|
changed
mix.exs
|
|
@@ -4,7 +4,7 @@ defmodule Opus.Mixfile do
|
|
4
4
|
def project do
|
|
5
5
|
[
|
|
6
6
|
app: :opus,
|
|
7
|
- version: "0.7.0",
|
|
7
|
+ version: "0.8.1",
|
|
8
8
|
elixir: "~> 1.6",
|
|
9
9
|
elixirc_paths: elixirc_paths(Mix.env()),
|
|
10
10
|
build_embedded: Mix.env() == :prod,
|
|
|
@@ -39,7 +39,7 @@ defmodule Opus.Mixfile do
|
|
39
39
|
defp deps do
|
|
40
40
|
[
|
|
41
41
|
{:retry, "~> 0.8"},
|
|
42
|
- {:telemetry, "~> 0.4", optional: true},
|
|
42
|
+ {:telemetry, "~> 0.4 or ~> 1.0", optional: true},
|
|
43
43
|
{:credo, "~> 0.8.10", only: [:dev, :test], runtime: false},
|
|
44
44
|
{:ex_doc, "~> 0.24.2", only: :dev, runtime: false},
|
|
45
45
|
{:dialyxir, "~> 1.0.0-rc.3", only: [:dev, :test], runtime: false},
|