产品定价 立即试用
PE MQTT Broker
文档 > 配置 > TBMQ PE集成执行器
入门
安装 架构 API 常见问题
目录

集成执行器配置属性

本指南帮助您熟悉TBMQ PE Integration Executor (IE) 的配置文件和参数。 我们强烈建议使用环境变量配置TBMQ IE。 这样在新平台版本发布时,您无需合并配置文件。

可用配置参数及对应环境变量列表见此处

如何修改配置参数?

Docker-Based部署

若TBMQ IE部署在Docker Compose环境中,可编辑脚本并为对应容器添加环境变量。更多详情请参阅 Docker文档

K8S-Based部署

若TBMQ IE部署在K8S环境中,可编辑脚本并为对应deployments/stateful sets添加环境变量。更多详情请参阅 K8S文档

Configuration parameters

配置文件采用YAML格式。所有配置参数均有对应的环境变量名和默认值。 修改配置参数时,只需修改其默认值。例如:

1
2
server:
  address: "${HTTP_BIND_ADDRESS:0.0.0.0}"

本例中,‘HTTP_BIND_ADDRESS’ 为环境变量名,‘0.0.0.0’ 为默认值。

以下简单示例用于添加值为 ‘8084’ 的新环境变量 ‘HTTP_BIND_PORT’:

1
2
...
export HTTP_BIND_PORT=8084

参数按系统组件分组。列表包含名称(tbmq-integration-executor.yml 文件中的地址)、环境变量、默认值和说明。

HTTP服务端参数

参数环境变量默认值描述
server.address HTTP_BIND_ADDRESS 0.0.0.0 HTTP服务端绑定地址
server.port HTTP_BIND_PORT 8082 HTTP服务端绑定端口

Kafka参数

