将 Logstash 管道从 Azure Event Hubs 迁移到 Kafka 输入插件
作者来自 Elastic Álex Cámara逐步指南将 Logstash 管道从 Azure Event Hubs 插件迁移到 Kafka 输入插件以消除偏移量存储成本并提升性能。简介Azure Event Hubs 原生支持 Apache Kafka 协议这意味着你不再需要 logstash-input-azure_event_hubs 插件或用于偏移量检查点的外部 Blob Storage 账户。切换到 logstash-input-kafka 可以移除存储依赖降低成本并实现最高 2.5 倍的吞吐量。本指南将带你完成迁移为什么重要、如何转换现有配置、两个插件之间的参数映射以及如何调整代理设置。为什么要迁移从 Azure Event Hubs 插件迁移到 Kafka input 插件的原因有几个Azure Event Hubs 原生支持 Kafka。Event Hubs 在 Standard、Premium 和 Dedicated 层提供内置的 Apache Kafka 端点。这意味着 logstash-input-azure_event_hubs 插件不再必要。标准的 logstash-integration-kafkainput插件可以直接连接到同一服务无需额外的 Azure 配置。不再需要 Blob Storage 来做偏移量检查点。基于 AMQP 的插件需要外部 Azure Blob Storage 账户来跟踪消费者偏移量。这意味着要创建和维护存储账户并支付每次检查点写入费用。使用 Kafka 协议时偏移量跟踪由 Azure Event Hubs 内部处理无需额外费用也不需要外部存储。GPv1 存储即将退役GPv2 成本更高。Microsoft 将在 2026 年 10 月退役通用目的 v1 存储账户。未手动升级到 GPv2 的账户将被自动迁移。logstash-input-azure_event_hubs 插件在 GPv2 下仍可正常工作所以现有管道不会中断。然而GPv2 对偏移量密集型工作负载的事务成本可能更高。通过切换到 Kafka input 插件这个问题被消除没有存储账户就无需升级也无需支付费用。还没准备好迁移可以暂时降低 GPv2 成本。GPv2 事务定价比 GPv1 的固定费率高得多。将 checkpoint_interval 设置高于默认的 5 秒可以减少写入操作从而降低成本影响。可以使用 Azure 定价计算器估算成本差异。示例East US 和本地保留存储每 10,000 次写操作的成本比较存储类型写入成本GPv1固定$0.00036GPv2Hot 层$0.050大约是写入操作成本的 140 倍增长。更广泛的社区支持和活跃维护。Kafka input 插件在 Logstash 部署中更为广泛使用并定期更新以适配 Kafka 生态系统。迁移到它可以降低长期运维风险让你的管道保持在受支持的路径上。更高吞吐量。在相同命名空间下消费时Kafka input 插件的性能持续优于 Azure Event Hubs 插件。详见性能对比部分。启用 Kafka 接口的要求Kafka 接口已内置在 Azure Event Hubs 中无需在 Azure 门户中启用或配置任何内容。唯一要求是你的 Event Hubs 命名空间必须在 Standard、Premium 或 Dedicated 层。Basic 层不支持 Kafka 协议。请参阅层级对比表获取详细信息。转换你的配置本节将演示如何将现有 logstash-input-azure_event_hubs 配置转换为 logstash-input-kafka从最简单的单 Hub 场景开始再扩展到多 Hub 和高级用例。关键行为变化在更改任何配置之前需要注意两个重要差异不再使用 Blob Storage 来存储偏移量。Kafka input 插件通过 Azure Event Hubs 服务内部跟踪偏移量无需额外费用。storage_connection 和 storage_container 参数没有对应项。无需创建、维护或支付任何存储费用。消费者偏移量不会继承。AMQP 消费者组和 Kafka 消费者组是完全独立的即使它们使用相同名称。Kafka input 插件首次连接时Azure 会自动创建 group_id 中指定的 Kafka 消费者组默认logstash。它不会读取旧的 Blob Storage 检查点也不会从旧插件停止的地方继续而是全新开始。Event Hubs (AMQP) 消费者组Kafka 消费者组协议AMQP偏移量存储外部 Azure Blob Storage创建方式必须通过门户、SDK 或 ARM 创建命名空间范围限于单个 Event Hub限制每个命名空间最多允许 1,000 个同时 Kafka 消费者组。参见 Event Hubs vs. Kafka 消费者组 FAQ。认证logstash-input-azure_event_hubs 插件仅支持通过连接字符串的 SASShared Access Signature认证。相同的 SAS 凭证可通过 SASL PLAIN 与 Kafka 插件一起使用如下面的单个 Event Hub基础迁移示例所示。单个 Event Hub基础迁移大多数管道从单个 Event Hub、SAS 认证和 Blob Storage 检查点开始。以下示例展示了基础的 azure_event_hubs 配置及其直接的 Kafka 对应配置。迁移前旧版 Azure Event Hubs 输入input { azure_event_hubs { event_hub_connections [Endpointsb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyNameACCESS_KEY_NAME;SharedAccessKeyACCESS_KEY;EntityPathEVENT_HUB_NAME] storage_connection DefaultEndpointsProtocolhttps;AccountNameSTORAGE_ACCOUNT_NAME;AccountKeySTORAGE_ACCOUNT_KEY;EndpointSuffixcore.windows.net consumer_group CONSUMER_GROUP_NAME storage_container STORAGE_NAME } }迁移后Kafka inputinput { kafka { # The Namespace name and the mandatory Kafka SSL port bootstrap_servers NAMESPACE.servicebus.windows.net:9093 topics [EVENT_HUB_NAME] group_id KAFKA_CONSUMER_GROUP_NAME security_protocol SASL_SSL sasl_mechanism PLAIN # Need to create a jaas.conf file storing Username and Password (username is always $ConnectionString) jaas_path path/to/jaas.conf } }KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username$ConnectionString passwordEndpointsb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyNameACCESS_KEY_NAME;SharedAccessKeyACCESS_KEY; };# Inline JAAS configuration (substitutes jaas_path) sasl_jaas_config org.apache.kafka.common.security.plain.PlainLoginModule required username$ConnectionString passwordEndpointsb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyNameACCESS_KEY_NAME;SharedAccessKeyACCESS_KEY;多个 Event Hubs 使用单个 Kafka input如果你的 SAS 策略具有命名空间级别的读取权限不仅限于单个 Event Hub你可以通过在 kafka input 中列出多个 topic 来从多个 Event Hubs 消费数据input { kafka { bootstrap_servers NAMESPACE.servicebus.windows.net:9093 topics [EVENT_HUB_1, EVENT_HUB_2, EVENT_HUB_3] group_id KAFKA_CONSUMER_GROUP_NAME security_protocol SASL_SSL sasl_mechanism PLAIN jaas_path path/to/jaas.conf } }配置参数映射以下部分将每个 logstash-input-azure_event_hubs 参数映射到其 logstash-input-kafka 等效项并提供使用说明和示例配置。1config_mode没有直接等效项。Kafka 没有“basic”与“advanced”模式。要从多个 hub 使用不同设置进行消费可以定义多个 kafka {} input 块或列出多个 topic。basic 模式的转换在 Single Event Hub (basic migration) 中已有说明。下面是同一命名空间下两个 Event Hubs 的高级模式示例input { azure_event_hubs { config_mode advanced storage_connection DefaultEndpointsProtocolhttps;AccountNameSTORAGE_ACCOUNT;... event_hubs [ {EVENT_HUB_1 { event_hub_connection Endpointsb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyNameKEY_1;SharedAccessKeyACCESS_KEY;EntityPathEVENT_HUB_1 consumer_group CONSUMER_GROUP_1 }}, {EVENT_HUB_2 { event_hub_connection Endpointsb://NAMESPACE.servicebus.windows.net/;SharedAccessKeyNameKEY_2;SharedAccessKeyACCESS_KEY;EntityPathEVENT_HUB_2 consumer_group CONSUMER_GROUP_2 }} ] } }input { kafka { bootstrap_servers NAMESPACE.servicebus.windows.net:9093 topics [EVENT_HUB_1] group_id KAFKA_CONSUMER_GROUP_1 security_protocol SASL_SSL sasl_mechanism PLAIN sasl_jaas_config ...KEY_1... } kafka { bootstrap_servers NAMESPACE.servicebus.windows.net:9093 topics [EVENT_HUB_2] group_id KAFKA_CONSUMER_GROUP_2 security_protocol SASL_SSL sasl_mechanism PLAIN sasl_jaas_config ...KEY_2... } }2checkpoint_interval对应于 auto_commit_interval_ms。在 Azure 插件中这控制写操作多频繁地写入 Blob Storage 容器以保存读取偏移量。在 Kafka 插件中这控制消费者多频繁将其偏移量提交到 Event Hubs 服务。注意在配置 auto_commit_interval_ms 参数时保持 enable_auto_commit 为 true默认。Azure 配置input { azure_event_hubs { # ... other params ... checkpoint_interval 10 # in seconds } }Kafka 等效配置input { kafka { # ... other params ... auto_commit_interval_ms 10000 # in milliseconds } }3decorate_events此参数在两个插件中均存在名称和行为相同。4initial_position对应于 auto_offset_reset。这两个参数都控制在检查点存储中找不到先前偏移量时从何处开始读取。选项略有不同Azure:beginning,end,look_backKafka:earliest,latest,by_duration:duration,nonebeginning-end 与 earliest-latest 的区别仅在于术语不同。Azure 配置input { azure_event_hubs { initial_position beginning } }Kafka 等效配置input { kafka { auto_offset_reset earliest } }Azure ValueKafka ValueNotesbeginningearliestendlatestlook_backby_duration:Duration in ISO 8601 format (e.g., by_duration:PT1H for 1 hour). Requires logstash-integration-kafka 12.1.0.by_duration 选项在 Apache Kafka client 4.0.0 中引入并且在 logstash-integration-kafka 12.1.0 及更高版本可用。最新 Logstash 版本捆绑的版本低于 12.1.0因此需要手动更新 gemLOGSTASH_HOME/bin/logstash-plugin install --version 12.1.0 logstash-integration-kafka将 LOGSTASH_HOME 替换为 Logstash 安装目录例如DEB/RPM 包为 /usr/share/logstash。注意由于 Kafka 无法读取旧的 Blob Storage 检查点它将迁移视为首次连接。为了避免重新处理旧插件已处理的数据首次部署时将 auto_offset_reset latest 设置为最新。5max_batch_size对应于 max_poll_records。两个参数都定义了单次轮询/批处理操作中要获取的最大消息数。Azure 配置input { azure_event_hubs { max_batch_size 125 } }Kafka 等效配置input { kafka { max_poll_records 125 } }6threads: 这对应 consumer_threads。两个参数都控制用于并发消费消息的线程数。在 Azure 中最小值是 21 Event Hub 1而在 Kafka 中默认是 1 thread。Azure 配置:input { azure_event_hubs { threads 8 } }Kafka 等效:input { kafka { consumer_threads 8 } }性能比较我们在相同条件下测试了两个插件相同的 Logstash 实例、相同的 Event Hub namespace、相同的分区数量以及相同的 batch/thread 配置。绝对数值依赖于环境但相对差异才是关键。插件Payload吞吐量 (events/s)azure_event_hubs100B~5700kafka100B~14500azure_event_hubs1KB~1500kafka1KB~3200azure_event_hubs10KB~170kafka10KB~290在所有 payload 大小下Kafka input 插件提供的吞吐量比 azure_event_hubs 高 1.7x 到 2.5x。增益在小 payload 下最明显因为协议开销占主导。除了简化基础设施无需 Blob Storage无需 GPv2 考虑你还会获得明显的性能提升。代理连接配置如果 Logstash 实例直接连接 Azure Event Hubs 而不使用代理可跳过本节。在迁移过程中代理设置需要特别注意因为两个插件使用的协议本质不同。Azure Event Hubs 插件设置参考logstash-input-azure_event_hubs 插件支持 HTTPS 代理。设置包括1设置代理环境变量export https_proxyhttps://my_proxy:80802在 Event Hubs 连接字符串中添加 WebSockets 传输标志;TransportTypeAmqpWebSockets3添加以下 JVM 选项Logstash jvm.options-Dhttp.proxyHostmy_proxy -Dhttp.proxyPort8080 -Dhttps.proxyHostmy_proxy -Dhttps.proxyPort8443 -Dhttp.nonProxyHostslocalhost|127.0.0.1迁移到 TCP第 4 层代理Azure 插件的代理设置与 Kafka 客户端不兼容。Azure 插件通过 AMQP/WebSocketsHTTP 层通信因此 JVM 代理设置和 TransportTypeAmqpWebSockets 可用。Kafka 插件则直接向 broker 打开原生 TCP socket从不发起 HTTP 请求所以 JVM HTTP 代理设置完全无效。如果环境需要代理必须将 HTTP 代理替换为 TCP第 4 层代理。步骤 1配置 /etc/hostsKafka 客户端会验证 TLS 证书是否与 bootstrap_servers 中的主机名匹配。由于证书为 *.servicebus.windows.net 签发bootstrap_servers 必须使用真实的 Event Hubs FQDN而不是代理地址。可通过 DNS 覆盖将 FQDN 指向代理 IP# /etc/hosts PROXY_HOST_IP NAMESPACE.servicebus.windows.net步骤 2Logstash 配置Logstash 配置与非代理设置相同。/etc/hosts 覆盖会透明地将流量路由到代理因此 bootstrap_servers 仍使用 Event Hubs FQDNinput { kafka { bootstrap_servers NAMESPACE.servicebus.windows.net:9093 topics [EVENT_HUB_NAME] security_protocol SASL_SSL sasl_mechanism PLAIN group_id GROUP_ID jaas_path PATH_TO_JAAS_FILE } }如果 TCP 代理与 Logstash 位于同一主机或受信网络段内则无需 DNS 覆盖。此时可直接将 bootstrap_servers 指向代理 IP例如 PROXY_HOST_IP:9093并将 security_protocol 改为 SASL_PLAINTEXT。这会将 TLS 握手交给代理而 Logstash 与代理之间的连接保持未加密。仅当 Logstash 到代理的路径安全时使用此配置input { kafka { bootstrap_servers PROXY_HOST_IP:9093 security_protocol SASL_PLAINTEXT } }常见问题解答切换从 Azure Event Hubs 插件到 Kafka 插件时会丢失事件吗不会。无论使用哪种协议读取事件在配置的保留期内都会保持可用。变化的是消费者开始读取的位置。由于 Kafka 插件无法访问 AMQP 插件的 Blob Storage 检查点它会从头开始读取。设置 auto_offset_reset earliest 可重新处理所有保留事件或 auto_offset_reset latest 仅从切换点消费新的事件。详情请参见 initial_position 映射。迁移后 Azure Blob Storage 账户会怎样它不再需要用于 offset 检查点。一旦确认 Kafka 插件正常消费并且 azure_event_hubs input 已退役存储账户至少是 checkpoint 容器可以安全删除。如果存储账户用于其他用途仅删除 storage_container 中引用的特定容器即可。可以重用相同的消费者组名称吗可以但没有实际效果。即使 AMQP 和 Kafka 消费者组共享相同名称它们也是完全独立的。它们使用不同协议、不同的 offset 存储和不同的作用域规则。重用名称不会让 Kafka 插件从 AMQP 插件的最后检查点继续。支持其他认证方法吗logstash-input-azure_event_hubs 插件仅支持 SAS 连接字符串因此只需携带 SAS 凭证。无需迁移 Entra ID、OAUTHBEARER 或托管身份配置。logstash-input-kafka 插件支持 SASL OAUTHBEARER因此迁移后可以采用基于令牌的认证。如果代理只允许端口 443 的流量怎么办Azure Event Hubs 上的 Kafka 端点需要端口 9093。如果 TCP 代理只转发端口 443则必须重新配置以允许 Event Hubs FQDN*.servicebus.windows.net的端口 9093。Azure Event Hubs 不在端口 443 上提供 Kafka 监听器。下一步随着 GPv1 退役截止日期2026 年 10 月临近尽早开始迁移可减少管理不再需要的存储基础设施的时间。迁移过程中如出现问题使用问题或配置帮助在 Elastic Discuss 论坛发帖。Kafka 插件中的 bug 或异常行为在 logstash-integration-kafka 提交 issue。相关资源Kafka input 插件文档logstash-input-kafka 所有配置参数的完整参考。Azure Event Hubs input 插件文档被替换的旧插件的完整参考。Azure Event Hubs for Apache Kafka 概览微软关于 Event Hubs 内置 Kafka 端点的文档。Event Hubs 配额与层级比较Kafka 协议支持的层级要求。原文https://www.elastic.co/observability-labs/blog/migrate-logstash-pipelines-from-azure-event-hubs-to-kafka-plugin