Composable components for complex event processing

Anton Mishchuk
Apr 26, 2024


Complex event processing (CEP) is an approach to analyzing real-time streams of events to identify meaningful patterns, relationships, and structures in the data.

From an engineering point of view, most CEP problems can be formulated as follows: several input streams must be processed to produce several output streams.

This article presents a theoretical foundation for a general approach to such problems: how to decompose the stream processing logic into simple composable parts. I consider a set of five basic components that have similar interfaces and thus can easily be composed into more complex (composite) components.

The details of the implementation in the Elixir programming language will be presented in the next article. However, if you are eager to see it right now, check out the Strom project.

Stream and basic operations on streams

A stream is a sequence of data made available over time. Many programming languages have such an abstraction and provide an interface for processing data streams.

Since a stream is just a data sequence without any predefined size (potentially infinite), the set of operations that can be performed on streams is quite limited. One can create a stream (read events from somewhere), destroy a stream (write events to somewhere), mix several streams into one, or split one stream into many. And, of course, one can also modify a stream: change events in a given stream or produce a new stream based on the input one.
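To make these five operations concrete, here is a minimal sketch on plain streams using Elixir's standard Stream and Enum modules. The in-memory sequences stand in for real event sources, collecting a few events into a list stands in for a real sink, and the round-robin interleaving is just one possible mixing strategy chosen for illustration.

```elixir
# Create: a lazy, potentially infinite sequence of events.
numbers = Stream.iterate(1, &(&1 + 1))

# Modify: nothing is computed yet, the result is another lazy stream.
squares = Stream.map(numbers, &(&1 * &1))

# Mix: two streams become one (here by simple round-robin interleaving).
letters = Stream.cycle([:a, :b])

mixed =
  [squares, letters]
  |> Stream.zip()
  |> Stream.flat_map(&Tuple.to_list/1)

# Split: one stream becomes two, selected by a predicate.
# Note that each branch enumerates the original stream on its own.
evens = Stream.filter(numbers, &(rem(&1, 2) == 0))
odds = Stream.filter(numbers, &(rem(&1, 2) == 1))

# Destroy (run): only at this point are events actually pulled.
first_mixed = Enum.take(mixed, 4)
first_evens = Enum.take(evens, 3)
first_odds = Enum.take(odds, 3)

IO.inspect({first_mixed, first_evens, first_odds})
```

Because the streams are lazy, the infinite sequences are safe to define; work happens only when a stream is run.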

Let’s introduce components for each action.

Basic components

The simplified notation for the operations is:

null -> source -> stream
stream -> sink -> null
{stream1, stream2} -> mixer -> {stream}
stream -> splitter -> {stream1, stream2}
stream -> transformer -> stream

Or in a more functional style:

source() -> stream
sink(stream) -> null
mixer({stream1, stream2}) -> stream
splitter(stream) -> {stream1, stream2}
transformer(stream) -> stream

Unfortunately, since the operators have quite different interfaces, they cannot be composed easily. Let’s consider an example with the following topology:

Topology T1

To make the problem more specific, think of two streams of numbers (SA, SB) that are mixed, summed in pairs (in T), and then split into streams of odd and even numbers (SX and SY, respectively).

Using the operations, one can present the topology in the following way:

SA = source(SrcA)
SB = source(SrcB)
SM = mixer(M, {SA, SB})
SM = transformer(T, SM)
{SX, SY} = splitter(S, SM)
sink(SinkX, SX)
sink(SinkY, SY)

Quite simple. However, the approach has two disadvantages.

First, with a more complex topology, it might be difficult to follow the imperative definition of the topology. Second, the interfaces of components are different. Some receive one stream as an input (transformer, splitter, sink). The mixer needs several streams, and sources don’t need any inputs. The same goes for outputs: e.g. the splitter produces several streams and sinks don’t produce any.

However, with a proper abstraction on top of streams, namely a set of streams, the interfaces of the components can be unified.

Flow

Let’s define a “flow” as a set of streams that can be addressed by name (a dictionary, hash map, etc.):

flow = %{foo: stream_foo, bar: stream_bar} 

There can be an empty flow — %{}.
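In Elixir terms, a flow can be sketched as an ordinary map whose values are lazy streams. The stream contents below are made up for illustration; only the shape of the data structure matters.

```elixir
# A flow: named, lazy streams kept in a plain map.
flow = %{
  foo: Stream.map(1..3, &(&1 * 10)),
  bar: Stream.cycle([:tick, :tock])
}

# An empty flow is just an empty map.
empty_flow = %{}

# Streams are addressed by name; nothing is computed until a stream is run.
foo_events = flow |> Map.fetch!(:foo) |> Enum.to_list()
bar_events = flow |> Map.fetch!(:bar) |> Enum.take(3)
names = flow |> Map.keys() |> Enum.sort()

IO.inspect({names, foo_events, bar_events, map_size(empty_flow)})
```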

Let’s redefine the interface of operators:

# source adds a stream to the flow
%{bar: Sbar} -> source(Src, :foo) -> %{foo: Sfoo, bar: Sbar}

# sink runs a stream (writes its data), so the stream is no longer in the flow
%{foo: Sfoo, bar: Sbar} -> sink(Snk, :foo) -> %{bar: Sbar}

# mixer mixes several streams into one
%{foo: Sfoo, bar: Sbar} -> mixer(M, [:foo, :bar], :mixed) -> %{mixed: Smixed}

# splitter splits one stream into several streams
%{foo: Sfoo} -> splitter(S, :foo, [:bar, :baz]) -> %{bar: Sbar, baz: Sbaz}

# transformer modifies a stream (or several streams)
%{bar: Sbar, foo: Sfoo} -> transformer(T, [:foo]) -> %{foo: T(Sfoo), bar: Sbar}

A couple of notes here:

  • The source can also accept an empty flow (%{})
  • Each operation needs the name(s) of the stream(s) in the flow to which it is applied.

Here we go! Now all the components take a “flow” data structure as input and return another “flow”. Therefore, the operators can be composed in a simple “one-dimensional” (“linear”) way. For example, the topology T1 can be presented in the following way:

%{}
|> source(SrcA, :SA)
|> source(SrcB, :SB)
|> mixer(M, [:SB, :SA], :SM)
|> transformer(T, [:SM])
|> splitter(S, :SM, [:SX, :SY])
|> sink(SinkX, :SX)
|> sink(SinkY, :SY)
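The pipeline above is notation, but it can be turned into something runnable. The following is a minimal sketch, not the Strom implementation: the module name FlowSketch and the helpers round_robin, sum_pairs, odd_or_even, and write_to are all hypothetical. It also assumes replayable in-memory inputs (the splitter enumerates its input once per output stream), runs everything sequentially in one process, and uses an Agent as a stand-in for real sinks.

```elixir
defmodule FlowSketch do
  # source: adds a named stream to the flow.
  def source(flow, enumerable, name), do: Map.put(flow, name, enumerable)

  # sink: runs the stream, passing each event to `write`, and drops it from the flow.
  def sink(flow, write, name) do
    {stream, rest} = Map.pop!(flow, name)
    Enum.each(stream, write)
    rest
  end

  # mixer: replaces the named streams with the single stream returned by `mix`.
  def mixer(flow, mix, names, new_name) do
    mixed = names |> Enum.map(&Map.fetch!(flow, &1)) |> mix.()
    flow |> Map.drop(names) |> Map.put(new_name, mixed)
  end

  # splitter: `route` maps an event to the name of its output stream.
  # Each output re-enumerates the input, which is fine only for replayable inputs.
  def splitter(flow, route, name, new_names) do
    {stream, rest} = Map.pop!(flow, name)

    Enum.reduce(new_names, rest, fn new_name, acc ->
      Map.put(acc, new_name, Stream.filter(stream, &(route.(&1) == new_name)))
    end)
  end

  # transformer: applies `fun` (stream -> stream) to each named stream.
  def transformer(flow, fun, names) do
    Enum.reduce(names, flow, fn name, acc -> Map.update!(acc, name, fun) end)
  end
end

# Hypothetical component definitions for the T1 example.
round_robin = fn streams -> streams |> Stream.zip() |> Stream.flat_map(&Tuple.to_list/1) end
sum_pairs = fn stream -> stream |> Stream.chunk_every(2) |> Stream.map(&Enum.sum/1) end
odd_or_even = fn n -> if rem(n, 2) == 1, do: :SX, else: :SY end

# Stand-in for real sinks: remember every written event, tagged by sink.
{:ok, log} = Agent.start_link(fn -> [] end)
write_to = fn tag -> fn event -> Agent.update(log, fn acc -> [{tag, event} | acc] end) end end

final_flow =
  %{}
  |> FlowSketch.source([1, 2, 3, 4], :SA)
  |> FlowSketch.source([10, 20, 30, 41], :SB)
  |> FlowSketch.mixer(round_robin, [:SB, :SA], :SM)
  |> FlowSketch.transformer(sum_pairs, [:SM])
  |> FlowSketch.splitter(odd_or_even, :SM, [:SX, :SY])
  |> FlowSketch.sink(write_to.(:x), :SX)
  |> FlowSketch.sink(write_to.(:y), :SY)

written = Agent.get(log, &Enum.reverse/1)
IO.inspect({final_flow, written})
```

Every function takes a flow as its first argument and returns a flow, which is what makes the pipe notation work. A real implementation would deliver events as they arrive rather than draining one sink after another.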

Here I use the pipe-operator notation (|>): the argument from the left side of the operator goes to the first position in the function from the right side:

x |> function(y) |> function(z)
# is equivalent to
function(function(x, y), z)
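The same equivalence can be checked with real Elixir functions. This small sketch uses only Enum from the standard library:

```elixir
# Pipe form: each result becomes the first argument of the next call.
piped = [1, 2, 3] |> Enum.map(&(&1 * 2)) |> Enum.sum()

# Equivalent nested form.
nested = Enum.sum(Enum.map([1, 2, 3], &(&1 * 2)))

IO.inspect({piped, nested})
```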

Because all components share the same interface, several basic components can be combined into a composite component, composites can be combined into larger composites, and so on.

For our example, we can put the mixer, the transformer, and the splitter into one composite, which takes a flow with at least two streams and produces a flow with two other streams:

# Comp is the list of components that make up the composite
Comp = [
  mixer(M, [:SB, :SA], :SM),
  transformer(T, [:SM]),
  splitter(S, :SM, [:SX, :SY])
]

%{}
|> source(SrcA, :SA)
|> source(SrcB, :SB)
|> composite(Comp, [:SA, :SB])
|> sink(SinkX, :SX)
|> sink(SinkY, :SY)
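One way to see why composites come for free: if a component is any function from flow to flow, then a composite is just a fold over a list of such functions, and the result is again a function from flow to flow. The sketch below assumes exactly that representation; the names compose, mix, sum_pairs, and split are hypothetical, and the inputs are small replayable lists.

```elixir
# A component is any function that takes a flow and returns a flow.
mix = fn flow ->
  {a, flow} = Map.pop!(flow, :SA)
  {b, flow} = Map.pop!(flow, :SB)
  mixed = [a, b] |> Stream.zip() |> Stream.flat_map(&Tuple.to_list/1)
  Map.put(flow, :SM, mixed)
end

sum_pairs = fn flow ->
  Map.update!(flow, :SM, fn s -> s |> Stream.chunk_every(2) |> Stream.map(&Enum.sum/1) end)
end

split = fn flow ->
  {sm, flow} = Map.pop!(flow, :SM)

  flow
  |> Map.put(:SX, Stream.filter(sm, &(rem(&1, 2) == 1)))
  |> Map.put(:SY, Stream.filter(sm, &(rem(&1, 2) == 0)))
end

# A composite applies its components in order and is itself a component.
compose = fn components ->
  fn flow -> Enum.reduce(components, flow, fn component, acc -> component.(acc) end) end
end

comp = compose.([mix, sum_pairs, split])
# A composite of composites has the same interface.
nested = compose.([comp])

input = %{SA: [1, 2, 3], SB: [10, 20, 30], other: [:untouched]}
out = comp.(input)
nested_out = nested.(input)

IO.inspect({Enum.to_list(out[:SX]), Enum.to_list(out[:SY]), out[:other]})
```

Streams that the composite does not name (here :other) pass through the flow untouched.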

Complex topologies

The great thing about linear composition is that any 2D topology can be presented as a list of components. I suspect there is a mathematical proof for that, but I don’t have one.

Let’s consider a more sophisticated example to get the idea.

Topology T2

%{}
|> source(SrcA, :SA)
|> source(SrcB, :SB)
|> mixer(MixAB, [:SB, :SA], :SM)
|> transformer(F, [:SM])
|> source(SrcC, :SC)
|> transformer(Fc, [:SC])
|> source(SrcD, :SD)
|> transformer(Fd, [:SD])
|> splitter(S, :SM, [:SX, :SY])
|> mixer(MixCX, [:SX, :SC], :SCX)
|> mixer(MixDY, [:SY, :SD], :SDY)
|> transformer(Fcx, [:SCX])
|> transformer(Fdy, [:SDY])
|> sink(SinkX, :SCX)
|> sink(SinkY, :SDY)

Stream diagram

Having a linear representation of a topology is good for programming. However, it might be hard to understand what exactly happens to each particular stream in the flow.

The “stream” diagram visualizes the stream transformations in a given topology. For the topology T1, one can draw the following diagram:

A stream diagram for the T1 topology

The flow goes from top to bottom (one can also draw it from left to right). If a component does something with a stream, there is a dot on the stream opposite to the component.

One can easily read what happens here:

  • There is an empty flow as an input.
  • Source SrcA adds the SA stream, and then SrcB adds the SB stream.
  • SA and SB are mixed into SM.
  • The transformation T is applied to SM.
  • SM is split into SX and SY.
  • SinkX writes the SX stream, and SinkY writes SY.
  • The output is an empty flow.
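The same reading can be automated as a rough textual substitute for the diagram. The sketch below assumes a hypothetical data-only description of T1, where each step lists the stream names it consumes and produces, and tracks which streams are in the flow after each step.

```elixir
# Hypothetical description of T1: {kind, component, inputs, outputs}.
t1 = [
  {:source, "SrcA", [], [:SA]},
  {:source, "SrcB", [], [:SB]},
  {:mixer, "M", [:SA, :SB], [:SM]},
  {:transformer, "T", [:SM], [:SM]},
  {:splitter, "S", [:SM], [:SX, :SY]},
  {:sink, "SinkX", [:SX], []},
  {:sink, "SinkY", [:SY], []}
]

# Walk the list top to bottom, starting from an empty flow, and record
# the stream names present after each component.
{rows, final_names} =
  Enum.map_reduce(t1, [], fn {_kind, component, inputs, outputs}, names ->
    names = (names -- inputs) ++ outputs
    {{component, names}, names}
  end)

# Print one row per component, like the rows of a stream diagram.
Enum.each(rows, fn {component, names} ->
  IO.puts("#{String.pad_trailing(component, 6)} #{inspect(names)}")
end)
```

Each printed row corresponds to one dot row of the diagram, and the final list of names is empty, matching the empty output flow.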

I suggest the reader draw a diagram for the T2 topology.

Technical considerations

The flow abstraction makes it possible to have composable components with a unified interface. This allows a complex topology to be presented as a simple list of components transforming a given flow.

However, implementing the approach with a particular programming language is more challenging. There are several important requirements:

  • The stream abstraction must exist in the language.
  • The components must be able to process several streams in parallel. The language must provide primitives for parallel programming (or, at least, concurrent programming).
  • Stream events must be properly delivered to all the components in the topology: no deadlocks or overflows.
  • Error tolerance is recommended — the processing should not stop if an error happens in a component.
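As an illustration of the parallelism requirement only, here is a minimal sketch, assuming Elixir 1.11 or later, that runs each stream of a flow in its own process using Task from the standard library. It says nothing about how Strom is built, and it does not address delivery guarantees or error tolerance.

```elixir
flow = %{
  evens: Stream.map(1..3, &(&1 * 2)),
  odds: Stream.map(1..3, &(&1 * 2 - 1))
}

# Run every stream of the flow in its own process and collect the results.
results =
  flow
  |> Enum.map(fn {name, stream} ->
    Task.async(fn -> {name, Enum.to_list(stream)} end)
  end)
  |> Task.await_many()
  |> Map.new()

IO.inspect(results)
```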

BEAM seems to be the right tool for the implementation. I’ve used the Elixir programming language. See the Strom project. I’ll write a separate article with the details of the implementation.

Thank you for reading and your likes.
