# Kafka
Publishes messages to Apache Kafka topics with support for SASL authentication, TLS encryption, and message compression.
## Overview
The Apache Kafka output connector for Monad enables you to publish data directly to Kafka topics. It supports multiple security protocols (SASL_PLAINTEXT, SASL_SSL, SSL), SASL authentication mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512), mutual TLS (mTLS), message compression, and configurable acknowledgment levels. The connector batches messages at the Monad level before writing them to Kafka.
## Requirements
To configure Kafka as an output destination, you need:
- A running Kafka cluster with at least one accessible broker.
- A pre-existing Kafka topic to publish messages to. The connector will verify that the topic exists during the connection test — it will not create the topic automatically.
- Network access from Monad to your Kafka brokers on the configured port (typically `9092` for plaintext, `9093` for TLS).
- Credentials (if using SASL authentication): a username and password with produce permissions on the target topic.
- Certificates (if using SSL/TLS): CA certificate in PEM format, and optionally a client certificate and key for mutual TLS.
## Configuration
The following configuration defines the input parameters for the Kafka output connector.
### Settings
| Setting | Type | Required | Default | Description |
|---|---|---|---|---|
| Bootstrap Servers | string | Yes | — | Comma-separated list of Kafka broker addresses in host:port format (e.g., broker1:9092,broker2:9092). |
| Topic | string | Yes | — | The Kafka topic to publish messages to. |
| Security Protocol | string | Yes | NONE | Security protocol for broker connections. See Security Protocols below. |
| SASL Mechanism | string | Conditional | — | SASL authentication mechanism. Required when using SASL_PLAINTEXT or SASL_SSL. See SASL Mechanisms below. |
| Username | string | Conditional | — | Username for SASL authentication. Required when using a SASL security protocol. |
| Compression Type | string | No | none | Compression codec applied to messages. See Compression Types below. |
| Message Key Field | string | No | — | JSON field path (gjson syntax) to extract as the Kafka message key (e.g., user.id). If the field is not found in a message, the batch will fail. |
| Acknowledgments | string | No | all | Acknowledgment level for produced messages. See Acknowledgment Levels below. |
| Retries | integer | No | 3 | Number of retry attempts for failed writes. |
| Headers | array | No | — | Static key-value headers to attach to every Kafka message. |
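Taken together, the settings above describe a complete producer configuration. The sketch below shows what a SASL_SSL setup might look like as plain data; the field names are illustrative and may not match Monad's actual configuration schema.

```python
# Hypothetical Kafka output configuration for a SASL_SSL setup.
# Field names mirror the settings table; the real schema may differ.
kafka_output_config = {
    "bootstrap_servers": "broker1:9093,broker2:9093",
    "topic": "events",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "username": "monad-producer",
    "compression_type": "zstd",
    "message_key_field": "user.id",
    "acknowledgments": "all",
    "retries": 3,
    "headers": [{"key": "source", "value": "monad"}],
}

# Bootstrap Servers is a comma-separated host:port list.
brokers = kafka_output_config["bootstrap_servers"].split(",")
```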
### Batch Config
| Setting | Type | Required | Default | Min | Max | Description |
|---|---|---|---|---|---|---|
| Batch Record Count | integer | No | 1000 | 1 | 100000 | Maximum number of records per batch. |
| Batch Data Size | bytes | No | 1 MiB | 1 KiB | 10 MiB | Maximum data size per batch. |
| Publish Rate | seconds | No | 5 | 1 | 60 | Maximum seconds before flushing a batch. |
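The three batch settings act as independent flush triggers: a batch is written to Kafka as soon as any one limit is reached. A minimal sketch of that logic (illustrative only, not the connector's actual implementation):

```python
import time

class Batcher:
    """Flushes when record count, byte size, or batch age exceeds its limit."""

    def __init__(self, max_records=1000, max_bytes=1 << 20, max_age_s=5):
        self.max_records = max_records   # Batch Record Count
        self.max_bytes = max_bytes       # Batch Data Size (1 MiB default)
        self.max_age_s = max_age_s       # Publish Rate
        self.records, self.size, self.started = [], 0, time.monotonic()

    def add(self, record):
        """Add a record; return the finished batch if any limit was hit."""
        self.records.append(record)
        self.size += len(record)
        if (len(self.records) >= self.max_records
                or self.size >= self.max_bytes
                or time.monotonic() - self.started >= self.max_age_s):
            batch, self.records, self.size = self.records, [], 0
            self.started = time.monotonic()
            return batch
        return None
```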
### Secrets
| Secret | Required | Description |
|---|---|---|
| Password | Conditional | Password or API secret for SASL authentication. Required for SASL protocols. |
| CA Certificate | Conditional | CA certificate in PEM format for TLS verification. Required for SSL and SASL_SSL protocols. |
| Client Certificate | No | Client certificate in PEM format for mutual TLS (mTLS) authentication. |
| Client Key | No | Client private key in PEM format for mutual TLS (mTLS) authentication. |
## Security Protocols

| Value | Description |
|---|---|
| `NONE` | No authentication or encryption. Suitable for internal or development environments only. |
| `SASL_PLAINTEXT` | SASL authentication over an unencrypted connection. Requires SASL Mechanism, Username, and Password. |
| `SASL_SSL` | SASL authentication over a TLS-encrypted connection. Requires SASL Mechanism, Username, Password, and CA Certificate. |
| `SSL` | TLS encryption without SASL authentication. Requires CA Certificate. Optionally supports mTLS with a Client Certificate and Client Key. |
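The per-protocol requirements in the table can be summarized programmatically. This sketch (field names are assumptions, not the connector's internals) reports which settings and secrets are missing for a chosen protocol:

```python
# Maps each security protocol to the settings/secrets it requires,
# per the table above. Names are illustrative.
REQUIRED_BY_PROTOCOL = {
    "NONE": [],
    "SASL_PLAINTEXT": ["sasl_mechanism", "username", "password"],
    "SASL_SSL": ["sasl_mechanism", "username", "password", "ca_certificate"],
    "SSL": ["ca_certificate"],
}

def missing_settings(protocol, config):
    """Return required settings absent from config for the chosen protocol."""
    required = REQUIRED_BY_PROTOCOL[protocol]
    return [name for name in required if not config.get(name)]
```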
## SASL Mechanisms

| Value | Description |
|---|---|
| `PLAIN` | Simple username/password authentication. Credentials are sent in plaintext — use with SSL for security. |
| `SCRAM-SHA-256` | Challenge-response authentication using SHA-256. More secure than PLAIN. |
| `SCRAM-SHA-512` | Challenge-response authentication using SHA-512. Highest security among available mechanisms. |
## Compression Types

| Value | Description |
|---|---|
| `none` | No compression. Lowest CPU overhead. |
| `gzip` | Higher compression ratio, higher CPU usage. Good for compressible text data. |
| `snappy` | Moderate compression ratio, low CPU overhead. Good default for high-throughput topics. |
| `lz4` | Very fast compression and decompression with moderate compression ratio. |
| `zstd` | Best compression ratio with low CPU overhead. Recommended for most use cases. |
## Acknowledgment Levels

| Value | Description |
|---|---|
| `0` | No acknowledgment. Highest throughput, no durability guarantee. Messages may be lost if the broker crashes. |
| `1` | Leader-only acknowledgment. Message is written to the leader partition before acknowledging. |
| `all` | All in-sync replicas must acknowledge the write. Strongest durability guarantee. Default and recommended setting. |
## Message Keys

When a Message Key Field is configured, the connector extracts the specified JSON field from each message using gjson dot-notation syntax and uses its value as the Kafka message key.
Message keys determine which Kafka partition a message is routed to. Messages with the same key are always sent to the same partition, preserving ordering for that key.
Example: If your messages look like `{"user": {"id": "abc123", "name": "Alice"}}`, setting Message Key Field to `user.id` will use `abc123` as the Kafka message key.
Important: If the specified key field is not present in a message, the entire batch will fail. Ensure the field exists in all messages before enabling this setting.
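Key extraction amounts to a dot-path lookup that fails hard when a segment is absent. The sketch below emulates a small subset of gjson path resolution in Python purely for illustration (the connector itself uses gjson):

```python
def extract_key(message, path):
    """Resolve a dot-notation path like 'user.id'; raise if any segment is missing."""
    value = message
    for segment in path.split("."):
        if not isinstance(value, dict) or segment not in value:
            # Mirrors the connector's behavior: a missing key field fails the batch.
            raise KeyError(f"message key field {path!r} not found")
        value = value[segment]
    return str(value)

msg = {"user": {"id": "abc123", "name": "Alice"}}
key = extract_key(msg, "user.id")  # "abc123" — all messages with this key share a partition
```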
## Static Headers
The Headers setting allows you to attach static key-value metadata to every message produced. Headers are useful for routing, filtering, or adding context to messages consumed downstream.
Each header requires:
- Key (string, required): The header name.
- Value (string, required): The header value.
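Attaching static headers is a simple per-message concatenation; on the wire, Kafka header values are bytes. A minimal sketch (names are illustrative):

```python
# Static headers configured once, attached to every produced message.
STATIC_HEADERS = [("source", "monad"), ("env", "production")]

def with_headers(value, static=STATIC_HEADERS):
    """Pair a message value with the configured static headers,
    encoding header values to bytes as Kafka requires."""
    return value, [(k, v.encode()) for k, v in static]
```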
## Mutual TLS (mTLS)
For environments that require client authentication in addition to server authentication, provide both a Client Certificate and Client Key alongside the CA Certificate. The client certificate must be signed by a CA that the broker trusts.
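Using Python's standard `ssl` module, the trust relationships described above can be sketched as follows; the file paths are placeholders, and the connector's own TLS setup may differ.

```python
import ssl

def make_tls_context(cafile=None, certfile=None, keyfile=None):
    """Build a TLS context: cafile verifies the broker's certificate (server auth);
    certfile/keyfile, if both given, present a client identity for mTLS."""
    ctx = ssl.create_default_context(cafile=cafile)
    if certfile and keyfile:
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx
```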
## Troubleshooting
### Topic does not exist

- The connection test verifies that the configured topic exists on the broker. Create the topic before starting the pipeline.
- Error: `topic "my-topic" does not exist on broker`
### Failed to connect to broker

- Verify the broker addresses in Bootstrap Servers are correct and reachable from Monad's network.
- Check firewall rules and security group settings.
- Ensure the port is correct (typically `9092` for plaintext, `9093` for TLS).
### SASL authentication failures

- Confirm the SASL Mechanism matches what your broker is configured to accept.
- Verify the username and password are correct.
- When using `SASL_PLAINTEXT`, ensure the broker is configured to allow SASL on a plaintext listener.
### TLS/SSL certificate errors
- Ensure the CA Certificate is in PEM format and corresponds to the CA that signed the broker's certificate.
- For mTLS, verify the Client Certificate and Client Key are a matching pair.
- Check that the certificate has not expired.
### Message key field not found

- The connector returns an error and fails the batch if the configured Message Key Field is missing from any message.
- Verify the gjson path is correct for your message schema. Use dot notation for nested fields (e.g., `metadata.id`).
### High latency / low throughput

- Tune the Batch Record Count and Publish Rate settings to balance latency and throughput for your use case.
- Consider enabling compression (e.g., `zstd` or `snappy`) to reduce network I/O.
- Set Acknowledgments to `1` if you can tolerate reduced durability guarantees in exchange for higher throughput.