Strom — composable components for stream processing
This article covers some implementation details of the ideas from my previous article “Composable components for complex event processing”.
Strom is implemented in Elixir, a language that provides two important abstractions: Stream and Process.
Necessary abstractions
1. Stream (https://hexdocs.pm/elixir/Stream.html): composable, lazy enumerables. For example:
stream =
  [1, 2, 3]
  |> Stream.cycle() # creates an infinite stream: 1, 2, 3, 1, 2, 3, ...
  |> Stream.map(&(&1 * 2)) # multiplies each element by 2
  |> Stream.each(&IO.inspect/1) # prints each element as it passes through
This returns a stream data structure. Nothing is evaluated yet, because streams are lazy. The stream starts looping through its elements (forever, in this case) only when one runs it with Stream.run/1 or enumerates it, for example by converting it to a list with Enum.to_list/1.
2. Process (and GenServer, the standard behaviour for building server processes): the core abstraction of the BEAM languages, a lightweight unit of execution for concurrency and parallelism. Processes can be started and stopped, run functions, and send messages to other processes. Read the documentation for more information.
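The following sketch combines both abstractions and uses only the standard library. Building the pipeline evaluates nothing, and a separate process (a Task) does the actual pulling and sends the result back as a message. The Agent named `seen` is an illustrative helper that records which elements were evaluated. It is not part of Strom.

```elixir
# Illustrative helper process that records which elements were evaluated.
{:ok, seen} = Agent.start_link(fn -> [] end)

stream =
  [1, 2, 3]
  |> Stream.cycle()
  |> Stream.map(&(&1 * 2))
  |> Stream.each(fn x -> Agent.update(seen, &[x | &1]) end)

# Building the pipeline evaluates nothing.
before = Agent.get(seen, & &1)

# A separate process pulls five elements and sends them back as a message.
parent = self()
Task.start(fn -> send(parent, {:taken, Enum.take(stream, 5)}) end)

taken =
  receive do
    {:taken, elements} -> elements
  after
    1_000 -> :timeout
  end

evaluated = Agent.get(seen, &Enum.reverse/1)
IO.inspect({before, taken, evaluated})  # {[], [2, 4, 6, 2, 4], [2, 4, 6, 2, 4]}
```

Enum.take/2 halts the infinite stream after the fifth element. As a result, exactly five elements pass through Stream.each/2.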
Parallel processing of streams
As described in the previous article, the components must operate on flows, which are named sets of streams. In Elixir, a flow is just a map with arbitrary keys and streams as values. For example:
flow = %{
  stream1: Enum.to_list(1..1000), # 1000 numbers: 1, 2, ..., 1000
  stream2: Stream.cycle([1, 2, 3]) # infinite stream: 1, 2, 3, 1, ...
}
Consider a transformer that multiplies each element of the streams by three:
multiplier = Transformer.new(
  [:stream1, :stream2], # the streams to apply the function to
  &(&1 * 3) # the function
)
Calling the transformer returns a new flow with new streams (:stream1 and :stream2) that produce the multiplied elements:
new_flow = Transformer.call(flow, multiplier)
# %{stream1: modified_stream1, stream2: modified_stream2}
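Semantically, the call replaces the listed streams with lazily transformed versions and leaves every other stream in the flow untouched. The sketch below captures only that contract using plain `Stream.map/2`. `FlowSketch.transform/3` is a hypothetical helper, not Strom's API, and it ignores the parallelism discussed next. The extra `:other` key is there only to show pass-through.

```elixir
defmodule FlowSketch do
  # Hypothetical helper: lazily apply `fun` to each stream listed in `names`.
  # Streams that are not listed pass through unchanged.
  def transform(flow, names, fun) do
    Enum.reduce(names, flow, fn name, acc ->
      Map.update!(acc, name, &Stream.map(&1, fun))
    end)
  end
end

flow = %{
  stream1: Enum.to_list(1..1000),
  stream2: Stream.cycle([1, 2, 3]),
  other: [10, 20]
}

new_flow = FlowSketch.transform(flow, [:stream1, :stream2], &(&1 * 3))

IO.inspect(Enum.take(new_flow.stream1, 3))  # [3, 6, 9]
IO.inspect(Enum.take(new_flow.stream2, 4))  # [3, 6, 9, 3]
IO.inspect(new_flow.other)                  # [10, 20]
```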
The resulting streams must be able to run independently, in different processes. Therefore, the main requirement for the component is to process both streams independently, that is, in parallel.
To achieve the required parallelism, a transformer starts a separate Task process for each input stream. The task consumes the stream and reports the results to the transformer. The transformer then creates the new output streams. When a client needs the next element of a stream, the client calls the transformer to get it.
The transformer stores the data received from the tasks and sends it to clients. It also coordinates the processes. It pauses a client when there is no data yet and a task when too much data has accumulated, and it resumes each of them when the situation changes.
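The coordination described above can be sketched with a GenServer and a Task. This is a hypothetical model with a single stream and a single client, written only to illustrate the explanation. It is not Strom's code, and `BufferedStream` and its functions are illustrative names. The key idea is that a GenServer may postpone a reply by returning `{:noreply, ...}` and answering later with `GenServer.reply/2`. This provides a natural way to pause the producing task when the buffer is full, and to pause the consuming client when the buffer is empty.

```elixir
defmodule BufferedStream do
  # Hypothetical sketch, not Strom's code: a GenServer stores elements that a
  # Task eagerly pulls from a source stream and hands them to a single client.
  # The task is paused while the buffer is full; the client is paused while
  # the buffer is empty.
  use GenServer

  def start_link(source, capacity) when capacity > 0 do
    GenServer.start_link(__MODULE__, {source, capacity})
  end

  # Client side: a lazy stream that asks the server for every element.
  def stream(server) do
    Stream.resource(
      fn -> server end,
      fn s ->
        case GenServer.call(s, :get, :infinity) do
          {:ok, x} -> {[x], s}
          :done -> {:halt, s}
        end
      end,
      fn _ -> :ok end
    )
  end

  def buffered(server), do: GenServer.call(server, :buffered)

  @impl true
  def init({source, capacity}) do
    server = self()

    # Producer task: consumes the source and reports every element.
    {:ok, _task} =
      Task.start_link(fn ->
        Enum.each(source, &GenServer.call(server, {:put, &1}, :infinity))
        GenServer.call(server, :done, :infinity)
      end)

    {:ok, %{queue: :queue.new(), capacity: capacity, done: false, client: nil, task: nil}}
  end

  @impl true
  def handle_call({:put, x}, from, state) do
    state = serve_client(%{state | queue: :queue.in(x, state.queue)})

    if :queue.len(state.queue) >= state.capacity do
      # Buffer full: withhold the reply so the producer task blocks.
      {:noreply, %{state | task: from}}
    else
      {:reply, :ok, state}
    end
  end

  def handle_call(:get, from, state) do
    case :queue.out(state.queue) do
      {{:value, x}, queue} ->
        {:reply, {:ok, x}, resume_task(%{state | queue: queue})}

      {:empty, _} ->
        if state.done do
          {:reply, :done, state}
        else
          # Buffer empty: withhold the reply so the client blocks.
          {:noreply, %{state | client: from}}
        end
    end
  end

  def handle_call(:done, _from, state) do
    if state.client, do: GenServer.reply(state.client, :done)
    {:reply, :ok, %{state | done: true, client: nil}}
  end

  def handle_call(:buffered, _from, state), do: {:reply, :queue.len(state.queue), state}

  # Hand the oldest element to a waiting client, if there is one.
  defp serve_client(%{client: nil} = state), do: state

  defp serve_client(state) do
    {{:value, x}, queue} = :queue.out(state.queue)
    GenServer.reply(state.client, {:ok, x})
    %{state | queue: queue, client: nil}
  end

  # Resume a paused producer once there is room in the buffer again.
  defp resume_task(%{task: nil} = state), do: state

  defp resume_task(state) do
    GenServer.reply(state.task, :ok)
    %{state | task: nil}
  end
end

# An infinite source is consumed only as far as the 3-element buffer allows.
{:ok, server} = BufferedStream.start_link(Stream.iterate(1, &(&1 + 1)), 3)
first_five = server |> BufferedStream.stream() |> Enum.take(5)
IO.inspect(first_five)  # [1, 2, 3, 4, 5]
```

Withholding replies keeps the sketch free of polling. A paused process simply waits inside `GenServer.call/3` until the server answers. Real components also need to handle several streams and several clients, which this sketch leaves out.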
The Mixer and Splitter components work similarly. Under the hood, both are special cases of the generic GenMix component, which consumes several streams, mixes them, and produces other streams as output.
Here is an example of using a Mixer and a Splitter with the flow defined above:
mixer = Mixer.new(
  [:stream1, :stream2], # streams to mix
  :mixed_stream # output stream
) |> Mixer.start()

%{mixed_stream: stream} = Mixer.call(flow, mixer) # the new flow contains only :mixed_stream
splitter = Splitter.new(
  :mixed_stream, # stream to split
  %{ # define outputs
    odd: &(rem(&1, 2) == 1), # odd numbers go here
    even: &(rem(&1, 2) == 0) # even numbers go here
  }
) |> Splitter.start()

%{odd: odd, even: even} = Splitter.call(%{mixed_stream: stream}, splitter) # new streams are created from :mixed_stream
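The following sketch shows how mixing can work without Strom. It spawns one Task per input stream, and each task forwards its elements as messages to the process that enumerates the output. `Stream.resource/3` turns those messages back into a lazy stream. `MixSketch.mix/1` is a hypothetical function for finite inputs and has no back-pressure. Because the tasks run concurrently, the output order depends on scheduling, so the results are sorted before comparison.

```elixir
defmodule MixSketch do
  # Hypothetical sketch of mixing: every input stream is consumed by its own
  # Task, which sends the elements to the process enumerating the result.
  # There is no buffer limit here, so, unlike Strom, it has no back-pressure.
  def mix(streams) do
    Stream.resource(
      fn ->
        owner = self()
        ref = make_ref()

        Enum.each(streams, fn stream ->
          Task.start(fn ->
            Enum.each(stream, &send(owner, {ref, :elem, &1}))
            # Messages between two processes keep their order, so :done
            # always arrives after this task's last element.
            send(owner, {ref, :done})
          end)
        end)

        {ref, length(streams)}
      end,
      fn
        {_ref, 0} = acc ->
          {:halt, acc}

        {ref, running} ->
          receive do
            {^ref, :elem, x} -> {[x], {ref, running}}
            {^ref, :done} -> {[], {ref, running - 1}}
          end
      end,
      fn _ -> :ok end
    )
  end
end

# The interleaving depends on scheduling, so sort before looking at it.
mixed = MixSketch.mix([1..5, [10, 20, 30]]) |> Enum.to_list()
IO.inspect(Enum.sort(mixed))  # [1, 2, 3, 4, 5, 10, 20, 30]

# Splitting the materialized result into odd and even numbers.
{odd, even} = Enum.split_with(mixed, &(rem(&1, 2) == 1))
IO.inspect({Enum.sort(odd), Enum.sort(even)})  # {[1, 3, 5], [2, 4, 10, 20, 30]}
```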
Source and Sink also start a separate task to read and write data. The only difference is that a Source or a Sink can work with only one stream in a flow. This limitation is intentional: these components interact with the “external world”, so proper isolation is desirable.
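Here is a rough sketch of the one-stream-per-component idea. `StringIO` devices stand in for the external world. The source is a lazy stream over the lines of an input device. `SinkSketch.start/2`, a hypothetical name, consumes exactly one stream in its own Task and writes each element to an output device.

```elixir
defmodule SinkSketch do
  # Hypothetical sink: consumes exactly one stream in its own Task and writes
  # every element as a line to `device`.
  def start(stream, device) do
    Task.async(fn ->
      stream
      |> Stream.each(&IO.puts(device, to_string(&1)))
      |> Stream.run()
    end)
  end
end

# StringIO devices stand in for the external world on both ends.
{:ok, input} = StringIO.open("1\n2\n3\n")
{:ok, output} = StringIO.open("")

# Source: a lazy stream over the lines of the input device.
source =
  input
  |> IO.stream(:line)
  |> Stream.map(&(&1 |> String.trim() |> String.to_integer()))

# The source is only read when the sink's task runs the stream.
:ok = source |> Stream.map(&(&1 * 10)) |> SinkSketch.start(output) |> Task.await()

{_unread, written} = StringIO.contents(output)
IO.inspect(written)  # "10\n20\n30\n"
```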
Because all user-defined functions run in separate tasks, the components are fault-tolerant. The tasks are supervised by the Strom.TaskSupervisor supervisor and are restarted after a failure. Some events may be lost, but the components continue processing the streams.
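The restart behaviour can be reproduced with the standard `Task.Supervisor`. In this sketch, a plain unnamed `Task.Supervisor` stands in for `Strom.TaskSupervisor`, whose configuration is not shown in this article. The function `work` is a hypothetical user function that crashes on its first attempt. With `restart: :transient`, the supervisor restarts a task only when it exits abnormally.

```elixir
# A plain Task.Supervisor stands in for Strom.TaskSupervisor.
{:ok, sup} = Task.Supervisor.start_link()
{:ok, attempts} = Agent.start_link(fn -> 0 end)
parent = self()

# Hypothetical user function: crashes on the first attempt, succeeds afterwards.
work = fn ->
  attempt = Agent.get_and_update(attempts, &{&1 + 1, &1 + 1})
  if attempt == 1, do: raise("simulated failure in a user function")
  send(parent, {:finished, attempt})
end

# :transient means "restart only after an abnormal exit".
{:ok, _pid} = Task.Supervisor.start_child(sup, work, restart: :transient)

result =
  receive do
    {:finished, attempt} -> attempt
  after
    1_000 -> :timeout
  end

IO.inspect(result)  # 2 (the crash is also logged as an error report)
```

Whatever the first attempt was processing is lost, while the restarted task carries on.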
Also, since tasks eagerly consume streams, a Strom-based application always actively polls sources and pushes data to sinks. Back-pressure comes from the limited size of the components' storage: when a component's storage is full, the related tasks stop providing data.
Linear composition of components
Since all the components have similar interfaces, they can easily be connected. Let’s continue the previous examples and create a Composite that consumes the original flow (stream1 and stream2), mixes the two streams into :mixed_stream, applies &(&1 * 3), and then splits the result into the :odd and :even streams.
components = [
  Mixer.new([:stream1, :stream2], :mixed_stream),
  Transformer.new(:mixed_stream, &(&1 * 3)),
  Splitter.new(:mixed_stream, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
]
composite =
  components
  |> Composite.new()
  |> Composite.start()

%{odd: odd, even: even} = Composite.call(flow, composite)
To build a composite, one just puts all the components into a list. A list is a one-dimensional data structure, which is why I call this “linear composition”.
A composite has the same interface as components, so further composition is possible — one can create composites from composites!
composite = Composite.new([composite1, composite2, ...])
Thus, very complex topologies can be represented easily. This makes it possible to build reusable generic composites and to represent stream-processing topologies of arbitrary complexity!
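The composition argument rests on one property: every component maps a flow to a flow. The sketch below models this with plain functions. `CompositeSketch` and its sequential stand-ins for Mixer, Transformer, and Splitter are hypothetical, and they drop Strom's processes entirely. For example, the mixer here simply concatenates its inputs. A composite built with `Enum.reduce/3` is again a function from a flow to a flow, which is why composites of composites work. The flow is finite so that the results can be listed.

```elixir
defmodule CompositeSketch do
  # Hypothetical model: a component is any function from a flow to a flow.
  # A composite applies its components left to right, so it is a component too.
  def new(components) do
    fn flow -> Enum.reduce(components, flow, fn component, acc -> component.(acc) end) end
  end

  # Sequential stand-in for a Mixer: replaces the inputs with one concatenated stream.
  def mixer(inputs, output) do
    fn flow ->
      mixed = Stream.concat(Enum.map(inputs, &Map.fetch!(flow, &1)))
      flow |> Map.drop(inputs) |> Map.put(output, mixed)
    end
  end

  # Sequential stand-in for a Transformer on a single stream.
  def transformer(name, fun) do
    fn flow -> Map.update!(flow, name, &Stream.map(&1, fun)) end
  end

  # Sequential stand-in for a Splitter: one filtered stream per predicate.
  def splitter(input, outputs) do
    fn flow ->
      stream = Map.fetch!(flow, input)
      split = Map.new(outputs, fn {name, pred} -> {name, Stream.filter(stream, pred)} end)
      flow |> Map.delete(input) |> Map.merge(split)
    end
  end
end

flow = %{stream1: [1, 2, 3], stream2: [4, 5]}

mix_and_triple =
  CompositeSketch.new([
    CompositeSketch.mixer([:stream1, :stream2], :mixed_stream),
    CompositeSketch.transformer(:mixed_stream, &(&1 * 3))
  ])

split_odd_even =
  CompositeSketch.new([
    CompositeSketch.splitter(:mixed_stream, %{
      odd: &(rem(&1, 2) == 1),
      even: &(rem(&1, 2) == 0)
    })
  ])

# A composite whose components are themselves composites.
composite = CompositeSketch.new([mix_and_triple, split_odd_even])

%{odd: odd, even: even} = composite.(flow)
IO.inspect({Enum.to_list(odd), Enum.to_list(even)})  # {[3, 9, 15], [6, 12]}
```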
Check out the project here. There are examples in the README file and in the test/examples folder.
Also, check the file_alarm project. I’ll write a separate article about it soon.
Thank you for reading.