参数环境变量默认值描述
queue.integration-downlink.poll-interval TB_IE_DOWNLINK_POLL_INTERVAL 1000 轮询tbmq.ie.downlink主题消息的间隔(毫秒)
queue.integration-msg.poll-interval TB_IE_MSG_POLL_INTERVAL 1000 轮询'tbmq.msg.ie'主题消息的间隔(毫秒)
queue.integration-msg.pack-processing-timeout TB_IE_MSG_PACK_PROCESSING_TIMEOUT 30000 处理消息包的超时时间(毫秒)
queue.integration-msg.ack-strategy.type TB_IE_MSG_ACK_STRATEGY_TYPE SKIP_ALL 'tbmq.msg.ie'主题的处理策略。可以是:SKIP_ALL、RETRY_ALL
queue.integration-msg.ack-strategy.retries TB_IE_MSG_ACK_STRATEGY_RETRIES 5 重试次数,0表示无限。用于RETRY_ALL处理策略
queue.integration-msg.ack-strategy.pause-between-retries TB_IE_MSG_ACK_STRATEGY_PAUSE_BETWEEN_RETRIES 1 消费者线程在重试前等待的时间(秒)
queue.kafka.bootstrap.servers TB_KAFKA_SERVERS localhost:9092 用于建立连接的Kafka引导服务器列表
queue.kafka.enable-topic-deletion TB_KAFKA_ENABLE_TOPIC_DELETION true 控制TBMQ是否允许删除为集成创建的Kafka主题。 设置为'true'时,TBMQ可能在清理期间自动删除主题 (例如,当集成被删除时)。 设置为'false'时,TBMQ将跳过主题删除并停止使用它们。 这有助于防止生产环境中的意外数据丢失
queue.kafka.default.consumer.partition-assignment-strategy TB_KAFKA_DEFAULT_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY org.apache.kafka.clients.consumer.StickyAssignor 支持的分区分配策略的类名或类型列表,按优先级排序,客户端在使用组管理时将使用该列表来在消费者实例之间分配分区所有权
queue.kafka.default.consumer.session-timeout-ms TB_KAFKA_DEFAULT_CONSUMER_SESSION_TIMEOUT_MS 10000 使用Kafka组管理功能时用于检测客户端故障的超时时间(毫秒)
queue.kafka.default.consumer.max-poll-interval-ms TB_KAFKA_DEFAULT_CONSUMER_MAX_POLL_INTERVAL_MS 300000 使用消费者组管理时poll()调用之间的最大延迟(毫秒)
queue.kafka.default.consumer.max-poll-records TB_KAFKA_DEFAULT_CONSUMER_MAX_POLL_RECORDS 2000 单个poll()调用返回的最大记录数
queue.kafka.default.consumer.max-partition-fetch-bytes TB_KAFKA_DEFAULT_CONSUMER_MAX_PARTITION_FETCH_BYTES 16777216 服务器将返回的每个分区的最大数据量(字节)
queue.kafka.default.consumer.fetch-max-bytes TB_KAFKA_DEFAULT_CONSUMER_FETCH_MAX_BYTES 134217728 服务器应为获取请求返回的最大数据量(字节)
queue.kafka.default.consumer.heartbeat-interval-ms TB_KAFKA_DEFAULT_CONSUMER_HEARTBEAT_INTERVAL_MS 3000 The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than TB_KAFKA_DEFAULT_CONSUMER_SESSION_TIMEOUT_MS, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Value in milliseconds. Default is 3 sec
queue.kafka.default.producer.acks TB_KAFKA_DEFAULT_PRODUCER_ACKS 1 生产者在认为请求完成前要求领导者收到的确认数
queue.kafka.default.producer.retries TB_KAFKA_DEFAULT_PRODUCER_RETRIES 1 将值设置为大于零将导致客户端重新发送发送失败的任何记录,该记录可能会因暂时错误而失败
queue.kafka.default.producer.batch-size TB_KAFKA_DEFAULT_PRODUCER_BATCH_SIZE 16384 当多个记录被发送到同一分区时,生产者将尝试将记录一起批处理为更少的请求。大小以字节为单位
queue.kafka.default.producer.linger-ms TB_KAFKA_DEFAULT_PRODUCER_LINGER_MS 5 生产者将在请求传输之间到达的任何记录分组为单个批处理请求,以毫秒为单位设置
queue.kafka.default.producer.buffer-memory TB_KAFKA_DEFAULT_PRODUCER_BUFFER_MEMORY 33554432 生产者可用于缓冲等待发送到服务器的记录的总字节内存
queue.kafka.default.producer.compression-type TB_KAFKA_DEFAULT_COMPRESSION_TYPE none 生产者生成的所有数据的压缩类型。有效值为`none`、`gzip`、`snappy`、`lz4`或`zstd`
queue.kafka.admin.config TB_KAFKA_ADMIN_CONFIG retries:1 用于创建admin Kafka客户端的分号分隔的配置列表
queue.kafka.admin.command-timeout TB_KAFKA_ADMIN_COMMAND_TIMEOUT_SEC 30 Kafka Admin客户端命令超时(以秒为单位)。适用于describeCluster、listTopics等操作
queue.kafka.consumer-stats.enabled TB_KAFKA_CONSUMER_STATS_ENABLED true 如果启用,打印Kafka主题中消费者组偏移与最后消息偏移之间的滞后时间
queue.kafka.consumer-stats.print-interval-ms TB_KAFKA_CONSUMER_STATS_PRINT_INTERVAL_MS 60000 Kafka消费者组统计的统计打印间隔(毫秒)
queue.kafka.consumer-stats.kafka-response-timeout-ms TB_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS 1000 等待统计加载请求到Kafka完成的时间(毫秒)
queue.kafka.consumer-stats.consumer-config TB_KAFKA_CONSUMER_STATS_CONSUMER_CONFIG 用于Kafka统计消费者的分号分隔的配置列表
queue.kafka.integration-downlink.topic-prefix TB_KAFKA_IE_DOWNLINK_TOPIC_PREFIX tbmq.ie.downlink 用于从tbmq向集成执行器发送集成配置和验证请求的主题的前缀
queue.kafka.integration-downlink.http.topic-properties TB_KAFKA_IE_DOWNLINK_HTTP_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.http`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-downlink.http.additional-consumer-config TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.http`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-downlink.http.additional-producer-config TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.http`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.integration-downlink.kafka.topic-properties TB_KAFKA_IE_DOWNLINK_KAFKA_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.kafka`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-downlink.kafka.additional-consumer-config TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.kafka`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-downlink.kafka.additional-producer-config TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.kafka`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.integration-downlink.mqtt.topic-properties TB_KAFKA_IE_DOWNLINK_MQTT_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.mqtt`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-downlink.mqtt.additional-consumer-config TB_KAFKA_IE_DOWNLINK_MQTT_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.mqtt`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-downlink.mqtt.additional-producer-config TB_KAFKA_IE_DOWNLINK_MQTT_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.mqtt`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.integration-uplink.topic TB_KAFKA_IE_UPLINK_TOPIC tbmq.ie.uplink 用于从集成执行器向tbmq发送消息/事件的主题
queue.kafka.integration-uplink.topic-properties TB_KAFKA_IE_UPLINK_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.uplink`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-uplink.additional-consumer-config TB_KAFKA_IE_UPLINK_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.uplink`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-uplink.additional-producer-config TB_KAFKA_IE_UPLINK_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.uplink`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.integration-uplink-notifications.topic-prefix TB_KAFKA_IE_UPLINK_NOTIF_TOPIC_PREFIX tbmq.ie.uplink.notifications 用于从集成执行器向特定tbmq节点发送通知或回复的主题的前缀
queue.kafka.integration-uplink-notifications.topic-properties TB_KAFKA_IE_UPLINK_NOTIF_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;replication.factor:1 `tbmq.ie.uplink.notifications`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-uplink-notifications.additional-consumer-config TB_KAFKA_IE_UPLINK_NOTIF_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.uplink.notifications`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-uplink-notifications.additional-producer-config TB_KAFKA_IE_UPLINK_NOTIF_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.uplink.notifications`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.integration-msg.topic-properties TB_KAFKA_IE_MSG_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;replication.factor:1 `tbmq.msg.ie`主题的Kafka主题属性(分号分隔)
queue.kafka.integration-msg.additional-consumer-config TB_KAFKA_IE_MSG_ADDITIONAL_CONSUMER_CONFIG max.poll.records:50 `tbmq.msg.ie`主题的额外Kafka消费者配置(分号分隔)
queue.kafka.integration-msg.additional-producer-config TB_KAFKA_IE_MSG_ADDITIONAL_PRODUCER_CONFIG `tbmq.msg.ie`主题的额外Kafka生产者配置(分号分隔)
queue.kafka.kafka-prefix TB_KAFKA_PREFIX 所有Kafka主题、生产者、消费者组和消费者的通用前缀。默认为空字符串,表示不添加前缀

