Flow-Based REST API with Flowex and Plug
Flowex is a Flow-Based Programming framework built on top of the Elixir GenStage library. It is a set of abstractions that allows writing programs in the Railway FBP paradigm. You can read more about Flowex in this post.
I’ve written a couple of posts about the library and given a few talks at the Elixir Club and Pivorak meetups. I’ve noticed that two main questions come up once people get acquainted with Flowex:
1. Is it possible to design real applications in the Railway-FBP paradigm that Flowex proposes?
2. Does GenStage bring significant performance overhead to the application?
The goal of this article is to show that the answer to the first question is “yes” and to the second is “no”.
I’ve prepared a simple Elixir application called plug_flowex, a minimal prototype of a REST API for an imaginary blogging platform. It has only 2 endpoints (GET user and GET post) and, for simplicity, reads data from a text file. But this is enough to understand the design principles and to measure performance.
After reading the article you will know about:
- Flowex application design approach
- Pipelines and pipes initialization process
- Components and pipelines reuse
- Visualization of pipelines
- Pipeline performance versus the performance of the same code evaluated in one process
Plug and Flowex
The Elixir Plug library introduces the concept of “plugs”: functions with a common interface that accept a Plug.Conn structure and return the same structure, modified. This pushes the developer to design the application following the so-called “Railway Oriented Programming” pattern, where an application is a pipeline of functions with the same interface.
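To make the “same interface” idea concrete, here is a minimal plain-Elixir sketch that does not use Plug itself. Every step takes a map plus options and returns an updated map, so the steps can be chained with Enum.reduce. The module PlugLikeChain, its step functions, and the map shape are hypothetical names for illustration only.

```elixir
defmodule PlugLikeChain do
  # Each step has the shape of a function plug: (data, opts) -> data.
  def put_user_id(data, opts), do: Map.put(data, :user_id, opts[:user_id])
  def put_status(data, opts), do: Map.put(data, :status, opts[:status])

  # Runs the steps in order, threading the same map through all of them.
  def run(data, steps) do
    Enum.reduce(steps, data, fn {fun, opts}, acc -> fun.(acc, opts) end)
  end
end

steps = [
  {&PlugLikeChain.put_user_id/2, [user_id: "1"]},
  {&PlugLikeChain.put_status/2, [status: 200]}
]

PlugLikeChain.run(%{path: "/api/1"}, steps)
# => %{path: "/api/1", status: 200, user_id: "1"}
```

Because every step shares one signature, adding, removing, or reordering steps only changes the list, not the steps themselves.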
This pattern also underlies the Flowex library. The only difference is that every function call is placed in a separate process (a GenStage stage), which allows code to execute in parallel: while one request is in a later stage, the next one can already be in an earlier stage.
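The “one process per function” idea can be illustrated with a deliberately simplified sketch built on plain spawn_link, send, and receive. This is not how GenStage works (there is no demand or back-pressure here), and ProcessChain is a hypothetical name; it only shows that each step lives in its own process and passes its result on to the next one.

```elixir
defmodule ProcessChain do
  # Spawns one process per function and wires each one to the next.
  # Returns the pid of the first stage.
  def start(funs) when funs != [] do
    funs
    |> Enum.reverse()
    |> Enum.reduce(nil, fn fun, next ->
      spawn_link(fn -> loop(fun, next) end)
    end)
  end

  # Sends an IP into the chain and waits for the last stage to reply.
  def call(first_stage, data, timeout \\ 5_000) do
    ref = make_ref()
    send(first_stage, {:ip, data, {self(), ref}})

    receive do
      {^ref, result} -> result
    after
      timeout -> exit(:timeout)
    end
  end

  defp loop(fun, next) do
    receive do
      {:ip, data, {caller, ref} = from} ->
        result = fun.(data)

        if next do
          # Not the last stage: forward the transformed IP.
          send(next, {:ip, result, from})
        else
          # Last stage: reply to the original caller.
          send(caller, {ref, result})
        end

        loop(fun, next)
    end
  end
end

first = ProcessChain.start([&(&1 + 1), &(&1 * 2)])
ProcessChain.call(first, 3)
# => 8
```

Each stage is a long-lived process, so the chain is started once and then reused for many calls, which is also how the Flowex pipelines in this article are used.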
When you design an application this way, you only need to think about the “happy path”: how the input data (information packets, or IPs) should be transformed to turn input into output. Then you prepare “plugs” (or “pipes” in Flowex) and assemble them into a pipeline.
Application overview (code)
There are 2 endpoints in the application:
- GET /api/:user_id
- GET /api/:user_id/posts/:post_id
There is also a “GET /sync_api/:user_id” endpoint used for benchmarking. We will return to it later.
Take a look at the PlugFlowex module, where the action begins.
defmodule PlugFlowex do
  use Plug.Router
  # ...

  get "api/:user_id" do
    # The pipeline handle was stored in the app env at startup.
    pipeline = Application.get_env(:plug_flowex, :get_user_pipeline)
    result = GetUserPipeline.call(pipeline, %GetUserPipeline{conn: conn})
    result.conn
  end

  get "api/:user_id/posts/:post_id" do
    pipeline = Application.get_env(:plug_flowex, :get_user_post_pipeline)
    result = GetUserPostPipeline.call(pipeline,
                                      %GetUserPostPipeline{conn: conn})
    result.conn
  end

  # ...
end
So, for each endpoint there is a corresponding pipeline (stored in the application environment). Each pipeline is called with a pipeline struct that wraps the Plug.Conn structure.
Let’s consider the first pipeline in detail.
GetUserPipeline
The “happy path” for retrieving a user is the following: authenticate the client, find the record, render a JSON response, and send it to the client. Below is the code of GetUserPipeline.
defmodule GetUserPipeline do
  use Flowex.Pipeline

  defstruct [:conn, :user]

  pipe FetchParams,
    opts: %{auth_data: ["token"], repo_data: ["user_id"]}
  pipe AuthClient
  pipe FindRecord,
    opts: %{finder: &__MODULE__.find_user/1, assign_to: :user}
  pipe :prepare_data
  pipe RenderResponse, opts: %{renderer: UserRenderer}
  pipe SendResponse
  error_pipe :handle_error

  def prepare_data(%{user: user}, _opts), do: %{render_data: user}

  def find_user(repo_data), do: UserRepo.find(repo_data["user_id"])

  # Any exception raised in a pipe is routed here by error_pipe.
  def handle_error(error, struct, _opts) do
    pipeline = Application.get_env(:plug_flowex, :handle_error_pipeline)
    HandleErrorPipeline.call(pipeline,
      %HandleErrorPipeline{conn: struct.conn, error: error.error})
  end
end
The pipeline struct has only 2 fields: :conn and :user. An information packet (IP) initialized with conn passes through 6 pipes. The names are very descriptive, so it is easy to understand what is going on:
- FetchParams: prepares data for the following components;
- AuthClient: authenticates the client by token;
- FindRecord: fetches the user from the “DB”;
- :prepare_data: prepares data for rendering;
- RenderResponse: renders the JSON response;
- SendResponse: sends the response back to the client.
Let’s take a look at the components’ implementation:
FetchParams
defmodule FetchParams do
  import Plug.Conn

  defstruct [:conn]

  def init(opts), do: opts

  def call(%{conn: conn}, opts) do
    conn = fetch_query_params(conn)
    %{auth_data: data_for(:auth_data, conn, opts),
      repo_data: data_for(:repo_data, conn, opts)}
  end

  # Builds a map of the param names listed in opts[key] to their values.
  defp data_for(key, conn, opts) do
    opts[key]
    |> Enum.reduce(%{}, &Map.put(&2, &1, conn.params[&1]))
  end
end
This component fetches “token” and “user_id” from the params and puts their values into the :auth_data and :repo_data fields, which will be available to the next components.
Note how these keys are passed to the component via the options of the pipe macro. This is the component parameterization mechanism, which allows components to be reused in different pipelines.
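To see this parameterization at work without Plug, the sketch below reuses the same reduce as FetchParams.data_for/3 on a plain params map (standing in for conn.params, an assumption made for illustration). The same code runs once with the GetUserPipeline options and once with the options GetUserPostPipeline uses later; FetchParamsSketch is a hypothetical name.

```elixir
defmodule FetchParamsSketch do
  # Same reduce as FetchParams.data_for/3, but over a plain params map
  # instead of conn.params. Missing params end up with a nil value.
  def data_for(key, params, opts) do
    opts[key]
    |> Enum.reduce(%{}, &Map.put(&2, &1, params[&1]))
  end

  def call(params, opts) do
    %{auth_data: data_for(:auth_data, params, opts),
      repo_data: data_for(:repo_data, params, opts)}
  end
end

params = %{"token" => "abc", "user_id" => "1", "post_id" => "7"}

user_opts = %{auth_data: ["token"], repo_data: ["user_id"]}
post_opts = %{auth_data: ["token"], repo_data: ["user_id", "post_id"]}

FetchParamsSketch.call(params, user_opts)
# => %{auth_data: %{"token" => "abc"}, repo_data: %{"user_id" => "1"}}

FetchParamsSketch.call(params, post_opts)
# => %{auth_data: %{"token" => "abc"},
#      repo_data: %{"post_id" => "7", "user_id" => "1"}}
```

Only the options differ between the two calls; the component code is identical.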
AuthClient
This component just looks up the token in the DB. If there is no such token, it raises an exception.
defmodule AuthClient do
  defstruct [:auth_data]

  defmodule Error do
    defexception message: "invalid token"
  end

  def init(opts), do: opts

  def call(%{auth_data: auth_data}, _opts) do
    if token = TokenRepo.find_by_token(auth_data["token"]) do
      %{current_user_id: token["user_id"]}
    else
      # Raising switches the IP to the error_pipe.
      raise Error
    end
  end
end
FindRecord
defmodule FindRecord do
  defstruct [:repo_data]

  defmodule Error do
    defexception message: "record not found"
  end

  def init(opts), do: opts

  def call(%{repo_data: repo_data}, opts) do
    # The lookup function and the target field both come from opts.
    if record = opts[:finder].(repo_data) do
      %{opts[:assign_to] => record}
    else
      raise Error
    end
  end
end
Note that this component has a specific option, :finder, which defines the function to be called inside the component:
def find_user(repo_data), do: UserRepo.find(repo_data["user_id"])
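Here is a self-contained sketch of the :finder mechanism. The component receives a function capture through its options and calls it with .(), so it never needs to know which repository it talks to. UserRepoStub and FindRecordSketch are hypothetical stand-ins for the real UserRepo and FindRecord, with an in-memory map instead of the text-file “DB”.

```elixir
defmodule UserRepoStub do
  # Hypothetical in-memory "DB" standing in for UserRepo.
  @users %{"1" => %{"id" => "1", "name" => "Alice"}}
  def find(id), do: Map.get(@users, id)
end

defmodule FindRecordSketch do
  defmodule Error do
    defexception message: "record not found"
  end

  # Calls whatever finder was passed in opts and stores the record
  # under the key given by :assign_to.
  def call(%{repo_data: repo_data}, opts) do
    if record = opts[:finder].(repo_data) do
      %{opts[:assign_to] => record}
    else
      raise Error
    end
  end
end

opts = %{finder: &UserRepoStub.find(&1["user_id"]), assign_to: :user}

FindRecordSketch.call(%{repo_data: %{"user_id" => "1"}}, opts)
# => %{user: %{"id" => "1", "name" => "Alice"}}
```

Swapping the capture for a post finder and :assign_to for :post is all it takes to reuse the component for posts.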
:prepare_data
This component does not do much work; it just prepares data for the renderer.
def prepare_data(%{user: user}, _opts), do: %{render_data: user}
RenderResponse
The component is parameterized with the renderer module UserRenderer, which is responsible for presenting user records.
defmodule RenderResponse do
  defstruct [:render_data]

  def init(opts), do: opts

  def call(%{render_data: render_data}, opts) do
    # Delegates presentation to the renderer module passed in opts.
    content = opts[:renderer].render(render_data)
    %{status: 200, content: content}
  end
end
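Because the renderer is just a module passed in the options, one way to make the contract explicit is a behaviour that every renderer implements. This is a sketch under assumptions: the Renderer behaviour and the string-building renderers below are hypothetical. The real UserRenderer and PostRenderer presumably produce JSON with a JSON library, which is left out here to keep the example dependency-free.

```elixir
defmodule Renderer do
  # Hypothetical contract for modules passed as opts[:renderer].
  @callback render(map()) :: String.t()
end

defmodule UserRendererSketch do
  @behaviour Renderer
  @impl true
  def render(user), do: ~s({"name":"#{user["name"]}"})
end

defmodule PostRendererSketch do
  @behaviour Renderer
  @impl true
  def render(post), do: ~s({"title":"#{post["title"]}"})
end

defmodule RenderResponseSketch do
  # Same body as RenderResponse.call/2: delegate to the configured renderer.
  def call(%{render_data: render_data}, opts) do
    %{status: 200, content: opts[:renderer].render(render_data)}
  end
end

RenderResponseSketch.call(%{render_data: %{"name" => "Alice"}},
                          %{renderer: UserRendererSketch})
# => %{content: "{\"name\":\"Alice\"}", status: 200}
```

With @behaviour in place, the compiler warns if a renderer module forgets to implement render/1.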
SendResponse
The component just calls the Plug.Conn.send_resp function to send data to the client.
defmodule SendResponse do
  import Plug.Conn

  defstruct [:conn, :status, :content]

  def init(opts), do: opts

  def call(data, _opts) do
    %{conn: send_resp(data.conn, data.status, data.content)}
  end
end
That’s it. These 6 components form the successful path of the pipeline. We will discuss the error handling mechanism later, but for now, let’s take a look at another pipeline, GetUserPostPipeline, and see how existing components can be reused.
Reuse of components
GetUserPostPipeline
defmodule GetUserPostPipeline do
  use Flowex.Pipeline

  defstruct [:conn, :post]

  pipe FetchParams,
    opts: %{auth_data: ["token"], repo_data: ["user_id", "post_id"]}
  pipe AuthClient
  pipe FindRecord,
    opts: %{finder: &__MODULE__.find_post/1, assign_to: :post}
  pipe :prepare_data
  pipe RenderResponse, opts: %{renderer: PostRenderer}
  pipe SendResponse
  error_pipe :handle_error

  def prepare_data(%{post: post}, _opts), do: %{render_data: post}

  def find_post(repo_data) do
    PostRepo.find_user_post(repo_data["user_id"], repo_data["post_id"])
  end

  def handle_error(error, struct, _opts) do
    pipeline = Application.get_env(:plug_flowex, :handle_error_pipeline)
    HandleErrorPipeline.call(pipeline,
      %HandleErrorPipeline{conn: struct.conn, error: error.error})
  end
end
You may notice that there are no new components in this pipeline. GetUserPostPipeline uses the same components as the previous one, but some of them are parameterized differently.
The FetchParams pipe fetches post_id in addition to user_id. FindRecord now uses the find_post function to find the user’s post. And, finally, RenderResponse uses the PostRenderer module to generate the JSON response.
Component parameterization is a very important mechanism: it allows pipes to be initialized with specific state so they can be used in a variety of contexts. The same approach can also be applied to the whole pipeline: one can pass options to the pipeline’s start function and thereby initialize the pipeline with specific state.
Pipeline initialization
Each pipeline is a chain of GenStage processes under one supervisor. Generally, you can start pipelines anywhere in your code, but a good practice is to build all the pipelines when the application starts and place the pipelines’ supervisors under the application supervisor.
Below is the code of the PipelineApp module, which is the main application module:
defmodule PipelineApp do
  use Application
  import Supervisor.Spec

  def start(_type, _opts) do
    children = [worker(PlugFlowex, [])]
    {:ok, supervisor_pid} = Supervisor.start_link(children,
                                                  strategy: :one_for_one)

    # Each pipeline's supervisor is placed under the app supervisor.
    get_user_pipeline =
      GetUserPipeline.supervised_start(supervisor_pid)
    get_user_post_pipeline =
      GetUserPostPipeline.supervised_start(supervisor_pid)
    handle_error_pipeline =
      HandleErrorPipeline.supervised_start(supervisor_pid)

    # Make the pipelines globally accessible.
    Application.put_env(:plug_flowex,
                        :get_user_pipeline, get_user_pipeline)
    Application.put_env(:plug_flowex,
                        :get_user_post_pipeline, get_user_post_pipeline)
    Application.put_env(:plug_flowex,
                        :handle_error_pipeline, handle_error_pipeline)

    {:ok, supervisor_pid}
  end
end
As you can see, 3 pipelines are started with the supervised_start function, which places each pipeline’s supervisor under the main application supervisor. The pipelines are then stored in the application environment so they are globally accessible.
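The general idea of adding children to an already running supervisor and publishing their handles can be sketched with the standard library alone. This is not Flowex’s implementation of supervised_start (which starts a whole chain of stages); here an Agent stands in for a pipeline, and the :demo application name and key are hypothetical.

```elixir
# Start an empty top-level supervisor, as the application would.
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

# Add a child after the supervisor is already running; an Agent stands
# in for a pipeline's own supervisor here.
{:ok, pipeline_pid} =
  Supervisor.start_child(sup, %{
    id: :get_user_pipeline,
    start: {Agent, :start_link, [fn -> %{} end]}
  })

# Make the handle globally reachable, like Application.put_env/3 in PipelineApp.
Application.put_env(:demo, :get_user_pipeline, pipeline_pid)

Application.get_env(:demo, :get_user_pipeline) == pipeline_pid
# => true
```

If the child crashes, the supervisor restarts it with a new pid, so a real application would need to refresh the stored handle; that concern is outside this sketch.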
Pipeline branching
You have surely noticed HandleErrorPipeline in the code. It is called in the handle_error function of both GetUserPipeline and GetUserPostPipeline. This is a good example of how specific functionality can be moved to a separate pipeline:
defmodule HandleErrorPipeline do
  use Flowex.Pipeline

  defstruct [:conn, :error]

  pipe Handle401
  pipe Handle404
  pipe Handle500
  pipe RenderError
  pipe SendResponse
end
This pipeline has a few components that just prepare information about the exception. For example, here is the code of the Handle401 pipe:
defmodule Handle401 do
  defstruct [:error, :status, :content]
  use HandleErrorCommon

  @status 401

  def call(data, _opts) do
    case data.error do
      %AuthClient.Error{message: message} ->
        %{status: @status, content: message}

      # Not an auth error: pass the data through unchanged.
      _ ->
        data
    end
  end
end
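The idea behind Handle401, Handle404, and Handle500 is that each pipe recognises only its own error type and passes everything else through. Below is a plain-Elixir sketch of such a chain. The error modules stand in for AuthClient.Error and FindRecord.Error, and the “status not yet set” check and the catch-all 500 are assumptions for illustration, since the source of Handle404, Handle500, and HandleErrorCommon is not shown here.

```elixir
defmodule AuthError do
  defexception message: "invalid token"
end

defmodule NotFoundError do
  defexception message: "record not found"
end

defmodule ErrorChainSketch do
  # Each handler sets status/content only for the error it recognises
  # and only if an earlier handler has not already done so.
  def handle_401(%{status: nil, error: %AuthError{message: m}} = d),
    do: %{d | status: 401, content: m}
  def handle_401(d), do: d

  def handle_404(%{status: nil, error: %NotFoundError{message: m}} = d),
    do: %{d | status: 404, content: m}
  def handle_404(d), do: d

  # Hypothetical fallback: anything still unhandled becomes a 500.
  def handle_500(%{status: nil} = d),
    do: %{d | status: 500, content: "internal server error"}
  def handle_500(d), do: d

  def run(error) do
    %{error: error, status: nil, content: nil}
    |> handle_401()
    |> handle_404()
    |> handle_500()
  end
end

ErrorChainSketch.run(%NotFoundError{}).status
# => 404
```

Adding a new error type means adding one more handler to the chain, without touching the existing ones.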
Of course, error handling is not the only case where you may call a separate pipeline. Imagine, for example, that you have a complex authentication process or a sophisticated data extraction mechanism (ETL pipelines are a good example). All these cases can easily be extracted into separate pipelines.
Flowex diagrams
This is the most interesting part of Flow-Based Programming. The ability to draw the application graph is, in my view, the strongest suit of the FBP approach.
The figure below shows the GetUserPipeline diagram. Just for demonstration, I added two extra pipes to the AuthClient and FindRecord components; they do not exist in the code.
One can see all the components and pipelines that IPs pass through, from left to right and from top to bottom. And this is very cool!
Benchmarks
To compare pipeline performance with the performance of the same code evaluated synchronously, I’ve prepared a GetUserSync module with a call function that does the same as GetUserPipeline. Here is the code.
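The GetUserSync source is not reproduced here, so the following is only a hedged sketch of what “the same components, evaluated in one process” can look like. Each component module is initialised with its options, called with the current map, and its partial result is merged back in; an exception switches to an error handler, mirroring error_pipe. The merge-into-the-IP behaviour is an assumption modelled on how the components above return partial maps, and SyncRunner, AuthStub, and RenderStub are hypothetical.

```elixir
defmodule SyncRunner do
  # Runs {module, opts} components in order inside the calling process.
  # Each component returns a partial map that is merged into the IP.
  # On any exception, the handler gets the error and the original IP.
  def run(ip, components, error_handler) do
    Enum.reduce(components, ip, fn {mod, opts}, acc ->
      Map.merge(acc, mod.call(acc, mod.init(opts)))
    end)
  rescue
    error -> error_handler.(error, ip)
  end
end

defmodule AuthStub do
  def init(opts), do: opts
  def call(_ip, opts), do: %{current_user_id: opts[:user_id]}
end

defmodule RenderStub do
  def init(opts), do: opts
  def call(%{current_user_id: nil}, _opts), do: raise("no user")
  def call(%{current_user_id: id}, _opts), do: %{status: 200, content: "user #{id}"}
end

handler = fn error, _ip -> %{status: 500, content: Exception.message(error)} end

SyncRunner.run(%{}, [{AuthStub, [user_id: 1]}, {RenderStub, []}], handler)
# => %{content: "user 1", current_user_id: 1, status: 200}

SyncRunner.run(%{}, [{AuthStub, []}, {RenderStub, []}], handler)
# => %{content: "no user", status: 500}
```

The component contract (init/1, call/2, partial map out) is the same as in the pipeline; only the process boundaries between the steps are gone, which is exactly what the benchmark compares.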
For benchmarking I chose the wrk utility. It lets you specify the number of threads (the -t option) and the number of connections (the -c option). Note that in wrk, -c is the total number of open connections, which are divided among the threads. The results below use three settings: -t1 -c1 (only one request at a time), -t10 -c10 (10 threads sharing 10 connections), and -t100 -c100 (100 threads sharing 100 connections).
For the benchmarks I added the count: 10 option to each pipe in GetUserPipeline, so there were 10 copies of each component in the pipeline.
Below are the results, in requests per second, for different wrk options:
| | -t1 -c1 | -t10 -c10 | -t100 -c100 |
|-----------|----------|------------|-------------|
| pipeline | 4.6 K | 15.2 K | 14.8 K |
| sync | 12.1 K | 33.8 K | 38.4 K |
So, as you can see, there is a significant difference: synchronous evaluation is roughly 2–3 times faster than the pipeline. But remember that this demo application does almost nothing: each component performs the simplest data transformation, and there is no real database (all data is cached in memory).
A real application must compute something and wait for something! Let’s simulate these computations and pauses.
For the next benchmark, I added :timer.sleep(1) to the FindRecord pipe to simulate a 1 ms database delay. I also added Enum.reduce(1..1650, 1, &(&1*&2)) to the RenderResponse component to simulate a CPU-intensive task. This factorial calculation also takes ~1 ms on my machine.
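If you want to check how long the simulated CPU work takes on your own machine, :timer.tc/1 returns the elapsed time in microseconds together with the function’s result. This is a small measurement sketch; the ~1 ms and ~10 ms figures in the text are the author’s and will differ between machines, so no particular timing is claimed here.

```elixir
# Measures the CPU-bound "factorial" used to simulate work in the benchmarks.
timings =
  for n <- [1650, 5000] do
    {micros, _product} = :timer.tc(fn -> Enum.reduce(1..n, 1, &(&1 * &2)) end)
    IO.puts("product of 1..#{n}: #{Float.round(micros / 1000, 2)} ms")
    {n, micros}
  end
```

A single run is noisy; repeating the measurement a few times and taking the median gives a more stable figure.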
| | -t1 -c1 | -t10 -c10 | -t100 -c100 |
|----------|---------|-----------|-------------|
| pipeline | 0.25 K | 2.6 K | 3.6 K |
| sync | 0.25 K | 2.8 K | 4.1 K |
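The situation changed significantly! The numbers are identical at -t1 -c1, and the gap shrank to about 7% at -t10 -c10 (2.6 K vs 2.8 K) and about 12% at -t100 -c100 (3.6 K vs 4.1 K).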
If we increase the delay 10 times, to :timer.sleep(10) and Enum.reduce(1..5000, 1, &(&1*&2)) (about 10 ms of CPU time), the results are:
| | -t1 -c1 | -t10 -c10 | -t100 -c100 |
|-----------|----------|------------|-------------|
| pipeline | 42 | 0.45 K | 0.56 K |
| sync | 42 | 0.45 K | 0.58 K |
As expected! The difference is now less than 4%.
Conclusion
Designing an application in the Flowex Railway-FBP paradigm is quite an easy process. You are still in the FP paradigm, where your application is a set of functions that transform data structures. But you need to be more data-centric and think carefully about function interfaces.
Imagine a “happy path”: the sequence of data transformations that turns input into output. This defines the components of the pipeline. Then think about how to make the components reusable, so you can compose other pipelines from the same pipes parameterized in a different way.
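The benchmark results show that the overhead of GenStage communication is low. A performance penalty is clearly visible only when components do almost nothing. Once components do real work (about 1 ms of I/O plus 1 ms of CPU per request in the second benchmark, and 10 ms of each in the third), the gap shrinks to at most about 12% and then to under 4%.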
I hope you are now ready to try Flowex in your projects!
Do not hesitate to contact me if you have questions or proposals!
Have a wonderful week,
Anton