向 Apache Kafka 主题发布消息,入站消息data作为record value发送。
配置
Topic and key configuration(主题与key配置)
定义目标Kafka主题及可选的分区key以路由消息。
- Topic pattern — 发布消息的Kafka主题名称。支持 templates。
- Key pattern — 用于决定消息写入分区的partition key。支持模板。若无需特定分区,可留空。
Producer configuration(Producer配置)
配置Kafka producer的消息投递、批处理和可靠性行为。
-
Automatically retry times if fails — record发送失败时Kafka producer的重试次数(
retries)。producer会重试因临时错误失败的record。设为0禁用重试。 -
Produces batch size in bytes — Kafka producer的batch大小(
batch.size)。向同一分区发送多条record时,producer会尝试合并为更少的请求以提高吞吐量。batch大小为0时禁用批处理。 -
Time to buffer locally (ms) — Kafka producer的linger时间(
linger.ms)。producer在发送batch前等待以积累更多record的时间。设大于0会增加延迟,但有助于批处理。 -
Client buffer max size in bytes — Kafka producer的buffer内存(
buffer.memory)。producer可用于缓冲待发送record的总字节数。 -
Number of acknowledgments — producer认为请求完成前broker需收到的确认数(
acks)。 可用选项:- -1 (all) — producer等待所有in-sync副本确认record。持久性最强,但延迟最高。
- 0 — producer不等待broker确认。broker故障时可能丢消息,但延迟最低。
- 1 — producer等待leader将record写入本地log,但不等待follower的完整确认。在持久性和延迟之间取得平衡。
Connection configuration(连接配置)
配置与Kafka集群的连接。
- Bootstrap servers — Kafka broker地址列表,逗号分隔,格式为
host:port。producer使用这些地址建立与集群的连接。
Advanced settings(高级设置)
配置额外Kafka producer属性、消息头和字符编码。
-
Other properties — 高级Kafka producer配置的自定义键值对。这些属性直接传递给Kafka producer客户端。常见用途包括SSL/TLS配置(如
ssl.keystore.location、ssl.truststore.location)。 -
Add Message metadata key-value pairs to Kafka record headers — 启用时,所有消息metadata键值对会以
tb_msg_md_为前缀添加到Kafka record headers。 -
Charset encoding — 将消息metadata值转换为Kafka headers的字节数组时使用的字符编码。仅当启用 Add Message metadata key-value pairs to Kafka record headers 时生效。
JSON Schema
输出消息格式
无论成功或失败,节点均保留原始消息data。Metadata会更新为Kafka相关信息或错误详情。
Success case(成功时)
消息成功发布到Kafka时,以下字段会添加到出站消息metadata:
offset— record在Kafka分区中的offset,表示record在分区log中的位置。partition— 存储record的分区号。topic— 发布record的主题名。
原始消息data保持不变。
示例:
原始消息metadata:
1
2
3
4
{
"deviceType": "sensor",
"deviceName": "Sensor-001"
}
成功发布后(例如到主题 “telemetry” 的分区2、offset 12345),metadata变为:
1
2
3
4
5
6
7
{
"deviceType": "sensor",
"deviceName": "Sensor-001",
"offset": "12345",
"partition": "2",
"topic": "telemetry"
}
Failure case(失败时)
发布失败时,在出站消息metadata中添加以下字段:
error— 以class <ExceptionClass>: <error message>格式包含异常类名和错误信息
示例:
原始消息metadata:
1
2
3
{
"deviceType": "sensor"
}
失败后(如连接超时),metadata变为:
1
2
3
4
{
"deviceType": "sensor",
"error": "class org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for telemetry-0: 30000 ms has passed since batch creation"
}
消息data保持不变。
消息确认行为
节点的消息确认行为由环境变量 ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK 控制:
- 设为
true— 入站消息在接收后立即被确认并标记为已成功处理。创建包含更新metadata的新消息并加入队列供下一节点处理。 - 设为
false(默认)— 入站消息在整个Kafka发布操作期间保持处理中状态。消息就地转换,发布完成后更新metadata,修改后的消息传递至下一节点。
消息处理算法
- The node constructs a Kafka producer record:
- If the topic pattern is configured, templates are resolved using values from the incoming message data and metadata to determine the target topic.
- If the key pattern is configured, templates are resolved using values from the incoming message data and metadata to determine the partition key. If empty, no key is specified.
- If Add Message metadata key-value pairs to Kafka record headers is enabled, all metadata entries are added as headers with the prefix
tb_msg_md_, encoded using the specified charset.
- The message data is used as the record value (sent as a UTF-8 string).
- The publish operation is executed asynchronously using the Kafka producer:
- The record is sent to the specified topic with the resolved key (if provided).
- The Kafka client determines the partition based on the key (if provided) or uses default distribution.
- When the Kafka broker acknowledges the record successfully:
- The offset, partition, and topic are added to the message metadata.
- The resulting message is forwarded via the
Successconnection.
- If an error occurs during publishing:
- Error details are added to the outgoing message metadata under the
errorkey. - The message is forwarded via the
Failureconnection.
- Error details are added to the outgoing message metadata under the
输出连接
- Success
- 消息已成功发布到Kafka。
- 消息metadata包含
offset、partition和topic字段。
- Failure
- Publishing error:Kafka broker拒绝消息、主题不存在或producer超时。
- Initialization error:Kafka producer初始化失败(如配置无效、SSL证书问题)。
- Unexpected error:消息处理过程中发生意外错误。
示例
示例1 — 基本telemetry发布
将telemetry数据发布到专用telemetry主题,不指定partition key。
入站消息
Data:
1
2
3
4
{
"temperature": 25.5,
"humidity": 60.2
}
Metadata:
1
2
3
4
{
"deviceType": "TH-Sensor",
"deviceName": "Sensor-001"
}
节点配置
1
2
3
4
5
6
7
8
9
10
11
12
13
{
"topicPattern": "telemetry",
"keyPattern": "",
"bootstrapServers": "kafka.example.com:9092",
"retries": 3,
"batchSize": 16384,
"linger": 10,
"bufferMemory": 33554432,
"acks": "-1",
"otherProperties": {},
"addMetadataKeyValuesAsKafkaHeaders": false,
"kafkaHeadersCharset": "UTF-8"
}
出站消息
Data:未更改。
Metadata:
1
2
3
4
5
6
7
{
"deviceType": "TH-Sensor",
"deviceName": "Sensor-001",
"offset": "12345",
"partition": "2",
"topic": "telemetry"
}
经 Success 连接路由。
结果
telemetry数据已成功发布到 “telemetry” 主题。Kafka broker将record分配到分区2、offset 12345。因未指定key,Kafka客户端使用默认分区分布。producer等待所有in-sync副本确认record(acks=-1),确保持久性。