服务参数

参数环境变量默认值描述
service.type TB_SERVICE_TYPE tbmq-integration-executor 微服务类型。允许的值:tbmq-integration-executor
service.id TB_SERVICE_ID 此服务的唯一ID(如果为空则自动生成)
service.integrations.supported TB_SERVICE_INTEGRATIONS_SUPPORTED ALL 允许在微服务集成执行器上启用集成。 允许的值:HTTP、KAFKA、MQTT。默认为ALL
service.integrations.excluded TB_SERVICE_INTEGRATIONS_EXCLUDED NONE 要从服务/微服务集成执行器上的处理中排除的集成列表。 允许的值:HTTP、KAFKA、MQTT。默认为NONE

集成通用参数

参数环境变量默认值描述
integrations.statistics.enabled INTEGRATIONS_STATISTICS_ENABLED true 启用/禁用集成统计
integrations.statistics.persist-frequency INTEGRATIONS_STATISTICS_PERSIST_FREQUENCY 3600000 集成统计持久化频率(毫秒)
integrations.init.connection-timeout-sec INTEGRATIONS_INIT_CONNECTION_TIMEOUT_SEC 15 集成允许的最大连接超时(秒)。任何更大的用户定义的超时都将降低至此限制
integrations.init.connection-check-api-request-timeout-sec INTEGRATIONS_INIT_CONNECTION_CHECK_API_REQUEST_TIMEOUT_SEC 20 API请求的连接检查超时(秒)
integrations.reinit.enabled INTEGRATIONS_REINIT_ENABLED true 启用/禁用集成热重新初始化。此过程对状态为'FAILED'的集成执行
integrations.reinit.frequency INTEGRATIONS_REINIT_FREQUENCY_MS 300000 重新初始化集成的检查间隔(毫秒)。默认为5分钟
integrations.destroy.graceful-timeout-ms INTEGRATIONS_DESTROY_TIMEOUT_MS 1000 在优雅关闭过程的每次迭代中等待集成正常终止的持续时间(毫秒)。 默认值设置为1秒
integrations.destroy.count INTEGRATIONS_DESTROY_COUNT 10 强制停止进程前尝试优雅关闭的迭代次数
integrations.destroy.forced-shutdown-timeout-ms INTEGRATIONS_DESTROY_FORCED_SHUTDOWN_TIMEOUT_MS 15000 如果优雅关闭过程未启动或超过允许的时间,强制停止应用程序前等待的最大持续时间(毫秒)
integrations.allow-local-network-hosts INTEGRATIONS_ALLOW_LOCAL_NETWORK_HOSTS true 启用/禁用集成本地网络主机
integrations.uplink.callback-threads-count INTEGRATIONS_UPLINK_THREADS 4 池中用于处理到tbmq节点的上行链路事件回调的线程数
integrations.manage.lifecycle-threads-count INTEGRATIONS_MANAGE_LIFECYCLE_THREADS 4 池中用于处理集成生命周期事件(CREATE/UPDATE/DELETE)的线程数
integrations.manage.command-threads-count INTEGRATIONS_MANAGE_COMMAND_THREADS 4 池中用于处理集成验证请求的线程数
integrations.external.threads-count INTEGRATIONS_EXTERNAL_THREADS 10 池中专门用于处理外部操作(如生成Kafka主题消息)的线程数
integrations.netty.threads-count INTEGRATIONS_NETTY_SHARED_GROUP_THREADS 0 Netty共享worker组线程计数。默认为0,表示线程计数为availableProcessors * 2。 用于使用MQTT桥接(集成)向外部MQTT代理发送消息

