Stream Processing
Aggregation on top of incoming data
In some cases, you may need to aggregate data from incoming streams, either to reduce data traffic or to derive insights at the edge. Use cases for this kind of stream processing include the following:
1. Group all incoming Nginx logs by HTTP status code
2. Snapshot surrounding messages when an "error" is found
3. Calculate the average, maximum, and minimum response time from log messages
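As a rough sketch, the third use case could be expressed as a stream task like the one below. The field name `response_time` and the tag `app_logs` are assumptions for illustration; adjust them to match your actual records.

```
[STREAM_TASK]
    Name response_time_stats
    Exec CREATE STREAM rt_stats WITH (tag='rt_stats') AS SELECT AVG(response_time), MAX(response_time), MIN(response_time) FROM TAG:'app_logs' WINDOW TUMBLING (5 SECOND);
```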
Calyptia Core automatically configures a streams file by default for every pipeline, so you only need to define new stream tasks on top of incoming data. For a full list of directives and capabilities of the stream processor, see the Fluent Bit documentation.
The following streams file counts the number of records received in each five-second period.
streams
[STREAM_TASK]
Name count_5_second
Exec CREATE STREAM count5 WITH (tag='count') AS SELECT COUNT(*) FROM TAG:'foobar' WINDOW TUMBLING (5 SECOND);
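The tumbling window emits one aggregate record every five seconds and then resets. Grouped aggregation (the first use case above) can be sketched in the same file; the field name `code` and the tag `nginx` below are assumptions, not values from the original example:

```
[STREAM_TASK]
    Name group_by_code
    Exec CREATE STREAM by_code WITH (tag='by_code') AS SELECT code, COUNT(*) FROM TAG:'nginx' WINDOW TUMBLING (5 SECOND) GROUP BY code;
```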
Now add this file to the pipeline, just as we did earlier for the Lua script:
calyptia create pipeline_file --file streams --pipeline $PIPELINE