
Using Lua, Stream Processors, and More

In case the built-in parsers or filters are not powerful enough, you can use two of the most powerful features of Fluent Bit: Lua scripting and stream processing. To support this, Calyptia Core lets a pipeline configuration reference additional files, such as a Lua script or a streams file:

Lua

The lua filter is extremely powerful and allows for maximum versatility when modifying or performing logical alterations to streams of data. For use cases and further details, see the lua filter section of the Fluent Bit documentation.

Adding a lua filter to the pipeline

First we need to specify the Lua filter within our pipeline configuration file. Instead of referring to the raw file path, we add the reference {{ files.luacheck }}. We've created the following configuration:
pipeline-es.conf
[INPUT]
    Name forward
    Host 0.0.0.0
    Port 24284
[FILTER]
    Name lua
    Script {{ files.luacheck }}
    Call luacheck
    Match es
[OUTPUT]
    Name es
    Match *
    Host hostname.us-east-2.es.amazonaws.com
    HTTP_User es
    HTTP_Passwd {{secrets.es_http_passwd}}
    Port 443
    TLS on
We then define the luacheck file. As required by Fluent Bit, the function receives the tag, timestamp, and record, and returns a return code, the timestamp, and the record with any modifications.
luacheck
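function luacheck(tag, timestamp, record)
    -- new_record refers to the same table as record, so the change is made in place
    local new_record = record
    new_record["checked"] = "true"
    -- 2 tells Fluent Bit that the record was modified and the timestamp was kept
    return 2, timestamp, new_record
end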
To deploy, we update the pipeline with the new configuration file and then add a pipeline file that holds the Lua script we created above.
calyptia update pipeline $PIPELINE --config pipeline-es.conf
calyptia create pipeline_file --file luacheck --pipeline $PIPELINE
Now any logs that are processed by this Lua filter will have the attribute checked with the value "true".
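The return code is what tells Fluent Bit how to treat the result: -1 drops the record, 0 keeps it unmodified, 1 means both timestamp and record were modified, and 2 means only the record was modified. As a minimal sketch of how a callback might use these codes, the variant below drops some records and marks others. The function name luacheck_verbose and the record fields "level" and "message" are assumptions made for illustration and are not part of the pipeline above.

```lua
-- Hypothetical variant of the luacheck callback.
-- The "level" and "message" fields are assumed for illustration only.
function luacheck_verbose(tag, timestamp, record)
    -- Drop debug records entirely (return code -1).
    if record["level"] == "debug" then
        return -1, timestamp, record
    end
    -- Leave records without a message untouched (return code 0).
    if record["message"] == nil then
        return 0, timestamp, record
    end
    -- Otherwise mark the record; code 2 keeps the original timestamp.
    record["checked"] = "true"
    return 2, timestamp, record
end

-- Local smoke test, runnable with a plain Lua interpreter outside Fluent Bit.
local code, ts, rec = luacheck_verbose("es", 0, { message = "hello" })
print(code, rec["checked"])
```

Running the script with a standalone Lua interpreter before uploading it is a cheap way to catch syntax errors, since the callback is an ordinary Lua function.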

Stream Processing

In some cases, you may need to aggregate data from streams in order to reduce incoming data traffic or derive insights at the edge. Use cases for stream processing include the following:
  1. Group all incoming Nginx logs by HTTP code
  2. Snapshot surrounding messages when an "error" is found
  3. Calculate the average, maximum, and minimum of a response time from a log message

Defining a streams file

Calyptia Core automatically configures a streams file for every pipeline, so you only need to define new stream tasks on top of incoming data. For a full list of directives and capabilities of the stream processor, see the Fluent Bit documentation.
The following streams file counts the number of records in each 5-second window.
streams
[STREAM_TASK]
    Name count_5_second
    Exec CREATE STREAM count5 WITH (tag='count') AS SELECT COUNT(*) FROM TAG:'foobar' WINDOW TUMBLING (5 SECOND);
Now we add this file as we did before for the Lua script:
calyptia create pipeline_file --file streams --pipeline $PIPELINE
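
To make the tumbling-window semantics of the count_5_second task more concrete, here is a rough Lua sketch of the same idea: each record falls into exactly one non-overlapping window, and the result is one count per window. This is not how Fluent Bit implements the stream processor. The function name tumbling_counts and the sample timestamps are hypothetical, and aligning windows to multiples of the window size is a simplifying assumption.

```lua
-- Count records per non-overlapping (tumbling) window.
-- timestamps: list of record times in seconds (sample data, assumed).
-- size: window length in seconds.
function tumbling_counts(timestamps, size)
    local counts = {}
    for _, ts in ipairs(timestamps) do
        -- Each record belongs to exactly one window, keyed by its start time.
        local window_start = math.floor(ts / size) * size
        counts[window_start] = (counts[window_start] or 0) + 1
    end
    return counts
end

-- Six sample records spread over three 5-second windows.
local counts = tumbling_counts({ 0.5, 1.2, 4.9, 5.0, 7.3, 12.1 }, 5)
for _, start in ipairs({ 0, 5, 10 }) do
    print(start, counts[start])
end
```

In this sketch the six sample records collapse into three counts, which is the kind of traffic reduction the stream task is meant to achieve.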