Stream Processing

Aggregation on top of incoming data

In some cases, you may need to aggregate streaming data, either to reduce incoming data traffic or to derive insights at the edge. Use cases for stream processing include the following:

  1. Group all incoming Nginx logs by HTTP status code

  2. Snapshot surrounding messages when an "error" is found

  3. Calculate the average, maximum, and minimum of a response time from a log message
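
As a rough sketch of use cases 1 and 3, the stream tasks below use the GROUP BY clause and the AVG, MAX, and MIN functions of the Fluent Bit stream processor. The tag 'nginx', the record keys code and response_time, the task and stream names, and the 60-second window are all assumptions made for illustration; adjust them to match your own records.

```
# Hypothetical task: count records per HTTP status code every 60 seconds.
# Assumes records tagged 'nginx' carry a 'code' key.
[STREAM_TASK]
    Name nginx_by_code
    Exec CREATE STREAM by_code WITH (tag='nginx_by_code') AS SELECT code, COUNT(*) FROM TAG:'nginx' WINDOW TUMBLING (60 SECOND) GROUP BY code;

# Hypothetical task: response-time statistics every 60 seconds.
# Assumes records tagged 'nginx' carry a numeric 'response_time' key.
[STREAM_TASK]
    Name nginx_response_time
    Exec CREATE STREAM resp_stats WITH (tag='nginx_resp_stats') AS SELECT AVG(response_time), MAX(response_time), MIN(response_time) FROM TAG:'nginx' WINDOW TUMBLING (60 SECOND);
```

Each task writes its results to a new stream under its own tag, so the aggregated records can be routed separately from the raw logs.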

Defining a streams file

Calyptia Core automatically configures a streams file for every pipeline, so you only need to define new stream tasks on top of incoming data. For a full list of directives and capabilities of the stream processor, see the Fluent Bit documentation.

The following streams file counts the records tagged 'foobar' in each 5-second tumbling window and emits the result under the tag 'count'.

streams

[STREAM_TASK]
    Name count_5_second
    Exec CREATE STREAM count5 WITH (tag='count') AS SELECT COUNT(*) FROM TAG:'foobar' WINDOW TUMBLING (5 SECOND);

Add this file to the pipeline, in the same way as the Lua script earlier:

calyptia create pipeline_file --file streams --pipeline $PIPELINE
