产品定价 立即试用
MQTT Broker
文档 > 配置 > TBMQ集成执行器
入门
安装 架构 API 常见问题
目录

Integration Executor 配置属性

本指南帮助您熟悉TBMQ CE Integration Executor (IE) 的配置文件和参数。 我们强烈建议使用环境变量配置TBMQ IE。 这样在新平台版本发布时,您无需合并配置文件。

可用配置参数及对应环境变量列表见此处

如何修改配置参数?

Docker-Based部署

若TBMQ IE部署在Docker Compose环境中,可编辑脚本并为对应容器添加环境变量。更多详情请参阅 Docker文档

K8S-Based部署

若TBMQ IE部署在K8S环境中,可编辑脚本并为对应deployments/stateful sets添加环境变量。更多详情请参阅 K8S文档

Configuration parameters

配置文件采用YAML格式。所有配置参数均有对应的环境变量名和默认值。 修改配置参数时,只需修改其默认值。例如:

1
2
server:
  address: "${HTTP_BIND_ADDRESS:0.0.0.0}"

本例中,‘HTTP_BIND_ADDRESS’ 为环境变量名,‘0.0.0.0’ 为默认值。

以下简单示例用于添加值为 ‘8084’ 的新环境变量 ‘HTTP_BIND_PORT’:

1
2
...
export HTTP_BIND_PORT=8084

参数按系统组件分组。列表包含名称(tbmq-integration-executor.yml 文件中的地址)、环境变量、默认值和说明。

HTTP服务器参数

参数环境变量默认值描述
server.address HTTP_BIND_ADDRESS 0.0.0.0 HTTP服务器绑定地址
server.port HTTP_BIND_PORT 8082 HTTP服务器绑定端口

Kafka参数

