上一篇【第11篇】KafkaProducer源码全景图——一条消息的奇幻旅程下一篇【第013篇】Kafka序列化器深度解析——自定义Serializer不再是难题摘要你可能用过Spring MVC的拦截器、MyBatis的拦截器、Nginx的拦截器……但你用过Kafka的拦截器吗Kafka Producer提供了ProducerInterceptor接口让你在消息发送前onSend和收到Broker响应后onAcknowledgement两个关键节点插入自定义逻辑——不修改一行业务代码就能实现全链路追踪TraceId注入、消息内容校验、发送耗时统计等功能。本文从ProducerInterceptor接口设计讲起深入ProducerInterceptors的拦截器链源码剖析其执行顺序和异常处理机制然后手把手带你实现一个生产可用的TraceId注入拦截器。一、拦截器在Producer中的位置回顾上篇的KafkaProducer全景图拦截器是消息处理链的第一站send(record) │ ▼ ┌───────────────────┐ │ Interceptors │ ← 第一条消息先到这儿 │ onSend(record) │ 可以修改/过滤/标注消息 └───────┬───────────┘ ▼ ┌───────────────────┐ │ Serializer │ 序列化Key/Value └───────┬───────────┘ ▼ ┌───────────────────┐ │ Partitioner │ 计算目标分区 └───────┬───────────┘ ▼ ┌───────────────────┐ │ RecordAccumulator │ 进入缓冲准备发送 └───────────────────┘ ...网络发送... ▼ ┌───────────────────┐ │ Interceptors │ ← 收到Broker响应时再回调 │ onAcknowledgement│ 可以统计耗时/成功率 └───────────────────┘两次回调时机onSend()消息刚进入send()方法还没序列化的最早时机onAcknowledgement()Sender线程收到Broker的ProduceResponse后调用用户callback之前二、ProducerInterceptor接口——只有一个要实现的小合同// org.apache.kafka.clients.producer.ProducerInterceptorK, VpublicinterfaceProducerInterceptorK,VextendsConfigurable{/** * 消息发送前调用在用户线程中执行 * * param record 原始消息 * return 修改后的消息可以返回同一个对象或新对象 * 如果返回 null这条消息会被丢弃不发送 */ProducerRecordK,VonSend(ProducerRecordK,Vrecord);/** * 收到Broker响应后调用在Sender线程中执行 * * param metadata 发送结果分区offset时间戳 * param exception 如果发送失败这里不为null */voidonAcknowledgement(RecordMetadatametadata,Exceptionexception);/** * Producer关闭时调用释放资源 */voidclose();}关键要点onSend返回null→ 消息被丢弃不会被序列化或发送onAcknowledgement运行在Sender线程不是用户线程注意线程安全close()在 Producer关闭时调用适合做汇总日志比如本次总共发送了X条三、ProducerInterceptors——拦截器链的调度器Kafka用ProducerInterceptors类管理多个拦截器的协作// org.apache.kafka.clients.producer.internals.ProducerInterceptorspublicclassProducerInterceptorsK,VimplementsCloseable{// 拦截器列表按配置顺序也就是拦截器链privatefinalListProducerInterceptorK,Vinterceptors;publicProducerInterceptors(ListProducerInterceptorK,Vinterceptors){// 深拷贝防止外部修改this.interceptorsnewArrayList(interceptors);}/** * 遍历所有拦截器的 onSend() * 拦截器链模式每个拦截器的输出是下一个的输入 */publicProducerRecordK,VonSend(ProducerRecordK,Vrecord){ProducerRecordK,VinterceptRecordrecord;for(ProducerInterceptorK,Vinterceptor:this.interceptors){try{// 前一个拦截器的输出 后一个拦截器的输入interceptRecordinterceptor.onSend(interceptRecord);}catch(Exceptione){// 单个拦截器异常不会影响其他拦截器log.warn(Error executing interceptor onSend callback,e);}// 如果某个拦截器返回了 null → 丢弃消息if(interceptRecordnull){returnnull;}}returninterceptRecord;}/** * 遍历所有拦截器的 onAcknowledgement() */publicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){for(ProducerInterceptorK,Vinterceptor:this.interceptors){try{interceptor.onAcknowledgement(metadata,exception);}catch(Exceptione){// 同样单个拦截器的异常不传播log.warn(Error executing interceptor onAcknowledgement callback,e);}}}/** * 关闭所有拦截器 */publicvoidclose(){for(ProducerInterceptorK,Vinterceptor:this.interceptors){try{interceptor.close();}catch(Exceptione){log.warn(Error closing interceptor,e);}}}}设计要点分析拦截器链执行模式 record ──► [拦截器A].onSend() ──► record ──► [拦截器B].onSend() ──► record ──► 序列化... 特点 1. 责任链模式A的输出是B的输入 2. 任何一环返回null → 消息被丢弃后续拦截器不会执行 3. 任何一环抛异常 → 只打日志继续执行下一个 不会因为一个拦截器挂了就影响整个Producer onAcknowledgement 回调 不分先后顺序每个拦截器都会收到回调独立通知 注意这里运行在 Sender 线程不是用户线程如何配置多个拦截器# producer.properties interceptor.classescom.example.TraceIdInterceptor,com.example.MetricsInterceptor,com.example.AuditInterceptor多个拦截器用逗号分隔按书写顺序组成拦截器链。第一个在链的最前端。四、实战一TraceId注入拦截器——全链路追踪的基础这是最常见的拦截器使用场景在消息头里注入TraceId让消费者可以串联起整个调用链。importorg.apache.kafka.clients.producer.ProducerInterceptor;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.clients.producer.RecordMetadata;importorg.apache.kafka.common.header.Headers;importorg.apache.kafka.common.header.internals.RecordHeaders;importorg.slf4j.MDC;importjava.util.Map;/** * TraceId注入拦截器 * * 功能 * 1. onSend() 时从MDC中取出traceId注入到Kafka消息头 * 2. 如果MDC没有traceId自动生成一个避免断链 * 3. onAcknowledgement() 时打印发送日志可选 */publicclassTraceIdInterceptorimplementsProducerInterceptorString,String{privatestaticfinalStringTRACE_ID_HEADERX-Trace-Id;privatestaticfinalStringSPAN_ID_HEADERX-Span-Id;OverridepublicProducerRecordString,StringonSend(ProducerRecordString,Stringrecord){// 1. 获取当前traceIdStringtraceIdMDC.get(TRACE_ID_HEADER);if(traceIdnull||traceId.isEmpty()){// 如果MDC中没有比如非Web请求自动生成一个traceIdgenerateTraceId();}StringspanIdMDC.get(SPAN_ID_HEADER);if(spanIdnull||spanId.isEmpty()){spanIdgenerateSpanId();}// 2. 构建新的Headers包含原有headers trace信息HeadersnewHeadersnewRecordHeaders(record.headers().toArray());newHeaders.add(TRACE_ID_HEADER,traceId.getBytes());newHeaders.add(SPAN_ID_HEADER,spanId.getBytes());// 3. 返回带有trace信息的新消息returnnewProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),newHeaders);}OverridepublicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){// 发送完成后的回调if(exception!null){// 发送失败时记录错误日志StringtraceIdMDC.get(TRACE_ID_HEADER);System.err.printf([TraceId%s] 消息发送失败: topic%s, partition%d, error%s%n,traceId,metadata.topic(),metadata.partition(),exception.getMessage());}// 成功时不打日志避免性能开销交由用户自己的callback处理}Overridepublicvoidclose(){// 不需要释放资源}Overridepublicvoidconfigure(MapString,?configs){// 可以从configs中读取自定义配置// 比如是否自动生成traceId、traceId的前缀等}// 工具方法 privateStringgenerateTraceId(){returngen-System.currentTimeMillis()-Integer.toHexString(Thread.currentThread().hashCode());}privateStringgenerateSpanId(){returnspan-Long.toHexString(System.nanoTime());}}消费者端配套代码光生产者加TraceId不够消费者端也要把TraceId放回MDCimportorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.slf4j.MDC;// 消费循环中while(running){ConsumerRecordsString,Stringrecordsconsumer.poll(Duration.ofMillis(1000));for(ConsumerRecordString,Stringrecord:records){// 从消息头中取出traceId放入MDCHeadertraceHeaderrecord.headers().lastHeader(X-Trace-Id);if(traceHeader!null){MDC.put(X-Trace-Id,newString(traceHeader.value()));}HeaderspanHeaderrecord.headers().lastHeader(X-Span-Id);if(spanHeader!null){MDC.put(X-Span-Id,newString(spanHeader.value()));}try{// 处理消息此处的日志会自动带上traceIdprocessMessage(record);}finally{// 处理完清理MDC防止线程复用导致traceId串了MDC.clear();}}}五、实战二消息发送耗时统计拦截器用onAcknowledgement统计每条消息从send()到 Broker响应的耗时publicclassMetricsInterceptorimplementsProducerInterceptorString,String{// 用ThreadLocal存放send时间戳因为onSend在用户线程onAcknowledgement在Sender线程// 注意这里存不了——onSend和onAcknowledgement不在同一线程// 正确做法把时间戳存到消息头或者在ProducerRecord中传递privatefinalAtomicLongsentCountnewAtomicLong(0);privatefinalAtomicLongfailedCountnewAtomicLong(0);privatefinalAtomicLongtotalLatencyMsnewAtomicLong(0);OverridepublicProducerRecordString,StringonSend(ProducerRecordString,Stringrecord){// 把当前时间戳注入消息头用于后续计算耗时HeadersnewHeadersnewRecordHeaders(record.headers().toArray());newHeaders.add(X-Send-Timestamp,String.valueOf(System.currentTimeMillis()).getBytes());returnnewProducerRecord(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),newHeaders);}OverridepublicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){if(exception!null){failedCount.incrementAndGet();}else{sentCount.incrementAndGet();// 从metadata里拿不到我们注入的时间戳所以换个方式// 配合消费者端的头信息计算端到端延迟}}Overridepublicvoidclose(){// Producer关闭时打印汇总System.out.printf([MetricsInterceptor] 发送成功: %d, 发送失败: %d%n,sentCount.get(),failedCount.get());}Overridepublicvoidconfigure(MapString,?configs){}}六、实战三消息过滤拦截器某些场景下需要不发某些消息onSend返回null就能实现过滤publicclassMessageFilterInterceptorimplementsProducerInterceptorString,String{privateSetStringblacklistKeys;Overridepublicvoidconfigure(MapString,?configs){Stringblacklist(String)configs.get(filter.blacklist.keys);if(blacklist!null){blacklistKeysnewHashSet(Arrays.asList(blacklist.split(,)));}}OverridepublicProducerRecordString,StringonSend(ProducerRecordString,Stringrecord){// 如果消息Key在黑名单中丢弃这条消息if(blacklistKeys!nullblacklistKeys.contains(record.key())){System.out.println([FilterInterceptor] 丢弃消息: keyrecord.key());returnnull;// ← 返回null 丢弃}returnrecord;}OverridepublicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){}Overridepublicvoidclose(){}Overridepublicvoidconfigure(MapString,?configs){}}警告这种静默丢弃很容易搞丢重要数据。建议在生产中只用于明确的降级场景并配合告警。七、拦截器的线程安全——你必须知道的事这是最容易踩的坑线程模型 用户线程 Sender线程 │ │ ▼ │ [拦截器].onSend() │ │ │ ▼ │ 序列化 → 分区 → 进Accumulator│ │ ▼ [拦截器].onAcknowledgement() │ ▼ 用户callback.call()方法运行线程注意onSend()用户线程调send()的那个不要做耗时操作会阻塞业务onAcknowledgement()Sender线程不要用ThreadLocal存状态两个线程看到的ThreadLocal不同close()用户线程Producer正常关闭时调用正确传递上下文的方式把信息注入到**消息头Headers**中而不是用ThreadLocal。消息头会随着消息在网络中传输消费者也能读到。八、多个拦截器的配置实战假设你需要同时使用TraceId Metrics Filter三个拦截器PropertiespropsnewProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置拦截器链按顺序执行props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,com.example.TraceIdInterceptor,// 第一步注入TraceIdcom.example.MessageFilterInterceptor,// 第二步过滤黑名单com.example.MetricsInterceptor// 第三步统计指标);KafkaProducerString,StringproducernewKafkaProducer(props);执行顺序send(record) → TraceIdInterceptor.onSend() // 先注入traceId → MessageFilterInterceptor.onSend() // 再判断是否过滤此时能读到traceId → MetricsInterceptor.onSend() // 最后打指标 → ...发送... → MetricsInterceptor.onAcknowledgement() → MessageFilterInterceptor.onAcknowledgement() → TraceIdInterceptor.onAcknowledgement()本篇小结Kafka拦截器是个被严重低估的特性。三行配置就能实现全链路追踪——这在微服务架构里简直是雪中送炭。核心就一个接口ProducerInterceptor两个方法onSend消息前处理和onAcknowledgement响应后处理配合责任链模式串成拦截器链。今天你掌握了三个实战案例TraceId注入全链路追踪、消息过滤业务降级、指标统计监控可视化。这些都是改个配置就能用的生产实践。下一篇我们进入序列化器Serializer的源码世界。你会发现为什么Kafka推荐用Avro而不是JSON自定义Serializer怎么做敬请期待。上一篇【第11篇】KafkaProducer源码全景图——一条消息的奇幻旅程下一篇【第013篇】Kafka序列化器深度解析——自定义Serializer不再是难题