Using Lua, Stream Processors, and More

In case the built-in parsers or filters are not powerful enough, you can also employ two of the most powerful aspects of Fluent Bit: Lua and Stream Processing. To do this, Calyptia Core supports referencing additional external files from your pipeline configuration:

Lua

The lua filter is extremely powerful and allows for maximum versatility when modifying streams of data or applying logic to them. For use cases and more details about the lua filter, check the Fluent Bit documentation.

Adding a lua filter to the pipeline

First, we need to specify the Lua filter within our pipeline configuration file. Instead of referring to the raw file, we add the reference {{ files.luacheck }}. We've created the following below:

pipeline-es.conf

[INPUT]
    Name          forward
    Host          0.0.0.0
    Port          24284

[FILTER]
    Name lua
    Script {{ files.luacheck }}
    Call   luacheck
    Match  es

[OUTPUT]
    Name        es
    Match       *
    Host        hostname.us-east-2.es.amazonaws.com
    HTTP_User   es
    HTTP_Passwd {{secrets.es_http_passwd}}
    Port        443
    TLS         on

We then define the luacheck function as required by Fluent Bit: it receives the tag, timestamp, and record, and returns a status code, the timestamp, and the (possibly modified) record.

luacheck

function luacheck(tag, timestamp, record)
    local new_record = record
    new_record["checked"] = "true"
    -- 2 = record modified, original timestamp kept
    return 2, timestamp, new_record
end

To deploy the new pipeline configuration, we update the pipeline and attach a new file referring to the Lua script we created above.

calyptia update pipeline $PIPELINE --config pipeline-es.conf
calyptia create pipeline_file --file luacheck --pipeline $PIPELINE

Now any logs processed by this Lua filter will have the attribute checked with the value "true".
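To sanity-check the callback outside of Fluent Bit, you can call it from plain Lua the way the filter engine would. The tag, timestamp, and record below are made-up illustration values; Fluent Bit normally supplies them itself:

```lua
-- Hypothetical standalone check: Fluent Bit normally invokes the callback for you.
function luacheck(tag, timestamp, record)
    local new_record = record
    new_record["checked"] = "true"
    -- 2 = record modified, original timestamp kept
    return 2, timestamp, new_record
end

local code, ts, rec = luacheck("es", 1690000000, { message = "hello" })
print(code, rec.checked)  -- 2  true
```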

Stream Processing

In some cases, you may need to perform certain aggregations of data from streams of data in order to reduce incoming data traffic or derive insights at the edge. Some use cases of this stream processing include the following:

  1. Group all incoming Nginx logs by HTTP code

  2. Snapshot surrounding messages when an "error" is found

  3. Calculate the average, maximum, and minimum of a response time from a log message
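As a sketch of the third use case, a stream task could compute these aggregates over a tumbling window. The field name `response_time` and the tag `nginx` below are assumptions for illustration, not values from this guide:

```
[STREAM_TASK]
    Name response_time_stats
    Exec CREATE STREAM resp_stats WITH (tag='stats') AS SELECT AVG(response_time), MAX(response_time), MIN(response_time) FROM TAG:'nginx' WINDOW TUMBLING (5 SECOND);
```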

Defining a streams file

Calyptia Core automatically configures a stream file by default for every pipeline, so you only need to define new Stream Tasks on top of incoming data. For a full list of directives and capabilities of the stream processor, be sure to check the Fluent Bit documentation.

The following streams file counts the number of records in a 5-second period.

streams

[STREAM_TASK]
    Name count_5_second
    Exec CREATE STREAM count5 WITH (tag='count') AS SELECT COUNT(*) FROM TAG:'foobar' WINDOW TUMBLING (5 SECOND);

Now we add this file as we did before for the Lua script:

calyptia create pipeline_file --file streams --pipeline $PIPELINE