参数环境变量默认值描述
queue.integration-downlink.poll-interval TB_IE_DOWNLINK_POLL_INTERVAL 1000 从 'tbmq.ie.downlink' 主题轮询消息的间隔(毫秒)
queue.integration-msg.poll-interval TB_IE_MSG_POLL_INTERVAL 1000 从 'tbmq.msg.ie' 主题轮询消息的间隔(毫秒)
queue.integration-msg.pack-processing-timeout TB_IE_MSG_PACK_PROCESSING_TIMEOUT 30000 处理消息包的超时时间(毫秒)
queue.integration-msg.ack-strategy.type TB_IE_MSG_ACK_STRATEGY_TYPE SKIP_ALL 'tbmq.msg.ie' 主题的处理策略。可选:SKIP_ALL、RETRY_ALL
queue.integration-msg.ack-strategy.retries TB_IE_MSG_ACK_STRATEGY_RETRIES 5 重试次数,0表示不限。用于RETRY_ALL处理策略
queue.integration-msg.ack-strategy.pause-between-retries TB_IE_MSG_ACK_STRATEGY_PAUSE_BETWEEN_RETRIES 1 消费者线程中重试前的等待时间(秒)
queue.kafka.bootstrap.servers TB_KAFKA_SERVERS localhost:9092 用于建立连接的Kafka bootstrap服务器列表
queue.kafka.enable-topic-deletion TB_KAFKA_ENABLE_TOPIC_DELETION true 控制TBMQ是否允许删除为Integration创建的Kafka主题。 设为 'true' 时,TBMQ在清理过程中可能会自动删除主题(例如Integration被删除时)。 设为 'false' 时,TBMQ将跳过主题删除并仅停止使用。 有助于防止生产环境中意外丢失数据
queue.kafka.default.consumer.partition-assignment-strategy TB_KAFKA_DEFAULT_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY org.apache.kafka.clients.consumer.StickyAssignor 按优先级排序的分区分配策略类名或类型列表,在使用组管理时,客户端将用其在不同消费者实例间分配分区所有权
queue.kafka.default.consumer.session-timeout-ms TB_KAFKA_DEFAULT_CONSUMER_SESSION_TIMEOUT_MS 10000 使用Kafka组管理功能时,用于检测客户端故障的超时时间(毫秒)
queue.kafka.default.consumer.max-poll-interval-ms TB_KAFKA_DEFAULT_CONSUMER_MAX_POLL_INTERVAL_MS 300000 使用消费者组管理时,两次poll() 调用之间的最大延迟(毫秒)
queue.kafka.default.consumer.max-poll-records TB_KAFKA_DEFAULT_CONSUMER_MAX_POLL_RECORDS 2000 单次poll() 调用返回的最大记录数
queue.kafka.default.consumer.max-partition-fetch-bytes TB_KAFKA_DEFAULT_CONSUMER_MAX_PARTITION_FETCH_BYTES 16777216 服务器每个分区将返回的最大数据量(字节)
queue.kafka.default.consumer.fetch-max-bytes TB_KAFKA_DEFAULT_CONSUMER_FETCH_MAX_BYTES 134217728 服务器对fetch请求应返回的最大数据量(字节)
queue.kafka.default.consumer.heartbeat-interval-ms TB_KAFKA_DEFAULT_CONSUMER_HEARTBEAT_INTERVAL_MS 3000 使用Kafka组管理功能时,向消费者协调器发送心跳的期望间隔。心跳用于确保消费者会话保持活跃,并便于新消费者加入或离开时进行再平衡。该值必须小于TB_KAFKA_DEFAULT_CONSUMER_SESSION_TIMEOUT_MS,通常不应超过其1/3。可进一步调低以控制正常再平衡的期望时间。单位毫秒,默认3秒
queue.kafka.default.producer.acks TB_KAFKA_DEFAULT_PRODUCER_ACKS 1 生产者在认为请求完成前,要求leader收到的确认数量
queue.kafka.default.producer.retries TB_KAFKA_DEFAULT_PRODUCER_RETRIES 1 设为大于0时,发送失败且可能是临时错误的记录将被客户端重新发送
queue.kafka.default.producer.batch-size TB_KAFKA_DEFAULT_PRODUCER_BATCH_SIZE 16384 生产者会尝试将发送到同一分区的多条记录合并为更少的请求。单位字节
queue.kafka.default.producer.linger-ms TB_KAFKA_DEFAULT_PRODUCER_LINGER_MS 5 生产者将请求传输之间到达的记录合并为单批请求,单位毫秒
queue.kafka.default.producer.buffer-memory TB_KAFKA_DEFAULT_PRODUCER_BUFFER_MEMORY 33554432 生产者用于缓冲待发送到服务器记录的总内存(字节)
queue.kafka.default.producer.compression-type TB_KAFKA_DEFAULT_COMPRESSION_TYPE none 生产者生成数据的压缩类型。有效值:`none`、`gzip`、`snappy`、`lz4` 或 `zstd`
queue.kafka.admin.config TB_KAFKA_ADMIN_CONFIG retries:1 用于创建admin Kafka客户端的配置列表,以分号分隔
queue.kafka.admin.command-timeout TB_KAFKA_ADMIN_COMMAND_TIMEOUT_SEC 30 Kafka Admin客户端命令超时(秒)。适用于describeCluster、listTopics等操作
queue.kafka.consumer-stats.enabled TB_KAFKA_CONSUMER_STATS_ENABLED true 启用时,打印Kafka主题中消费者组偏移与最后消息偏移之间的lag
queue.kafka.consumer-stats.print-interval-ms TB_KAFKA_CONSUMER_STATS_PRINT_INTERVAL_MS 60000 Kafka消费者组统计信息的打印间隔(毫秒)
queue.kafka.consumer-stats.kafka-response-timeout-ms TB_KAFKA_CONSUMER_STATS_RESPONSE_TIMEOUT_MS 1000 等待Kafka统计加载请求完成的超时时间(毫秒)
queue.kafka.consumer-stats.consumer-config TB_KAFKA_CONSUMER_STATS_CONSUMER_CONFIG 用于Kafka统计消费者的配置列表,以分号分隔
queue.kafka.integration-downlink.topic-prefix TB_KAFKA_IE_DOWNLINK_TOPIC_PREFIX tbmq.ie.downlink 从TBMQ向Integration Executor发送integration配置和验证请求的主题前缀
queue.kafka.integration-downlink.http.topic-properties TB_KAFKA_IE_DOWNLINK_HTTP_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.http` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-downlink.http.additional-consumer-config TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.http` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-downlink.http.additional-producer-config TB_KAFKA_IE_DOWNLINK_HTTP_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.http` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.integration-downlink.kafka.topic-properties TB_KAFKA_IE_DOWNLINK_KAFKA_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.kafka` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-downlink.kafka.additional-consumer-config TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.kafka` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-downlink.kafka.additional-producer-config TB_KAFKA_IE_DOWNLINK_KAFKA_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.kafka` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.integration-downlink.mqtt.topic-properties TB_KAFKA_IE_DOWNLINK_MQTT_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.downlink.mqtt` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-downlink.mqtt.additional-consumer-config TB_KAFKA_IE_DOWNLINK_MQTT_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.downlink.mqtt` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-downlink.mqtt.additional-producer-config TB_KAFKA_IE_DOWNLINK_MQTT_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.downlink.mqtt` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.integration-uplink.topic TB_KAFKA_IE_UPLINK_TOPIC tbmq.ie.uplink 从Integration Executor向TBMQ发送消息/事件的主题
queue.kafka.integration-uplink.topic-properties TB_KAFKA_IE_UPLINK_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:6;replication.factor:1 `tbmq.ie.uplink` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-uplink.additional-consumer-config TB_KAFKA_IE_UPLINK_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.uplink` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-uplink.additional-producer-config TB_KAFKA_IE_UPLINK_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.uplink` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.integration-uplink-notifications.topic-prefix TB_KAFKA_IE_UPLINK_NOTIF_TOPIC_PREFIX tbmq.ie.uplink.notifications 从Integration Executor向特定TBMQ节点发送通知或回复的主题前缀
queue.kafka.integration-uplink-notifications.topic-properties TB_KAFKA_IE_UPLINK_NOTIF_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;replication.factor:1 `tbmq.ie.uplink.notifications` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-uplink-notifications.additional-consumer-config TB_KAFKA_IE_UPLINK_NOTIF_ADDITIONAL_CONSUMER_CONFIG `tbmq.ie.uplink.notifications` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-uplink-notifications.additional-producer-config TB_KAFKA_IE_UPLINK_NOTIF_ADDITIONAL_PRODUCER_CONFIG `tbmq.ie.uplink.notifications` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.integration-msg.topic-properties TB_KAFKA_IE_MSG_TOPIC_PROPERTIES retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;replication.factor:1 `tbmq.msg.ie` 主题的Kafka主题属性,以分号分隔
queue.kafka.integration-msg.additional-consumer-config TB_KAFKA_IE_MSG_ADDITIONAL_CONSUMER_CONFIG max.poll.records:50 `tbmq.msg.ie` 主题的额外Kafka消费者配置,以分号分隔
queue.kafka.integration-msg.additional-producer-config TB_KAFKA_IE_MSG_ADDITIONAL_PRODUCER_CONFIG `tbmq.msg.ie` 主题的额外Kafka生产者配置,以分号分隔
queue.kafka.kafka-prefix TB_KAFKA_PREFIX 所有Kafka主题、生产者、消费者组和消费者的通用前缀。默认为空字符串表示不添加前缀

服务参数

参数环境变量默认值描述
service.type TB_SERVICE_TYPE tbmq-integration-executor 微服务类型。允许值:tbmq-integration-executor
service.id TB_SERVICE_ID 此服务的唯一ID(为空则自动生成)
service.integrations.supported TB_SERVICE_INTEGRATIONS_SUPPORTED ALL 在微服务Integration Executor上启用的integration。允许值:HTTP、KAFKA、MQTT。默认ALL
service.integrations.excluded TB_SERVICE_INTEGRATIONS_EXCLUDED NONE 在服务/微服务Integration Executor上排除处理的integration列表。允许值:HTTP、KAFKA、MQTT。默认NONE

Integration通用参数

参数环境变量默认值描述
integrations.statistics.enabled INTEGRATIONS_STATISTICS_ENABLED true 启用/禁用integration统计
integrations.statistics.persist-frequency INTEGRATIONS_STATISTICS_PERSIST_FREQUENCY 3600000 Integration统计持久化频率(毫秒)
integrations.init.connection-timeout-sec INTEGRATIONS_INIT_CONNECTION_TIMEOUT_SEC 15 integration允许的最大连接超时(秒)。任何更大的用户定义超时将被缩小到此限制
integrations.init.connection-check-api-request-timeout-sec INTEGRATIONS_INIT_CONNECTION_CHECK_API_REQUEST_TIMEOUT_SEC 20 API请求的连接检查超时(秒)
integrations.reinit.enabled INTEGRATIONS_REINIT_ENABLED true 启用/禁用integration热重初始化。针对状态为 'FAILED' 的integration执行
integrations.reinit.frequency INTEGRATIONS_REINIT_FREQUENCY_MS 300000 重初始化integration的检查间隔(毫秒)。默认5分钟
integrations.destroy.graceful-timeout-ms INTEGRATIONS_DESTROY_TIMEOUT_MS 1000 优雅关闭过程中每次迭代等待integration正确终止的持续时间(毫秒)。默认1秒
integrations.destroy.count INTEGRATIONS_DESTROY_COUNT 10 在强制停止进程前尝试优雅关闭的迭代次数
integrations.destroy.forced-shutdown-timeout-ms INTEGRATIONS_DESTROY_FORCED_SHUTDOWN_TIMEOUT_MS 15000 若优雅关闭未开始或超过允许时间,强制停止应用前的最大等待时间(毫秒)
integrations.allow-local-network-hosts INTEGRATIONS_ALLOW_LOCAL_NETWORK_HOSTS true 启用/禁用integration本地网络主机
integrations.uplink.callback-threads-count INTEGRATIONS_UPLINK_THREADS 4 处理上行事件到TBMQ节点回调的线程池线程数
integrations.manage.lifecycle-threads-count INTEGRATIONS_MANAGE_LIFECYCLE_THREADS 4 处理integration生命周期事件(CREATE/UPDATE/DELETE)的线程池线程数
integrations.manage.command-threads-count INTEGRATIONS_MANAGE_COMMAND_THREADS 4 处理integration验证请求的线程池线程数
integrations.external.threads-count INTEGRATIONS_EXTERNAL_THREADS 10 用于处理外部操作的线程池线程数,例如向Kafka主题生产消息
integrations.netty.threads-count INTEGRATIONS_NETTY_SHARED_GROUP_THREADS 0 Netty共享worker组线程数。默认0表示线程数为availableProcessors * 2。用于通过MQTT bridge(integration)向外部MQTT broker发送消息

管理参数

参数环境变量默认值描述
management.health.diskspace.enabled HEALTH_DISKSPACE_ENABLED false 启用/禁用磁盘空间健康检查
management.endpoint.health.show-details HEALTH_SHOW_DETAILS never 控制健康端点是否显示完整组件详情(如Redis、DB、TBMQ)。选项:'never':始终隐藏详情(启用安全时的默认);'when-authorized':仅对已认证用户显示详情;'always':响应中始终包含完整健康详情
management.endpoints.web.exposure.include METRICS_ENDPOINTS_EXPOSE health,info,prometheus 指定通过HTTP暴露哪些Actuator端点。使用 'health,info' 仅暴露基本健康和信息端点。如需暴露Prometheus指标,在列表中加入 'prometheus'(如 'health,info,prometheus')

统计参数

参数环境变量默认值描述
stats.ie.enabled STATS_IE_ENABLED true 启用/禁用向日志打印统计
stats.ie.print-interval-ms STATS_IE_PRINT_INTERVAL_MS 60000 打印统计的周期(毫秒)。默认值对应1分钟
stats.timer.percentiles STATS_TIMER_PERCENTILES 0.5 Actuator为定时器指标返回的百分位数。逗号分隔的双精度值列表
stats.system-info.persist-frequency STATS_SYSTEM_INFO_PERSIST_FREQUENCY_SEC 60 系统信息(CPU、内存使用等)持久化频率(秒)

事件配置参数

参数环境变量默认值描述
event.error.rate-limits.enabled EVENT_ERROR_RATE_LIMITS_ENABLED true 为true时启用限速
event.error.rate-limits.integration EVENT_ERROR_RATE_LIMITS_INTEGRATION 5000:3600,100:60 所有integration每小时不超过5000条消息或每分钟不超过100条消息
event.error.rate-limits.ttl-minutes EVENT_ERROR_RATE_LIMITS_TTL 60 达到错误事件限速后,防止限速事件重复持久化的时间(分钟)