Kafka

Publishes messages to Apache Kafka topics with support for SASL authentication, TLS encryption, and message compression.

Overview

The Apache Kafka output connector for Monad enables you to publish data directly to Kafka topics. It supports multiple security protocols (SASL_PLAINTEXT, SASL_SSL, SSL), SASL authentication mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512), mutual TLS (mTLS), message compression, and configurable acknowledgment levels. The connector batches messages at the Monad level before writing them to Kafka.

Requirements

To configure Kafka as an output destination, you need:

  1. A running Kafka cluster with at least one accessible broker.
  2. A pre-existing Kafka topic to publish messages to. The connector will verify that the topic exists during the connection test — it will not create the topic automatically.
  3. Network access from Monad to your Kafka brokers on the configured port (typically 9092 for plaintext, 9093 for TLS).
  4. Credentials (if using SASL authentication): a username and password with produce permissions on the target topic.
  5. Certificates (if using SSL/TLS): CA certificate in PEM format, and optionally a client certificate and key for mutual TLS.

Configuration

The following settings configure the Kafka output connector.

Settings

| Setting | Type | Required | Default | Description |
|---|---|---|---|---|
| Bootstrap Servers | string | Yes | — | Comma-separated list of Kafka broker addresses in host:port format (e.g., broker1:9092,broker2:9092). |
| Topic | string | Yes | — | The Kafka topic to publish messages to. |
| Security Protocol | string | Yes | NONE | Security protocol for broker connections. See Security Protocols below. |
| SASL Mechanism | string | Conditional | — | SASL authentication mechanism. Required when using SASL_PLAINTEXT or SASL_SSL. See SASL Mechanisms below. |
| Username | string | Conditional | — | Username for SASL authentication. Required when using a SASL security protocol. |
| Compression Type | string | No | none | Compression codec applied to messages. See Compression Types below. |
| Message Key Field | string | No | — | JSON field path (gjson syntax) to extract as the Kafka message key (e.g., user.id). If the field is not found in a message, the batch will fail. |
| Acknowledgments | string | No | all | Acknowledgment level for produced messages. See Acknowledgment Levels below. |
| Retries | integer | No | 3 | Number of retry attempts for failed writes. |
| Headers | array | No | — | Static key-value headers to attach to every Kafka message. |

Batch Config

| Setting | Type | Required | Default | Min | Max | Description |
|---|---|---|---|---|---|---|
| Batch Record Count | integer | No | 1000 | 1 | 100000 | Maximum number of records per batch. |
| Batch Data Size | bytes | No | 1 MiB | 1 KiB | 10 MiB | Maximum data size per batch. |
| Publish Rate | seconds | No | 5 | 1 | 60 | Maximum seconds before flushing a batch. |
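
A batch is flushed as soon as any one of the three limits is reached: record count, total byte size, or elapsed time since the first buffered record. A minimal Python sketch of that trigger logic (the class and its structure are illustrative, not Monad's implementation):

```python
import time

class BatchBuffer:
    """Illustrative buffer that flushes on whichever limit is hit first:
    record count, total byte size, or age of the oldest buffered record."""

    def __init__(self, max_records=1000, max_bytes=1024 * 1024, max_seconds=5):
        self.max_records = max_records
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.records = []
        self.total_bytes = 0
        self.first_at = None  # time the oldest buffered record arrived

    def add(self, record: bytes):
        if self.first_at is None:
            self.first_at = time.monotonic()
        self.records.append(record)
        self.total_bytes += len(record)

    def should_flush(self) -> bool:
        if not self.records:
            return False
        return (len(self.records) >= self.max_records
                or self.total_bytes >= self.max_bytes
                or time.monotonic() - self.first_at >= self.max_seconds)

    def flush(self) -> list:
        batch, self.records = self.records, []
        self.total_bytes, self.first_at = 0, None
        return batch
```

Lowering Batch Record Count or Publish Rate reduces latency at the cost of more, smaller writes; raising them improves throughput.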

Secrets

| Secret | Required | Description |
|---|---|---|
| Password | Conditional | Password or API secret for SASL authentication. Required for SASL protocols. |
| CA Certificate | Conditional | CA certificate in PEM format for TLS verification. Required for SSL and SASL_SSL protocols. |
| Client Certificate | No | Client certificate in PEM format for mutual TLS (mTLS) authentication. |
| Client Key | No | Client private key in PEM format for mutual TLS (mTLS) authentication. |

Security Protocols

| Value | Description |
|---|---|
| NONE | No authentication or encryption. Suitable for internal or development environments only. |
| SASL_PLAINTEXT | SASL authentication over an unencrypted connection. Requires SASL Mechanism, Username, and Password. |
| SASL_SSL | SASL authentication over a TLS-encrypted connection. Requires SASL Mechanism, Username, Password, and CA Certificate. |
| SSL | TLS encryption without SASL authentication. Requires CA Certificate. Optionally supports mTLS with a Client Certificate and Client Key. |
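
Monad manages these settings for you, but it can help to see how a SASL_SSL configuration maps onto standard Kafka client properties, for example when verifying connectivity independently with the confluent-kafka Python client. A hedged sketch — the key names below are librdkafka configuration properties, and the broker addresses, username, and file path are placeholders:

```python
# Illustrative SASL_SSL producer configuration for confluent-kafka
# (librdkafka property names). All values are placeholders.
producer_config = {
    "bootstrap.servers": "broker1:9093,broker2:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": "monad-producer",
    "sasl.password": "********",  # supply from a secret store, never hard-code
    "ssl.ca.location": "/etc/ssl/certs/kafka-ca.pem",
    "acks": "all",                # matches the connector's default
    "compression.type": "zstd",
    "retries": 3,
}
# producer = confluent_kafka.Producer(producer_config)
```

Note the TLS port (typically 9093): a common misconfiguration is pointing a SASL_SSL client at a plaintext listener on 9092.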

SASL Mechanisms

| Value | Description |
|---|---|
| PLAIN | Simple username/password authentication. Credentials are sent in plaintext — use with SSL for security. |
| SCRAM-SHA-256 | Challenge-response authentication using SHA-256. More secure than PLAIN. |
| SCRAM-SHA-512 | Challenge-response authentication using SHA-512. Highest security among available mechanisms. |

Compression Types

| Value | Description |
|---|---|
| none | No compression. Lowest CPU overhead. |
| gzip | Higher compression ratio, higher CPU usage. Good for compressible text data. |
| snappy | Moderate compression ratio, low CPU overhead. Good default for high-throughput topics. |
| lz4 | Very fast compression and decompression with moderate compression ratio. |
| zstd | Best compression ratio with low CPU overhead. Recommended for most use cases. |

Acknowledgment Levels

| Value | Description |
|---|---|
| 0 | No acknowledgment. Highest throughput, no durability guarantee. Messages may be lost if the broker crashes. |
| 1 | Leader-only acknowledgment. Message is written to the leader partition before acknowledging. |
| all | All in-sync replicas must acknowledge the write. Strongest durability guarantee. Default and recommended setting. |

Message Keys

When a Message Key Field is configured, the connector extracts the specified JSON field from each message using dot notation syntax and uses its value as the Kafka message key.

Message keys determine which Kafka partition a message is routed to. Messages with the same key are always sent to the same partition, preserving ordering for that key.

Example: If your messages look like {"user": {"id": "abc123", "name": "Alice"}}, setting Message Key Field to user.id will use abc123 as the Kafka message key.

Important: If the specified key field is not present in a message, the entire batch will fail. Ensure the field exists in all messages before enabling this setting.
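
The lookup behavior above can be sketched in Python. This is a simplified stand-in for a gjson path lookup (it handles only plain dot-separated object keys, not gjson's full syntax):

```python
import json

def extract_key(message: bytes, path: str):
    """Resolve a dot-notation path (e.g. "user.id") against a JSON message.
    Returns None when any path segment is missing, which is the condition
    that causes the connector to fail the whole batch."""
    value = json.loads(message)
    for segment in path.split("."):
        if not isinstance(value, dict) or segment not in value:
            return None
        value = value[segment]
    return value

msg = b'{"user": {"id": "abc123", "name": "Alice"}}'
extract_key(msg, "user.id")   # "abc123" -> used as the Kafka message key
extract_key(msg, "user.ssn")  # None -> batch failure
```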

Static Headers

The Headers setting allows you to attach static key-value metadata to every message produced. Headers are useful for routing, filtering, or adding context to messages consumed downstream.

Each header requires:

  • Key (string, required): The header name.
  • Value (string, required): The header value.
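
For reference, most Kafka clients represent headers as name/value pairs with byte-string values attached to each produced message; kafka-python, for example, accepts a list of (str, bytes) tuples. A hedged sketch with placeholder header names:

```python
# Illustrative static headers in the (name, bytes) form common to
# Kafka client libraries. Header names and values are placeholders.
static_headers = [
    ("source", b"monad"),
    ("environment", b"production"),
]
# e.g. producer.send("my-topic", value=payload, headers=static_headers)
```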

Mutual TLS (mTLS)

For environments that require client authentication in addition to server authentication, provide both a Client Certificate and Client Key alongside the CA Certificate. The client certificate must be signed by a CA that the broker trusts.

Troubleshooting

Topic does not exist

  • The connection test verifies that the configured topic exists on the broker. Create the topic before starting the pipeline.
  • Error: topic "my-topic" does not exist on broker

Failed to connect to broker

  • Verify the broker addresses in Bootstrap Servers are correct and reachable from Monad's network.
  • Check firewall rules and security group settings.
  • Ensure the port is correct (typically 9092 for plaintext, 9093 for TLS).

SASL authentication failures

  • Confirm the SASL Mechanism matches what your broker is configured to accept.
  • Verify the username and password are correct.
  • When using SASL_PLAINTEXT, ensure the broker is configured to allow SASL on a plaintext listener.

TLS/SSL certificate errors

  • Ensure the CA Certificate is in PEM format and corresponds to the CA that signed the broker's certificate.
  • For mTLS, verify the Client Certificate and Client Key are a matching pair.
  • Check that the certificate has not expired.

Message key field not found

  • The connector returns an error and fails the batch if the configured Message Key Field is missing from any message.
  • Verify the gjson path is correct for your message schema. Use dot notation for nested fields (e.g., metadata.id).

High latency / low throughput

  • Tune the Batch Record Count and Publish Rate settings to balance latency and throughput for your use case.
  • Consider enabling compression (e.g., zstd or snappy) to reduce network I/O.
  • Set Acknowledgments to 1 if you can tolerate reduced durability guarantees in exchange for higher throughput.