# Kafka Importer

|  |  |
| --- | --- |
| Description | Kafka Importer consumes Kafka messages and writes them into EDA state. |
| Author | Nokia |
| Catalog | nokia-eda/catalog |
| Language | Go |
## Overview

The Kafka Importer app subscribes to Kafka topics, optionally filters messages with CEL, renders EDA paths and payloads with Go templates, and writes the result into the EDA state database.

The app provides two resources:

- `Consumer`: a namespace-scoped consumer intended for a user namespace
- `ClusterConsumer`: intended for use from the EDA base namespace for centralized imports

Each consumer:

- connects to one or more Kafka brokers
- joins a Kafka consumer group using `clientName`
- consumes from a single topic
- optionally applies SASL and TLS settings
- optionally filters messages with `spec.condition`
- renders `spec.path` and `spec.data`
- updates EDA state with the rendered output
## Installation

The Kafka Importer app can be installed using EDA Store or by running an AppInstaller with kubectl.
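A minimal AppInstaller without custom settings might look like the sketch below. The catalog name and version are illustrative placeholders taken from elsewhere in this page; adjust them to your environment:

```shell
cat << 'EOF' | kubectl apply -f -
apiVersion: appstore.eda.nokia.com/v1
kind: AppInstaller
metadata:
  name: kafka-importer-install
  namespace: eda-system
spec:
  operation: install
  apps:
    - appId: kafkaimporter.eda.nokia.com
      catalog: nokia-eda/catalog
      version:
        value: v1.0.0
EOF
```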
### Install Settings

The app supports install-time settings through `spec.apps[].appSettings` in the AppInstaller.

Available settings:

- `cpuLimit`: CPU limit for the controller pod. Default: `"1"`
- `memoryLimit`: memory limit for the controller pod. Default: `"1Gi"`

The deployment keeps requests fixed at 500m CPU and 500Mi memory.
### Example Install With Settings

```shell
cat << 'EOF' | kubectl apply -f -
apiVersion: appstore.eda.nokia.com/v1
kind: AppInstaller
metadata:
  name: kafka-importer-install-sized
  namespace: eda-system
spec:
  operation: install
  apps:
    - appId: kafkaimporter.eda.nokia.com
      catalog: qa-eda-catalog
      version:
        value: v1.0.0
      appSettings:
        cpuLimit: "2"
        memoryLimit: 2Gi
EOF
```
## Getting Started

Before creating a consumer, make sure the Kafka brokers, topic, and any referenced Secrets or ConfigMaps already exist.

Namespace behavior:

- `Consumer` is created in a user namespace and its rendered path is normalized into that namespace automatically
- `ClusterConsumer` is intended to be created in the EDA base namespace and writes to the rendered path as-is
- despite its name, `ClusterConsumer` is currently used as an EDA base namespace resource rather than as a true cluster-scoped Kubernetes object
## Consumer Resources

Use `Consumer` when you want to import Kafka data into a single namespace.

Important fields:

- `spec.brokers`: list of Kafka broker addresses
- `spec.topic`: topic to consume
- `spec.clientName`: consumer-group name. If omitted, defaults to `eda-consumer-<namespace>-<name>`
- `spec.offsetResetPolicy`: `latest`, `earliest`, or `resume`
- `spec.deliveryMode`: `AtMostOnce` or `AtLeastOnce`
- `spec.condition`: optional CEL filter expression
- `spec.path`: Go template used to compute the EDA path
- `spec.data`: Go template used to compute the JSON payload written to EDA state
### Template Data Model

The Go templates are evaluated against the consumed Kafka message JSON, exposed under the root variable `.msg` (matching the CEL `msg` variable used by `spec.condition`).

Available fields in templates:

- `.msg.key`: Kafka key
- `.msg.value`: Kafka value
- `.msg.is_tombstone`: whether the Kafka message had a null value

The key may be:

- a plain string, accessible as `.msg.key`
- a JSON object, accessible with fields such as `.msg.key.user_id`
For namespace-scoped Consumer resources, the rendered path is normalized into the consumer namespace. For example, if your template renders .interfaces[name="eth0"], the controller writes it under .namespace{.name=="<consumer namespace>"}.interfaces[name="eth0"].
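As a minimal, self-contained sketch of how such a template renders, the standard `text/template` package can be run against a JSON-decoded message exposed under `.msg`. This mirrors the data model above but is not the app's actual code; `renderPath` is a hypothetical helper:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"text/template"
)

// renderPath renders a spec.path-style Go template against a Kafka
// message (JSON-decoded and exposed under the .msg root variable).
// Illustrative sketch only, not the app's implementation.
func renderPath(tmpl, rawMsg string) (string, error) {
	var msg map[string]any
	if err := json.Unmarshal([]byte(rawMsg), &msg); err != nil {
		return "", err
	}
	t, err := template.New("path").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, map[string]any{"msg": msg}); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	raw := `{"key": {"interface_name": "eth0"}, "value": {"oper_state": "down"}, "is_tombstone": false}`
	out, err := renderPath(`.interfaces[name="{{ .msg.key.interface_name }}"]`, raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // .interfaces[name="eth0"]
}
```

Note that `text/template` resolves `.msg.key.interface_name` through map keys, so a JSON-object key works with plain field syntax.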
### CEL Filtering

If `spec.condition` is set, the message is evaluated as CEL before any state update.

CEL sees the message as:

- `msg.key`
- `msg.value`
- `msg.is_tombstone`

Typical examples:

- `msg.value.status == "active"`
- `!msg.is_tombstone`
- `msg.key.region == "us-west" && msg.value.severity >= 3`
If the condition evaluates to false, the message is skipped.
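The app evaluates `spec.condition` with a real CEL engine; purely to illustrate the semantics of the last example (with the tombstone check folded in), an equivalent plain-Go filter might read:

```go
package main

import "fmt"

// Message mirrors the msg variable that CEL sees: key, value, and
// is_tombstone. Illustrative only; the app does not use this type.
type Message struct {
	Key         map[string]any
	Value       map[string]any
	IsTombstone bool
}

// keep reports whether a message would pass the filter
// `msg.key.region == "us-west" && msg.value.severity >= 3 && !msg.is_tombstone`.
// A plain-Go sketch of the semantics, not the CEL engine itself.
func keep(m Message) bool {
	region, _ := m.Key["region"].(string)
	severity, _ := m.Value["severity"].(float64) // JSON numbers decode to float64
	return region == "us-west" && severity >= 3 && !m.IsTombstone
}

func main() {
	m := Message{
		Key:   map[string]any{"region": "us-west"},
		Value: map[string]any{"severity": 4.0},
	}
	fmt.Println(keep(m)) // true
}
```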
### Security Options

The consumer supports:

- SASL: `plain`, `scram-sha-256`, `scram-sha-512`, and `oauthbearer`
- TLS from certificate files
- TLS from a Secret containing `tls.crt`, `tls.key`, and an optional `ca.crt`
- trust-bundle verification via a ConfigMap containing `trust-bundle.pem`

Validation notes for security:

- `tokenUrl` must be set when using SASL `oauthbearer`
- only one of `tls.fromSecret` or `tls.fromFiles` can be set
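As a loose sketch of how these options might sit together in a spec, the fragment below combines SCRAM with a Secret-based TLS source. The exact field shapes under `sasl` and `tls` are assumptions inferred from the option names above, so verify them against the app's published schema:

```yaml
spec:
  # assumption: mechanism selection field name under sasl
  sasl:
    mechanism: scram-sha-512
  tls:
    # the referenced Secret must contain tls.crt, tls.key,
    # and optionally ca.crt; fromSecret and fromFiles are
    # mutually exclusive
    fromSecret: kafka-client-tls
```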
### Example Consumer

```yaml
apiVersion: kafkaimporter.eda.nokia.com/v1alpha1
kind: Consumer
metadata:
  name: interface-events
  namespace: eda
spec:
  brokers:
    - kafka.kafka.svc.cluster.local:9092
  topic: interface-events
  offsetResetPolicy: earliest
  deliveryMode: AtLeastOnce
  condition: 'msg.value.oper_state == "down" && !msg.is_tombstone'
  path: '.interfaces[name="{{ .msg.key.interface_name }}"]'
  data: '{{ .msg.value }}'
```
## ClusterConsumer

Use `ClusterConsumer` when you want a centrally managed consumer run from the EDA base namespace.

Behavior differences from `Consumer`:

- it is intended to run from the EDA base namespace
- the rendered path is written exactly as rendered, without namespace normalization
- this allows writing directly to fully qualified paths such as `.namespace{.name=="eda"}.alerts[...]`
### Example ClusterConsumer

```yaml
apiVersion: kafkaimporter.eda.nokia.com/v1alpha1
kind: ClusterConsumer
metadata:
  name: central-alert-import
  namespace: eda-system
spec:
  brokers:
    - kafka.kafka.svc.cluster.local:9092
  topic: alerts
  condition: 'msg.value.severity >= 3 && !msg.is_tombstone'
  path: '.namespace{.name=="eda"}.alerts[id="{{ .msg.key.alert_id }}"]'
  data: '{{ .msg.value }}'
```
## Status

Both `Consumer` and `ClusterConsumer` report runtime status through:

- `status.connected`
- `status.message`
- `status.lastError`
- `status.lastSeen`
- `status.offsets`
The controller updates status while the consumer runs, including per-partition offsets and the last successful message time.
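To spot-check these fields on a running consumer, a jsonpath query along the following lines can help. The lowercase resource name `consumer` is an assumption based on the kind, and the object name and namespace come from the example earlier on this page:

```shell
kubectl get consumer interface-events -n eda \
  -o jsonpath='{.status.connected}{" "}{.status.lastSeen}{"\n"}'
```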
## Configuration Notes

When creating a consumer, keep these rules in mind:

- `brokers` must contain at least one address
- `topic` is required
- `path` and `data` must be valid Go templates
- `condition` must be a valid CEL boolean expression
- for SASL `oauthbearer`, `tokenUrl` is required
- `tls.fromSecret` and `tls.fromFiles` cannot be used together
## Known Limitation
The app does not currently raise a dedicated EDA alarm for Kafka connectivity failures. Connection and processing issues are reflected through resource status fields instead.