changed README.md
 
@@ -14,7 +14,7 @@ The package can be installed by adding `opus` to your list of dependencies in `m
14
14
15
15
```elixir
16
16
def deps do
17
- [{:opus, "~> 0.7"}]
17
+ [{:opus, "~> 0.8"}]
18
18
end
19
19
```
20
20
 
@@ -177,8 +177,10 @@ options:
177
177
* `:raise`: A list of exceptions to not rescue. Defaults to `false`
178
178
which converts all exceptions to `{:error, %Opus.PipelineError{}}`
179
179
values halting the pipeline.
180
- * `:error_message`: A String or Atom to replace the original error when
181
- a stage fails.
180
+ * `:error_message`: An error message to replace the original error when a
181
+ stage fails. It can be a String or Atom, which will be used directly in place
182
+ of the original message, or an anonymous function, which receives the input
183
+ of the failed stage and must return the error message to be used.
182
184
* `:retry_times`: How many times to retry a failing stage, before
183
185
halting the pipeline.
184
186
* `:retry_backoff`: A backoff function to provide delay values for
changed hex_metadata.config
 
@@ -27,5 +27,5 @@
27
27
{<<"name">>,<<"telemetry">>},
28
28
{<<"optional">>,true},
29
29
{<<"repository">>,<<"hexpm">>},
30
- {<<"requirement">>,<<"~> 0.4">>}]]}.
31
- {<<"version">>,<<"0.7.0">>}.
30
+ {<<"requirement">>,<<"~> 0.4 or ~> 1.0">>}]]}.
31
+ {<<"version">>,<<"0.8.1">>}.
changed lib/opus.ex
 
@@ -1,5 +1,7 @@
1
1
defmodule Opus do
2
2
@moduledoc """
3
3
Documentation for Opus.
4
+
5
+ see `Opus.Pipeline`.
4
6
"""
5
7
end
changed lib/opus/pipeline.ex
 
@@ -3,20 +3,64 @@ defmodule Opus.Pipeline do
3
3
Defines a pipeline.
4
4
5
5
A pipeline defines a single entry point function to start running the defined stages.
6
- A sample pipeline can be:
6
+
7
+ A simple pipeline module can be defined as:
7
8
8
9
defmodule ArithmeticPipeline do
9
10
use Opus.Pipeline
10
11
11
- step :to_integer, &:erlang.binary_to_integer/1
12
+ step :to_integer, with: &:erlang.binary_to_integer/1
12
13
step :double, with: & &1 * 2
13
14
end
14
15
16
+ # Invoke it with:
17
+ ArithmeticPipeline.call "42"
18
+ # => {:ok, 84}
19
+
15
20
The pipeline can be run calling a `call/1` function which is defined by `Opus.Pipeline`.
16
21
Pipelines are intended to have a single parameter and always return a tagged tuple `{:ok, value} | {:error, error}`.
17
22
A stage returning `{:error, error}` halts the pipeline. The error value is an `Opus.PipelineError` struct which
18
23
contains useful information to detect where the error was caused and why.
19
24
25
+ ## Stage Definition
26
+
27
+ The following types of stages can be defined:
28
+
29
+ * `step` - see `Opus.Pipeline.Stage.Step`
30
+ * `tee` - see `Opus.Pipeline.Stage.Tee`
31
+ * `check` - see `Opus.Pipeline.Stage.Check`
32
+ * `link` - see `Opus.Pipeline.Stage.Link`
33
+ * `skip` - see `Opus.Pipeline.Stage.Skip`
34
+
35
+ ## Stage Options
36
+
37
+ * `:with`: The function to call to fulfill this stage. It can be an Atom
38
+ referring to a public function of the module, an anonymous function or
39
+ a function reference.
40
+ * `:if`: Makes a stage conditional, it can be either an Atom referring
41
+ to a public function of the module, an anonymous function or a
42
+ function reference. For the stage to be executed, the condition *must*
43
+ return `true`. When the stage is skipped, the input is forwarded to
44
+ the next step if there's one.
45
+ * `:unless`: The opposite of the `:if` option, executes the step only
46
+ when the callback function returns `false`.
47
+ * `:raise`: A list of exceptions to not rescue. Defaults to `false`
48
+ which converts all exceptions to `{:error, %Opus.PipelineError{}}`
49
+ values halting the pipeline.
50
+ * `:error_message`: An error message to replace the original error when a
51
+ stage fails. It can be a String or Atom, which will be used directly in place
52
+ of the original message, or an anonymous function, which receives the input
53
+ of the failed stage and must return the error message to be used.
54
+ * `:retry_times`: How many times to retry a failing stage, before
55
+ halting the pipeline.
56
+ * `:retry_backoff`: A backoff function to provide delay values for
57
+ retries. It can be an Atom referring to a public function in the
58
+ module, an anonymous function or a function reference. It must return
59
+ an `Enumerable.t` yielding at least as many numbers as the
60
+ `retry_times`.
61
+ * `:instrument?`: A boolean which defaults to `true`. Set to `false` to
62
+ skip instrumentation for a stage.
63
+
20
64
## Exception Handling
21
65
22
66
All exceptions are converted to `{:error, exception}` tuples by default.
changed lib/opus/pipeline/registration.ex
 
@@ -47,6 +47,7 @@ defmodule Opus.Pipeline.Registration do
47
47
[
48
48
define_callback(:conditional, stage_id, name, ensure_valid_conditional!(opts)),
49
49
define_callback(:with, stage_id, name, Access.get(opts, :with)),
50
+ define_callback(:error_message, stage_id, name, Access.get(opts, :error_message)),
50
51
define_callback(:retry_backoff, stage_id, name, Access.get(opts, :retry_backoff))
51
52
]
52
53
end
changed lib/opus/pipeline/stage.ex
 
@@ -1,7 +1,5 @@
1
1
defmodule Opus.Pipeline.Stage do
2
- @moduledoc """
3
- Specification of the stage behavior
4
- """
2
+ @moduledoc false
5
3
6
4
@callback run(
7
5
{
 
@@ -20,8 +18,7 @@ defmodule Opus.Pipeline.Stage do
20
18
{module, type, name, %{conditional: {cond_type, :anonymous}, stage_id: id} = opts},
21
19
input
22
20
) do
23
- callback =
24
- (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :conditional end)).name
21
+ callback = find_callback(module, :conditional, id)
25
22
26
23
maybe_run(
27
24
{module, type, name, %{opts | conditional: {cond_type, {module, callback, [input]}}}},
 
@@ -84,8 +81,7 @@ defmodule Opus.Pipeline.Stage do
84
81
do: with_retries({module, opts}, fn -> do_run(stage, input) end)
85
82
86
83
def with_retries({module, %{retry_times: times, stage_id: id, retry_backoff: :anonymous}}, fun) do
87
- callback =
88
- (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :retry_backoff end)).name
84
+ callback = find_callback(module, :retry_backoff, id)
89
85
90
86
with_retries(
91
87
{module, %{retry_times: times, stage_id: id, retry_backoff: {module, callback, []}}},
 
@@ -144,6 +140,16 @@ defmodule Opus.Pipeline.Stage do
144
140
end
145
141
end
146
142
143
+ def handle_run(:error, %{
144
+ stage: {module, _type, name, %{error_message: :anonymous, stage_id: id}},
145
+ input: input
146
+ }) do
147
+ callback = find_callback(module, :error_message, id)
148
+
149
+ message = Safe.apply({module, callback, [input]})
150
+ {:halt, {:error, %PipelineError{error: message, pipeline: module, stage: name, input: input}}}
151
+ end
152
+
147
153
def handle_run(:error, %{stage: {module, _type, name, %{error_message: message}}, input: input}) do
148
154
{:halt, {:error, %PipelineError{error: message, pipeline: module, stage: name, input: input}}}
149
155
end
 
@@ -154,6 +160,17 @@ defmodule Opus.Pipeline.Stage do
154
160
{:error,
155
161
%PipelineError{error: "stage failed", pipeline: module, stage: name, input: input}}}
156
162
163
+ def handle_run({:error, e}, %{
164
+ stage: {module, _type, name, %{error_message: :anonymous, stage_id: id}},
165
+ input: input
166
+ }) do
167
+ callback = find_callback(module, :error_message, id)
168
+ message = Safe.apply({module, callback, [input]})
169
+
170
+ {:halt,
171
+ {:error, format_error(%{e | error: message}, %{pipeline: module, stage: name, input: input})}}
172
+ end
173
+
157
174
def handle_run({:error, e}, %{
158
175
stage: {module, _type, name, %{error_message: message}},
159
176
input: input
 
@@ -183,7 +200,7 @@ defmodule Opus.Pipeline.Stage do
183
200
end
184
201
185
202
defp do_run({module, type, name, %{with: :anonymous, stage_id: id} = opts}, input) do
186
- callback = (module._opus_callbacks[id] |> Enum.find(fn %{type: t} -> t == :with end)).name
203
+ callback = find_callback(module, :with, id)
187
204
do_run({module, type, name, %{opts | with: {module, callback, [input]}}}, input)
188
205
end
189
206
 
@@ -208,4 +225,8 @@ defmodule Opus.Pipeline.Stage do
208
225
_ -> false
209
226
end
210
227
end
228
+
229
+ defp find_callback(module, type, stage_id) do
230
+ (module._opus_callbacks[stage_id] |> Enum.find(fn %{type: t} -> t == type end)).name
231
+ end
211
232
end
changed lib/opus/pipeline/stage/check.ex
 
@@ -22,6 +22,7 @@ defmodule Opus.Pipeline.Stage.Check do
22
22
23
23
@behaviour Stage
24
24
25
+ @doc false
25
26
def run({module, type, name, opts} = stage, input) do
26
27
case Stage.maybe_run(stage, input) do
27
28
ret when ret in [true, :stage_skipped] ->
changed lib/opus/pipeline/stage/link.ex
 
@@ -2,7 +2,25 @@ defmodule Opus.Pipeline.Stage.Link do
2
2
@moduledoc ~S"""
3
3
The link stage calls the specified pipeline module.
4
4
5
- With a non Opus.Pipeline module, it ignores it.
5
+ When defined with a non `Opus.Pipeline` module, it ignores it.
6
+
7
+ ## Example
8
+
9
+ defmodule AddOnePipeline do
10
+ use Opus.Pipeline
11
+
12
+ step :add, with: &(&1 + 1)
13
+ end
14
+
15
+ defmodule MultiplicationPipeline do
16
+ use Opus.Pipeline
17
+
18
+ step :double, with: &(&1 * 2)
19
+ link AddOnePipeline
20
+ end
21
+
22
+ MultiplicationPipeline.call 5
23
+ # {:ok, 11}
6
24
"""
7
25
8
26
alias Opus.Pipeline.Stage
changed lib/opus/pipeline/stage/skip.ex
 
@@ -1,20 +1,21 @@
1
1
defmodule Opus.Pipeline.Stage.Skip do
2
2
@moduledoc ~S"""
3
3
The skip stage is meant to halt the pipeline with no error if the given condition is true.
4
- This stage must be called with an `if` option, in order to decide if the pipeline is going to be
5
- halted or not.
6
4
7
- When the given conditional is `true`, the pipeline will return {:ok, :skipped} and all the following
5
+ This stage must always be defined with an `if` option, in order to decide if
6
+ the pipeline is going to be halted or not.
7
+
8
+ When the given conditional is `true`, the pipeline will return `{:ok, :skipped}` and all the following
8
9
steps will be skipped.
9
10
10
- ```
11
- defmodule CreateUserPipeline do
12
- use Opus.Pipeline
11
+ defmodule CreateUserPipeline do
12
+ use Opus.Pipeline
13
13
14
- skip :prevent_duplicates, if: :user_exists?
15
- step :persist_user
16
- end
17
- ```
14
+ skip :prevent_duplicates, if: :user_exists?
15
+ step :persist_user
16
+
17
+ def user_exists?(_), do: "implementation omitted"
18
+ end
18
19
19
20
In this example, if the `user_exists?` implementation returns `true`, then the next step `persist_user`
20
21
is not going to be called. If `false` or any other value, then Opus will keep following to the next stages.
 
@@ -24,6 +25,7 @@ defmodule Opus.Pipeline.Stage.Skip do
24
25
25
26
@behaviour Stage
26
27
28
+ @doc false
27
29
def run(stage, input) do
28
30
case stage |> Stage.maybe_run(input) do
29
31
:pipeline_skipped -> {:halt, :pipeline_skipped}
changed lib/opus/pipeline/stage/step.ex
 
@@ -1,14 +1,52 @@
1
1
defmodule Opus.Pipeline.Stage.Step do
2
2
@moduledoc ~S"""
3
3
The step stage defines an operation which is considered successful unless
4
- it returns an error atom `:error` or tuple `{:error, _}`.
4
+ it returns either an error atom `:error` or tuple `{:error, _}`.
5
+
5
6
It is also considered failed and halts the pipeline when it raises an unexpected exception.
7
+
8
+ ## Example
9
+
10
+ defmodule CryptoMarkerForecastPipeline do
11
+ use Opus.Pipeline
12
+
13
+ step :waste_time, with: (fn _ -> Process.sleep(10) end)
14
+ step :calculate_lunar_phase
15
+ step :fetch_elonmusks_tweets
16
+ step :forecast
17
+
18
+ # Step definitions can either be defined inline using the `with` option
19
+ # or as module functions like below
20
+
21
+ # Notice that all step functions expect a single argument.
22
+ # The return value of a step becomes the input value of the next one.
23
+
24
+ def calculate_lunar_phase(_) do
25
+ ["🌑", "🌒", "🌓", "🌔", "🌖", "🌗", "🌘", "🌚", "🌜", "🌝"]
26
+ |> Enum.random
27
+ end
28
+
29
+ def fetch_elonmusks_tweets(_), do: "Baby Doge, doo, doo, doo"
30
+
31
+ def forecast(_) do
32
+ if :random.uniform > 0.5 do
33
+ :buy
34
+ else
35
+ :sell
36
+ end
37
+ end
38
+ end
39
+
40
+ The above pipeline module can be invoked with:
41
+
42
+ CryptoMarkerForecastPipeline.call "anything"
6
43
"""
7
44
8
45
alias Opus.Pipeline.Stage
9
46
10
47
@behaviour Stage
11
48
49
+ @doc false
12
50
def run(stage, input) do
13
51
stage |> Stage.maybe_run(input) |> Stage.handle_run(%{stage: stage, input: input})
14
52
end
changed lib/opus/pipeline/stage/tee.ex
 
@@ -9,6 +9,7 @@ defmodule Opus.Pipeline.Stage.Tee do
9
9
10
10
@behaviour Stage
11
11
12
+ @doc false
12
13
def run(stage, input) do
13
14
Stage.maybe_run(stage, input)
changed lib/opus/pipeline/stage_filter.ex
 
@@ -10,6 +10,7 @@ defmodule Opus.Pipeline.StageFilter do
10
10
11
11
import Enum, only: [reject: 2, filter: 2]
12
12
13
+ @doc false
13
14
def call(stages, opts) do
14
15
stages
15
16
|> stage_filter(:except, opts[:except])
changed lib/opus/pipeline_error.ex
 
@@ -4,7 +4,7 @@ defmodule Opus.PipelineError do
4
4
"""
5
5
6
6
@type t :: %__MODULE__{
7
- error: struct,
7
+ error: any,
8
8
input: any,
9
9
pipeline: module,
10
10
stage: atom,
changed lib/opus/telemetry.ex
 
@@ -1,6 +1,6 @@
1
1
defmodule Opus.Telemetry do
2
2
@moduledoc ~S"""
3
- Emits telemetry events
3
+ Emits telemetry events.
4
4
5
5
To enable this instrumentation module, update your config/config.exs file with:
changed mix.exs
 
@@ -4,7 +4,7 @@ defmodule Opus.Mixfile do
4
4
def project do
5
5
[
6
6
app: :opus,
7
- version: "0.7.0",
7
+ version: "0.8.1",
8
8
elixir: "~> 1.6",
9
9
elixirc_paths: elixirc_paths(Mix.env()),
10
10
build_embedded: Mix.env() == :prod,
 
@@ -39,7 +39,7 @@ defmodule Opus.Mixfile do
39
39
defp deps do
40
40
[
41
41
{:retry, "~> 0.8"},
42
- {:telemetry, "~> 0.4", optional: true},
42
+ {:telemetry, "~> 0.4 or ~> 1.0", optional: true},
43
43
{:credo, "~> 0.8.10", only: [:dev, :test], runtime: false},
44
44
{:ex_doc, "~> 0.24.2", only: :dev, runtime: false},
45
45
{:dialyxir, "~> 1.0.0-rc.3", only: [:dev, :test], runtime: false},