Calyptia Core Agent
24.4

Kafka


Last updated 1 year ago

The Kafka input plugin subscribes to one or more Kafka topics to collect messages from an Apache Kafka service. This plugin uses the official librdkafka C library (built-in dependency).

Configuration Parameters

| Key | Description | Default |
| --- | --- | --- |
| brokers | A single Kafka broker or a comma-separated list of brokers, e.g. 192.168.1.3:9092, 192.168.1.4:9092. | |
| topics | A single topic or a comma-separated list of topics that Calyptia Fluent Bit will subscribe to. | |
| client_id | Client id passed to librdkafka. | |
| group_id | Group id passed to librdkafka. | fluent-bit |
| poll_ms | Kafka brokers polling interval in milliseconds. | 500 |
| rdkafka.{property} | {property} can be any of the librdkafka properties. | |
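As a minimal sketch of the rdkafka.{property} key, the fragment below passes two settings through to librdkafka. It assumes that {property} is replaced by the librdkafka property name as-is. The properties log.connection.close and socket.keepalive.enable are existing librdkafka settings picked only for illustration, and the client_id value is hypothetical.

```text
[INPUT]
    Name       kafka
    Brokers    192.168.1.3:9092
    Topics     some-topic
    # Hypothetical client id, passed to librdkafka
    client_id  my-agent
    # Assumed pass-through: everything after "rdkafka." is a librdkafka property name
    rdkafka.log.connection.close     false
    rdkafka.socket.keepalive.enable  true
```

Which properties are worth setting depends on the broker setup; the librdkafka configuration reference lists the accepted names and values.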

Getting Started

To subscribe to and collect messages from Apache Kafka, you can run the plugin from the command line or through the configuration file.

Command Line

The kafka plugin can read parameters through the -p argument (property), for example:

$ calyptia-fluent-bit -i kafka -o stdout -p brokers=192.168.1.3:9092 -p topics=some-topic

Configuration File

In your main configuration file append the following Input & Output sections:

[INPUT]
    Name        kafka
    Brokers     192.168.1.3:9092
    Topics      some-topic
    poll_ms     100

[OUTPUT]
    Name        stdout

Example of using Kafka input/output plugins

The calyptia-fluent-bit source repository contains a full example of using calyptia-fluent-bit to process Kafka records:

[INPUT]
    Name kafka
    brokers kafka-broker:9092
    topics fb-source
    poll_ms 100

[FILTER]
    Name    lua
    Match   *
    script  kafka.lua
    call    modify_kafka_message

[OUTPUT]
    Name kafka
    brokers kafka-broker:9092
    topics fb-sink

The above will connect to the broker listening on kafka-broker:9092 and subscribe to the fb-source topic, polling for new messages every 100 milliseconds.

Every message received is then processed with kafka.lua and sent back to the fb-sink topic of the same broker.

The example can be executed locally with make start in the examples/kafka_filter directory (Docker Compose is used).