管理参数

参数环境变量默认值描述
management.health.diskspace.enabled HEALTH_DISKSPACE_ENABLED false 启用/禁用磁盘空间健康检查
management.endpoint.health.show-details HEALTH_SHOW_DETAILS never 控制健康端点是否显示完整的组件详细信息(例如Redis、DB、TBMQ)。 选项: - 'never':始终隐藏详细信息(启用安全性时的默认值)。 - 'when-authorized':仅向已认证用户显示详细信息。 - 'always':始终在响应中包含完整健康详细信息
management.endpoints.web.exposure.include METRICS_ENDPOINTS_EXPOSE health,info,prometheus 指定应通过HTTP公开哪些执行器端点。 使用'health,info'仅公开基本健康和信息端点。 要公开Prometheus指标,请更新此项以在列表中包含'prometheus'(例如'health,info,prometheus')

统计参数

参数环境变量默认值描述
stats.ie.enabled STATS_IE_ENABLED true 启用/禁用向日志打印统计
stats.ie.print-interval-ms STATS_IE_PRINT_INTERVAL_MS 60000 打印统计的周期(毫秒)。默认值对应1分钟
stats.timer.percentiles STATS_TIMER_PERCENTILES 0.5 执行器为定时器指标返回的指标百分位数。以逗号分隔的(,)双精度值列表
stats.system-info.persist-frequency STATS_SYSTEM_INFO_PERSIST_FREQUENCY_SEC 60 系统信息(CPU、内存使用情况等)的持久化频率(秒)

事件配置参数

参数环境变量默认值描述
event.error.rate-limits.enabled EVENT_ERROR_RATE_LIMITS_ENABLED true 如果为true,速率限制将被激活
event.error.rate-limits.integration EVENT_ERROR_RATE_LIMITS_INTEGRATION 5000:3600,100:60 所有集成每小时不超过5000条消息或每分钟不超过100条消息
event.error.rate-limits.ttl-minutes EVENT_ERROR_RATE_LIMITS_TTL 60 达到错误事件速率限制后,防止速率限制事件重复持久化的时间(分钟)