
Kafka

Publishes messages to an Apache Kafka topic. The incoming message data is sent as the record value.

Configuration

Topic and key configuration

Define the target Kafka topic and an optional partition key used to route messages.

  • Topic pattern — Name of the Kafka topic to publish messages to. Supports templatization.
  • Key pattern — Partition key that determines which partition the message is written to. Supports templatization. Leave empty if no specific partitioning is needed.
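As a rough illustration of templatization, a resolver substitutes values from the message into the pattern. The `${key}` placeholder syntax and the metadata-before-data lookup order here are assumptions for illustration, not the exact product syntax:

```python
import re

def resolve_template(pattern: str, metadata: dict, data: dict) -> str:
    """Substitute ${key} placeholders from metadata first, then data."""
    def lookup(match):
        key = match.group(1)
        value = metadata.get(key, data.get(key))
        # Leave unresolved placeholders intact rather than failing
        return str(value) if value is not None else match.group(0)
    return re.sub(r"\$\{(\w+)\}", lookup, pattern)

topic = resolve_template("telemetry-${deviceType}", {"deviceType": "sensor"}, {})
# topic == "telemetry-sensor"
```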

Producer configuration

Configure the message delivery, batching, and reliability behavior of the Kafka producer.

  • Automatically retry times if fails — Number of retries (retries) the Kafka producer attempts when sending a record fails. The producer retries records that failed due to transient errors. Set to 0 to disable retries.

  • Produces batch size in bytes — Batch size (batch.size) of the Kafka producer. When multiple records are sent to the same partition, the producer attempts to combine them into fewer requests to improve throughput. A batch size of 0 disables batching.

  • Time to buffer locally (ms) — Linger time (linger.ms) of the Kafka producer. How long the producer waits to accumulate more records before sending a batch. Values greater than 0 add latency but improve batching.

  • Client buffer max size in bytes — Buffer memory (buffer.memory) of the Kafka producer. Total bytes of memory the producer can use to buffer records waiting to be sent.

  • Number of acknowledgments — Number of acknowledgments (acks) the broker must receive before the producer considers a request complete. Available options:

    • -1 (all) — the producer waits for all in-sync replicas to acknowledge the record. Strongest durability, highest latency.
    • 0 — the producer does not wait for any acknowledgment from the broker. Messages may be lost if the broker fails, but latency is lowest.
    • 1 — the producer waits for the leader to write the record to its local log, but not for full acknowledgment from all followers. Balances durability and latency.
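The settings above map onto standard Kafka producer property keys. A minimal sketch of that mapping, using the field names from the JSON Schema on this page (the defaults shown are illustrative assumptions):

```python
def build_producer_config(node_config: dict) -> dict:
    """Map node settings onto standard Kafka producer property keys."""
    props = {
        "bootstrap.servers": node_config["bootstrapServers"],
        "retries": node_config.get("retries", 0),
        "batch.size": node_config.get("batchSize", 16384),
        "linger.ms": node_config.get("linger", 0),
        "buffer.memory": node_config.get("bufferMemory", 33554432),
        "acks": node_config.get("acks", "-1"),
    }
    # "Other properties" are passed through to the producer client as-is
    props.update(node_config.get("otherProperties", {}))
    return props
```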

Connection configuration

Configure the connection to the Kafka cluster.

  • Bootstrap servers — Comma-separated list of Kafka broker addresses in host:port format. The producer uses these addresses to establish the connection to the cluster.

Advanced settings

Configure additional Kafka producer properties, message headers, and character encoding.

  • Other properties — Custom key-value pairs for advanced Kafka producer configuration. These properties are passed directly to the Kafka producer client. Common uses include SSL/TLS configuration (e.g. ssl.keystore.location and ssl.truststore.location).

  • Add Message metadata key-value pairs to Kafka record headers — When enabled, all message metadata key-value pairs are added to the Kafka record headers with the tb_msg_md_ prefix.

  • Charset encoding — Character encoding used when converting message metadata values into byte arrays for Kafka headers. Takes effect only when Add Message metadata key-value pairs to Kafka record headers is enabled.
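A minimal sketch of the header conversion described above, assuming each metadata value is stringified before being encoded with the configured charset:

```python
TB_MSG_MD_PREFIX = "tb_msg_md_"

def metadata_to_headers(metadata: dict, charset: str = "UTF-8") -> list:
    """Turn metadata entries into (key, bytes) Kafka header tuples."""
    return [
        (TB_MSG_MD_PREFIX + key, str(value).encode(charset))
        for key, value in metadata.items()
    ]

headers = metadata_to_headers({"deviceName": "Sensor-001"})
# headers == [("tb_msg_md_deviceName", b"Sensor-001")]
```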

JSON Schema

{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "TbKafkaNodeConfiguration",
  "type": "object",
  "properties": {
    "topicPattern": {
      "type": "string",
      "description": "Name of the Kafka topic (supports templatization)."
    },
    "keyPattern": {
      "type": [
        "string",
        "null"
      ],
      "description": "Partition key for message routing (supports templatization)."
    },
    "bootstrapServers": {
      "type": "string",
      "minLength": 1,
      "description": "Comma-separated list of Kafka broker addresses (host:port)."
    },
    "retries": {
      "type": "integer",
      "minimum": 0,
      "description": "Number of retries if record send fails."
    },
    "batchSize": {
      "type": "integer",
      "minimum": 0,
      "description": "Producer batch size in bytes."
    },
    "linger": {
      "type": "integer",
      "minimum": 0,
      "description": "Time to buffer records locally in milliseconds."
    },
    "bufferMemory": {
      "type": "integer",
      "minimum": 0,
      "description": "Total bytes of memory for buffering records."
    },
    "acks": {
      "type": "string",
      "enum": [
        "-1",
        "0",
        "1",
        "all"
      ],
      "description": "Number of acknowledgments required from broker."
    },
    "otherProperties": {
      "type": "object",
      "additionalProperties": {
        "type": "string"
      },
      "description": "Additional Kafka producer properties as key-value pairs."
    },
    "addMetadataKeyValuesAsKafkaHeaders": {
      "type": "boolean",
      "description": "Add message metadata as Kafka record headers."
    },
    "kafkaHeadersCharset": {
      "type": "string",
      "description": "Character encoding for metadata values in headers."
    }
  },
  "required": [
    "topicPattern",
    "bootstrapServers"
  ],
  "additionalProperties": false
}
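A lightweight check of the schema's required fields might look like the following sketch (a full validator such as the jsonschema package would also enforce types, enums, and the additionalProperties constraint):

```python
def validate_config(config: dict) -> dict:
    """Check the two fields the schema marks as required."""
    missing = [f for f in ("topicPattern", "bootstrapServers") if not config.get(f)]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return config

validate_config({"topicPattern": "telemetry", "bootstrapServers": "kafka.example.com:9092"})
```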

Output message format

Whether publishing succeeds or fails, the node preserves the original message data. The metadata is updated with Kafka-related information or with error details.

Success case

When a message is successfully published to Kafka, the following fields are added to the outgoing message metadata:

  • offset — offset of the record within the Kafka partition, i.e. the record's position in the partition log.
  • partition — number of the partition the record was stored in.
  • topic — name of the topic the record was published to.

The original message data remains unchanged.

Example:

Original message metadata:

{
  "deviceType": "sensor",
  "deviceName": "Sensor-001"
}

After successful publishing (e.g. to partition 2 of the "telemetry" topic at offset 12345), the metadata becomes:

{
  "deviceType": "sensor",
  "deviceName": "Sensor-001",
  "offset": "12345",
  "partition": "2",
  "topic": "telemetry"
}

Failure case

When publishing fails, the following field is added to the outgoing message metadata:

  • error — contains the exception class name and error message in the format class <ExceptionClass>: <error message>

Example:

Original message metadata:

{
  "deviceType": "sensor"
}

After a failure (e.g. a connection timeout), the metadata becomes:

{
  "deviceType": "sensor",
  "error": "class org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for telemetry-0: 30000 ms has passed since batch creation"
}

The message data remains unchanged.
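A sketch of how the error entry might be built. The class <ExceptionClass>: <error message> format mirrors the description above; the fully-qualified class-name rendering here is Python's and purely illustrative:

```python
def error_metadata(metadata: dict, exc: Exception) -> dict:
    """Return a copy of metadata with the formatted error entry added."""
    cls = type(exc)
    error = f"class {cls.__module__}.{cls.__qualname__}: {exc}"
    return {**metadata, "error": error}

md = error_metadata({"deviceType": "sensor"}, TimeoutError("30000 ms has passed"))
```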

Message acknowledgment behavior

The node's message acknowledgment behavior is controlled by the ACTORS_RULE_EXTERNAL_NODE_FORCE_ACK environment variable:

  • Set to true — incoming messages are acknowledged and marked as successfully processed immediately upon receipt. A new message with the updated metadata is created and enqueued for the next node to process.
  • Set to false (default) — incoming messages remain in the processing state for the entire Kafka publish operation. The message is transformed in place: the metadata is updated once publishing completes, and the modified message is passed to the next node.

Message processing algorithm

  1. The node constructs a Kafka producer record:
    • If the topic pattern is configured, templates are resolved using values from the incoming message data and metadata to determine the target topic.
    • If the key pattern is configured, templates are resolved using values from the incoming message data and metadata to determine the partition key. If empty, no key is specified.
    • If Add Message metadata key-value pairs to Kafka record headers is enabled, all metadata entries are added as headers with the prefix tb_msg_md_, encoded using the specified charset.
  2. The message data is used as the record value (sent as a UTF-8 string).
  3. The publish operation is executed asynchronously using the Kafka producer:
    • The record is sent to the specified topic with the resolved key (if provided).
    • The Kafka client determines the partition based on the key (if provided) or uses default distribution.
  4. When the Kafka broker acknowledges the record successfully:
    • The offset, partition, and topic are added to the message metadata.
    • The resulting message is forwarded via the Success connection.
  5. If an error occurs during publishing:
    • Error details are added to the outgoing message metadata under the error key.
    • The message is forwarded via the Failure connection.
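The steps above can be sketched end to end, with the asynchronous Kafka send stood in for by a plain callable (a real implementation would use the producer's asynchronous send with a completion callback):

```python
def process_message(data: str, metadata: dict, send) -> tuple:
    """Publish the message data and route via Success or Failure."""
    try:
        # The record value is the message data as a UTF-8 string
        record_meta = send(data.encode("utf-8"))
        # On acknowledgment, offset/partition/topic are added to metadata
        out = dict(metadata,
                   offset=str(record_meta["offset"]),
                   partition=str(record_meta["partition"]),
                   topic=record_meta["topic"])
        return ("Success", data, out)
    except Exception as exc:
        # On error, the formatted error detail goes under the "error" key
        return ("Failure", data,
                dict(metadata, error=f"class {type(exc).__name__}: {exc}"))

route, data, md = process_message(
    '{"temperature": 25.5}', {"deviceType": "sensor"},
    lambda value: {"offset": 12345, "partition": 2, "topic": "telemetry"})
# route == "Success"; md gains offset "12345", partition "2", topic "telemetry"
```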

Output connections

  • Success
    • The message was successfully published to Kafka.
    • The message metadata contains the offset, partition, and topic fields.
  • Failure
    • Publishing error: the Kafka broker rejected the message, the topic does not exist, or the producer timed out.
    • Initialization error: Kafka producer initialization failed (e.g. invalid configuration, SSL certificate issues).
    • Unexpected error: an unexpected error occurred during message processing.

Examples

Example 1 — Basic telemetry publishing

Publish telemetry data to a dedicated telemetry topic without specifying a partition key.

Incoming message

Data:

{
  "temperature": 25.5,
  "humidity": 60.2
}

Metadata:

{
  "deviceType": "TH-Sensor",
  "deviceName": "Sensor-001"
}

Node configuration

{
  "topicPattern": "telemetry",
  "keyPattern": "",
  "bootstrapServers": "kafka.example.com:9092",
  "retries": 3,
  "batchSize": 16384,
  "linger": 10,
  "bufferMemory": 33554432,
  "acks": "-1",
  "otherProperties": {},
  "addMetadataKeyValuesAsKafkaHeaders": false,
  "kafkaHeadersCharset": "UTF-8"
}

Outgoing message

Data: unchanged.

Metadata:

{
  "deviceType": "TH-Sensor",
  "deviceName": "Sensor-001",
  "offset": "12345",
  "partition": "2",
  "topic": "telemetry"
}

Routed via the Success connection.

Result

The telemetry data was successfully published to the "telemetry" topic. The Kafka broker assigned the record to partition 2 at offset 12345. Since no key was specified, the Kafka client used its default partition distribution. The producer waited for all in-sync replicas to acknowledge the record (acks=-1), ensuring durability.