Calyptia Core Agent
22.10
Kafka


Last updated 2 years ago

The Kafka output plugin lets you ingest your records into an Apache Kafka service. This plugin uses the official librdkafka C library (built-in dependency).

Configuration Parameters

| Key | Description | Default |
| --- | --- | --- |
| format | Specify the data format. Available options: json, msgpack. | json |
| message_key | Optional key to store the message. | |
| message_key_field | If set, the value of Message_Key_Field in the record will indicate the message key. If it is not set or not found in the record, Message_Key will be used (if set). | |
| timestamp_key | Set the key to store the record timestamp. | @timestamp |
| timestamp_format | 'iso8601' or 'double'. | double |
| brokers | A single Kafka broker or a comma-separated list of brokers, e.g. 192.168.1.3:9092, 192.168.1.4:9092. | |
| topics | Single entry or list of topics separated by comma (,) that Calyptia Fluent Bit will use to send messages to Kafka. If only one topic is set, it will be used for all records. If multiple topics exist, the one set in the record by Topic_Key will be used. | fluent-bit |
| topic_key | If multiple Topics exist, the value of Topic_Key in the record will indicate the topic to use. E.g. if Topic_Key is router and the record is {"key1": 123, "router": "route_2"}, Calyptia Fluent Bit will use topic route_2. Note that if the value of Topic_Key is not present in Topics, the first topic in the Topics list will be used by default. | |
| dynamic_topic | Adds unknown topics (found in Topic_Key) to Topics, so only a default topic needs to be configured in Topics. | Off |
| queue_full_retries | Calyptia Fluent Bit queues data into the rdkafka library. If for some reason the underlying library cannot flush the records, the queue might fill up, blocking the addition of new records. The queue_full_retries option sets the number of local retries to enqueue the data. The default value is 10 retries, and the interval between retries is 1 second. Setting queue_full_retries to 0 sets an unlimited number of retries. | 10 |
| rdkafka.{property} | Any librdkafka property, e.g. rdkafka.request.required.acks. | |

Setting rdkafka.log.connection.close to false and rdkafka.request.required.acks to 1 are examples of recommended settings of librdkafka properties.
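
As an illustration of how Topics and Topic_Key interact, the following is a minimal sketch rather than a recommended setup. It assumes the dummy input plugin is used to emit the sample record from the topic_key description, and the topic names route_1 and route_2 are hypothetical. According to the parameter descriptions above, this record should be sent to route_2, while a record whose router value is not listed in Topics would fall back to route_1, the first topic in the list.

```
[INPUT]
    # Assumption: dummy input emitting the sample record from the topic_key description
    Name   dummy
    Dummy  {"key1": 123, "router": "route_2"}

[OUTPUT]
    Name       kafka
    Match      *
    Brokers    192.168.1.3:9092
    # Hypothetical topic names; the first entry is the fallback topic
    Topics     route_1,route_2
    # The record field whose value selects the topic
    Topic_Key  router
```

If dynamic_topic were set to On, only the default topic would need to be listed in Topics, since unknown topics found in Topic_Key are added to the list.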

Getting Started

In order to insert records into Apache Kafka, you can run the plugin from the command line or through the configuration file:

Command Line

The kafka plugin can read its parameters from the command line through the -p (property) argument, e.g.:

$ calyptia-fluent-bit -i cpu -o kafka -p brokers=192.168.1.3:9092 -p topics=test

Configuration File

In your main configuration file append the following Input & Output sections:

[INPUT]
    Name  cpu

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     192.168.1.3:9092
    Topics      test
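
The same output section can also carry the settings described under Configuration Parameters. The sketch below is one possible variant, not a tuned configuration: it applies the two librdkafka properties that this page lists as recommended examples and spells out queue_full_retries at its documented default.

```
[OUTPUT]
    Name                           kafka
    Match                          *
    Brokers                        192.168.1.3:9092
    Topics                         test
    # Documented default; 0 would retry without limit
    queue_full_retries             10
    # librdkafka properties are passed with the rdkafka. prefix
    rdkafka.log.connection.close   false
    rdkafka.request.required.acks  1
```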

Avro Support

Fluent Bit supports Avro encoding in the out_kafka plugin. Avro support is optional and must be enabled at build time by passing the CMake definition -DFLB_AVRO_ENCODER=On, as in the following example, which enables:

  • out_kafka with Avro encoding

  • Fluent Bit's Prometheus metrics via an embedded HTTP endpoint

  • debugging support

  • the test suites

cmake -DFLB_DEV=On -DFLB_OUT_KAFKA=On -DFLB_TLS=On -DFLB_TESTS_RUNTIME=On -DFLB_TESTS_INTERNAL=On -DCMAKE_BUILD_TYPE=Debug -DFLB_HTTP_SERVER=true -DFLB_AVRO_ENCODER=On ../

Kafka Configuration File with Avro Encoding

This example Fluent Bit configuration tails Kubernetes logs, decorates the log lines with Kubernetes metadata via the kubernetes filter, and then sends the fully decorated log lines to a Kafka broker, encoded with a specific Avro schema.

[INPUT]
    Name              tail
    Tag               kube.*
    Alias             some-alias
    Path              /logdir/*.log
    DB                /dbdir/some.db
    Skip_Long_Lines   On
    Refresh_Interval  10
    Parser            some-parser

[FILTER]
    Name                kubernetes
    Match               kube.*
    Kube_URL            https://some_kube_api:443
    Kube_CA_File        /certs/ca.crt
    Kube_Token_File     /tokens/token
    Kube_Tag_Prefix     kube.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     192.168.1.3:9092
    Topics      test
    Schema_str  {"name":"avro_logging","type":"record","fields":[{"name":"timestamp","type":"string"},{"name":"stream","type":"string"},{"name":"log","type":"string"},{"name":"kubernetes","type":{"name":"krec","type":"record","fields":[{"name":"pod_name","type":"string"},{"name":"namespace_name","type":"string"},{"name":"pod_id","type":"string"},{"name":"labels","type":{"type":"map","values":"string"}},{"name":"annotations","type":{"type":"map","values":"string"}},{"name":"host","type":"string"},{"name":"container_name","type":"string"},{"name":"docker_id","type":"string"},{"name":"container_hash","type":"string"},{"name":"container_image","type":"string"}]}},{"name":"cluster_name","type":"string"},{"name":"fabric","type":"string"}]}
    Schema_id some_schema_id
    rdkafka.client.id some_client_id
    rdkafka.debug All
    rdkafka.enable.ssl.certificate.verification true

    rdkafka.ssl.certificate.location /certs/some.cert
    rdkafka.ssl.key.location /certs/some.key
    rdkafka.ssl.ca.location /certs/some-bundle.crt
    rdkafka.security.protocol ssl
    rdkafka.request.required.acks 1
    rdkafka.log.connection.close false

    Format avro
    rdkafka.log_level 7
    rdkafka.metadata.broker.list 192.168.1.3:9092

In rdkafka.{property}, {property} can be any of the librdkafka properties.