Railway Flow-Based Programming with Flowex

Anton Mishchuk
Mar 5, 2017

Flow-Based Programming

Having been a software developer for many years, I’ve tried lots of programming languages with different paradigms: procedural, object-oriented and functional. Last year I took a look at a completely different approach to program design: Flow-Based Programming (FBP), a paradigm that defines an application as a network of independent processes exchanging data via message passing. FBP is a data-centered approach in which an application is viewed as a system of data streams being transformed by processes. It differs a lot from “conventional programming” (both OO and FP), where a program is a sequence of modifications applied to data at rest.

In FBP each process (called a “component”) is an independent “black box” with one or several inputs and outputs. In general, these components can be implemented in any programming language. The data (called “information packets”, or IPs) are sent from the output of one process to the input of another via externally predefined connections. One can say there are two logical layers in an FBP application. The bottom layer is a set of components implementing parts of the business logic. The top layer is the “communication logic”: the organization of data flows from one component to another.

Elixir GenStage

One may have noticed that Elixir/Erlang actors fit the FBP process description completely. So implementing components is not a big deal. A more complex problem is setting up the connections between them for passing IPs. As mentioned before, these connections should be defined externally (components are not supposed to know about their neighbors).

José Valim announced the GenStage package on July 14, 2016. GenStage is an Elixir behavior for exchanging events with back-pressure between Elixir processes. José described GenStage as “better abstractions for working with collections”, and its main area of application is parallel data processing (the Flow package).

But I’ve found that GenStage also solves the FBP component communication problem. All one needs to do is place the component logic inside “stage” processes and subscribe them to each other in the right order. GenStage guarantees not only correct message passing but also that no stage is overloaded with data.
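To give a feeling for what “back-pressure” means here, below is a minimal sketch written with plain spawn/send/receive. It is not the GenStage API: the DemandSketch module and the {:ask, ...} and {:events, ...} messages are names I made up for illustration. The only point is that the producer emits data strictly in response to demand from the consumer, so the consumer can never be flooded.

```elixir
defmodule DemandSketch do
  # Producer: emits consecutive integers, but only when asked for them.
  def producer(next) do
    receive do
      {:ask, consumer, demand} ->
        events = Enum.to_list(next..(next + demand - 1))
        send(consumer, {:events, events})
        producer(next + demand)
    end
  end

  # Consumer: asks for `batch` events at a time until `total` are collected.
  def consume(producer, batch, total), do: consume(producer, batch, total, [])

  defp consume(_producer, _batch, total, acc) when length(acc) >= total do
    Enum.take(acc, total)
  end

  defp consume(producer, batch, total, acc) do
    # Demand is sent upstream first; events arrive only as a reply to it.
    send(producer, {:ask, self(), batch})

    receive do
      {:events, events} -> consume(producer, batch, total, acc ++ events)
    end
  end
end

producer = spawn(fn -> DemandSketch.producer(1) end)
collected = DemandSketch.consume(producer, 2, 5)
```

Here the consumer pulls two events per request, so the producer never runs ahead of it by more than one batch.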

Railway FBP

There is a “Railway Oriented Programming” pattern in functional programming which presents a program (or a part of it) as a pipeline of functions (the output of one function is the input of the next). As an example, let’s consider a simple program which receives a number as input, adds one, multiplies the result by two and finally subtracts three:

defmodule Functions do
  def add_one(number), do: number + 1
  def mult_by_two(number), do: number * 2
  def minus_three(number), do: number - 3
end

defmodule MainModule do
  def run(number) do
    number
    |> Functions.add_one
    |> Functions.mult_by_two
    |> Functions.minus_three
  end
end

The MainModule.run/1 function defines a pipeline of functions with the same interface. The program can easily be redesigned using the FBP approach. All we need is to “place” each of the functions into a separate “stage” process.

This is what I call “Railway FBP”: a special case of FBP in which the component graph is just a simple chain.
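To make the idea concrete, here is a rough sketch of such a chain built from plain Elixir processes. It deliberately uses spawn/send/receive instead of GenStage, so there is no back-pressure, and the ProcessChain module and the {:ip, value} message shape are assumptions made only for this illustration. Each function lives in its own process and knows nothing but the pid it was handed, so the wiring stays external to the components.

```elixir
defmodule Functions do
  def add_one(number), do: number + 1
  def mult_by_two(number), do: number * 2
  def minus_three(number), do: number - 3
end

defmodule ProcessChain do
  # Starts one process per function, wiring them back to front.
  # Returns the pid of the first stage; `sink` receives the final result.
  def start(funs, sink) do
    funs
    |> Enum.reverse()
    |> Enum.reduce(sink, fn fun, next -> spawn(fn -> loop(fun, next) end) end)
  end

  # A stage applies its function to every IP and forwards the result.
  defp loop(fun, next) do
    receive do
      {:ip, value} ->
        send(next, {:ip, fun.(value)})
        loop(fun, next)
    end
  end
end

first =
  ProcessChain.start(
    [&Functions.add_one/1, &Functions.mult_by_two/1, &Functions.minus_three/1],
    self()
  )

send(first, {:ip, 2})

result =
  receive do
    {:ip, value} -> value
  end
```

Sending 2 into the first stage yields (2 + 1) * 2 - 3 = 3 at the sink, the same value MainModule.run(2) would return.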

Flowex

Flowex is a set of abstractions built on top of Elixir GenStage which makes it easy to create chains of communicating processes.

The main abstraction is the “pipeline”. To create one, use Flowex.Pipeline in a module and, with the pipe macro, declare the functions that will each be placed into a separate GenStage:

defmodule FunPipeline do
  use Flowex.Pipeline
  pipe :add_one
  pipe :mult_by_two
  pipe :minus_three
  # function definitions are skipped
end

After the FunPipeline module is compiled, one can “start” the pipeline with FunPipeline.start. A lot of things happen after that:

  • three GenStages start, one for each of the functions in the pipeline;
  • one additional GenStage starts for error processing;
  • ‘producer’ and ‘consumer’ GenStages start to handle input and output;
  • all the components are placed under a Supervisor.

To run calculations, one can use the FunPipeline.cast or FunPipeline.call function. Following Elixir/Erlang conventions, call performs a synchronous operation, so it returns the result only after the IP has passed through all the “pipes”. The cast function sends the IP into the pipeline and returns :ok immediately. One should use cast if the returned result doesn’t matter.
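The difference between the two can be sketched with a single plain process standing in for the whole pipeline. This is not how Flowex implements call and cast, and the signatures below are not Flowex’s; CallCastSketch and its message format are hypothetical names used only to show the synchronous versus fire-and-forget semantics.

```elixir
defmodule CallCastSketch do
  # A one-stage stand-in for a pipeline: it doubles every IP it receives.
  def start do
    spawn(fn -> loop() end)
  end

  # Synchronous: send the IP with a reply address and wait for the result.
  def call(pipeline, value) do
    ref = make_ref()
    send(pipeline, {:ip, value, {self(), ref}})

    receive do
      {^ref, result} -> result
    after
      1_000 -> {:error, :timeout}
    end
  end

  # Asynchronous: send the IP and return :ok without waiting for anything.
  def cast(pipeline, value) do
    send(pipeline, {:ip, value, nil})
    :ok
  end

  defp loop do
    receive do
      {:ip, value, reply_to} ->
        result = value * 2

        # Only callers that left a reply address get the result back.
        case reply_to do
          {pid, ref} -> send(pid, {ref, result})
          nil -> :ok
        end

        loop()
    end
  end
end

pipeline = CallCastSketch.start()
call_result = CallCastSketch.call(pipeline, 21)
cast_result = CallCastSketch.cast(pipeline, 21)
```

The call blocks until the doubled value comes back, while the cast returns :ok right away and its result is simply dropped.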

Another way to run calculations is to use Flowex.Client. The client is just a GenServer initialized with a specific pipeline. One needs clients to utilize the pipeline effectively (see details).

What if something goes wrong? The pipeline has an error-handling mechanism. If an error happens, for example, in the first pipe, the :mult_by_two and :minus_three functions will not be called. The IP will bypass them and go to the “error_pipe”. Even if you don’t specify an “error_pipe”, Flowex will add a default one. One can use the error_pipe macro to define a function which will be called when an error happens:

defmodule FunPipeline do
  use Flowex.Pipeline
  # ...
  error_pipe :if_error

  def if_error(error, struct, opts) do
    # ...
  end
end
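The bypass behaviour itself can be illustrated without Flowex at all. The sketch below runs everything in one process and uses Enum.reduce_while/3 to stop at the first exception; ErrorBypassSketch and the two-argument handler are my own illustrative names and do not mirror the three-argument if_error above.

```elixir
defmodule ErrorBypassSketch do
  # Runs the functions in order. After the first exception the remaining
  # functions are skipped and the error handler is called instead.
  def run(value, funs, error_handler) do
    result =
      Enum.reduce_while(funs, {:ok, value}, fn fun, {:ok, acc} ->
        try do
          {:cont, {:ok, fun.(acc)}}
        rescue
          error -> {:halt, {:error, error, acc}}
        end
      end)

    case result do
      {:ok, final} -> final
      {:error, error, last_value} -> error_handler.(error, last_value)
    end
  end
end

handler = fn error, last_value ->
  {:failed, Exception.message(error), last_value}
end

# The first "pipe" fails, so the other two are never called.
failing = [
  fn _n -> raise ArgumentError, "boom" end,
  fn n -> n * 2 end,
  fn n -> n - 3 end
]

failed = ErrorBypassSketch.run(2, failing, handler)

# Without errors the value simply flows through every function.
passed = ErrorBypassSketch.run(2, [fn n -> n + 1 end, fn n -> n * 2 end], handler)
```

In the failing case the handler receives the exception together with the last good value, here the original input 2.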

What should you do if you need to share some functionality between pipelines? Duplicating a function in every pipeline module that needs the same component is not a good idea. There is a better solution: one can pass a module name to the pipe macro:

defmodule ModulePipeline do
  use Flowex.Pipeline
  pipe AddOne
  pipe MultByTwo
  pipe MinusThree
end

Each module must implement only the init and call functions (as Plug modules do). In this way, one can create reusable components for pipelines.
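As a rough idea of what such components might look like, here is a sketch that follows the Plug convention of init/1 and call/2. The exact signatures Flowex expects are not spelled out in this post, so treat init(opts) and call(data, opts) as an assumption. MultBy and ModuleChainSketch are hypothetical, and the runner simply folds a value through the modules in one process.

```elixir
defmodule AddOne do
  # init/1 prepares the options once; call/2 handles one piece of data.
  def init(opts), do: opts
  def call(number, _opts), do: number + 1
end

defmodule MultBy do
  # The factor defaults to 2 unless the caller overrides it.
  def init(opts), do: Keyword.put_new(opts, :factor, 2)
  def call(number, opts), do: number * Keyword.fetch!(opts, :factor)
end

defmodule ModuleChainSketch do
  # Hypothetical runner: initialises every module, then passes the value
  # through each module's call/2 in order.
  def run(value, modules) do
    modules
    |> Enum.map(fn {mod, opts} -> {mod, mod.init(opts)} end)
    |> Enum.reduce(value, fn {mod, opts}, acc -> mod.call(acc, opts) end)
  end
end

default_result = ModuleChainSketch.run(2, [{AddOne, []}, {MultBy, []}])
custom_result = ModuleChainSketch.run(2, [{AddOne, []}, {MultBy, [factor: 3]}])
```

Because each module carries its own configuration through init/1, the same component can be reused in several chains with different options.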

Each component of a pipeline takes some time to finish processing an IP. One component may do simple work, while another may process data for a long time. So if a client or clients push IPs continuously, the IPs will pile up in front of the slowest component, and the data processing speed will be limited by that component.

Flowex has a solution. One can define the number of processes executing each component:

defmodule FunPipeline do
  use Flowex.Pipeline
  pipe :add_one, 1
  pipe :mult_by_two, 3
  pipe :minus_three, 2
  error_pipe :if_error, 2
  # ...
end

The pipeline will then consist of one :add_one process, three :mult_by_two processes, two :minus_three processes and two :if_error processes.

I call this feature “controlled parallelism”: one can adjust and control the number of executing components before the program starts. It is the opposite of what we are used to seeing in Elixir/Erlang applications, where the number of processes may change on demand. For example, the “cowboy” server creates a new process for each request.
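The idea of a fixed number of processes per component can be sketched like this. It is only an illustration with plain processes: FixedPoolSketch is a hypothetical name, and the round-robin distribution is my simplification, whereas Flowex relies on GenStage demand to spread IPs between the stages.

```elixir
defmodule FixedPoolSketch do
  # Starts exactly `count` long-lived workers before any data arrives.
  def start(fun, count) do
    for _ <- 1..count, do: spawn(fn -> worker(fun) end)
  end

  # Distributes the values over the workers round-robin and collects
  # the replies in input order.
  def map(workers, values) do
    caller = self()

    refs =
      values
      |> Enum.zip(Stream.cycle(workers))
      |> Enum.map(fn {value, worker} ->
        ref = make_ref()
        send(worker, {:ip, value, caller, ref})
        ref
      end)

    Enum.map(refs, fn ref ->
      receive do
        {^ref, result} -> result
      end
    end)
  end

  defp worker(fun) do
    receive do
      {:ip, value, caller, ref} ->
        send(caller, {ref, fun.(value)})
        worker(fun)
    end
  end
end

# Three processes for the "slow" component, no matter how many IPs arrive.
workers = FixedPoolSketch.start(fn n -> n * 2 end, 3)
results = FixedPoolSketch.map(workers, [1, 2, 3, 4, 5, 6])
```

The number of workers is decided once, at start time, and stays at three however many IPs are pushed through.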

Conclusion

Flow-Based Programming is a completely different approach to program design compared with “conventional programming”. While in general it seems difficult and weird, some special cases are very easy to implement.

I’m sure lots of your programs, or at least parts of them, can be designed as a chain of sequentially called functions. If so, it takes only one step to evaluate them in separate processes: use Flowex.
