Kafka(三)生产者发送JSON消息+使用统一序列化器+提升吞吐量
文章目录生产者发送思路使用统一序列化器配置生产者参数提升吞吐量发送消息关闭生产者结语示例源码仓库生产者发送思路如何确保消息格式正确的前提下最终一定能发送到Kafka? 这里的实现思路是ack使用默认的all 同时设置 min.insync.replicas 以提高容错性开启重试在一定时间内重试不成功则入库后续由定时任务继续发送这里在某些异常情况下一定会生产重复消息如何确保消息只消费一次后续在Consumer实现中详细展开这里我们只要确保生产的消息不论重试多少次最终都只会被发送到同一分区。Kafka的确定消息的分区策略是 如果提供了key则根据hash(key)计算分区。由于我们每个消息都有一个消息ID不管是重试多少次ID是不会变的同时我们不会在消息高峰阶段调整分区数量。所以基于这些我们保证一个消息无论多少次都会发送到同一分区。有MQ开发经验的同学大概都知道消息被发送到Broker之后不一定会马上被持久化到磁盘上基本上都是会写入到操作系统的缓存中由操作系统决定什么时候将数据刷新到磁盘上。Kafka也是同理所以严格意义上来说即使我们采用上述5个步骤也不一定保证发送给Kafka的数据不会丢失。Kafka提供了几个log.flush参数和flush.message、flush.ms来控制刷新磁盘的时间和写入log时间具体参数和配置参考Kafka官方文档 Broker配置和Kafka官方文档 Topic配置。Kafka官方是不建议设置的让操作系统自己决定。并且官方提到了这样一句话We generally feel that the guarantees provided by replication are stronger than sync to local disk, however the paranoid still may prefer having both and application level fsync policies are still supported也就是官方认为数据的可靠性靠副本机制来保证而不是强制本地磁盘刷新当然了如果你没有副本只是一个单机节点的话可以考虑设置磁盘刷新相关配置。关于flush的更多细节推荐阅读 Kafka 官方文档 Application vs. OS Flush Management以及后面的几节使用统一序列化器消息格式为JSON, 需要使用Jackson将类序列化为JSON字符串。但是如果我们有多种POJO消息每一个都去实现官方的Serializer接口显然不太好能不能利用泛型的帮我们完成呢开源的Kafka版本没有提到这一部分好在我在confluent官方GitHub仓库找到了对应实现只需要引入依赖即可dependencygroupIdio.confluent/groupIdartifactIdkafka-json-serializer/artifactIdversion7.5.1/version/dependency同时要指明仓库URLrepositoriesrepositoryidconfluent/idurlhttps://packages.confluent.io/maven//url/repository/repositories具体配置见下面详细代码的这一行result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaJsonSerializer.class.getName());配置生产者参数有几点需要注意开启压缩retries 官方建议不配置 官方建议使用delivery.timeout.ms 参数来控制重试时间 默认2分钟buffer.memory 如果没有什么特别情况使用默认的即可 32MBack使用默认的all配置client.id 防止防止InstanceAlreadyExistsException/** * 以下配置建议搭配 官方文档 kafka权威指南相关章节 实际业务场景吞吐量需求 自己调整 * 如果是本地 bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致 * 关于消息压缩类型官方建议选择lz4详情见博文 https://www.confluent.io/blog/apache-kafka-message-compression/ * * 建议设置client.id, 防止InstanceAlreadyExistsException 异常, * 如果不设置, kafka会自动生成一个client.id, 默认格式是producer-1, 代码逻辑见{link ProducerConfig#maybeOverrideClientId(Map)} * kafka Java client 会使用client.id生成JMX的ObjectName, 代码逻辑见{link KafkaProducer#KafkaProducer(ProducerConfig, Serializer, Serializer, ProducerMetadata, KafkaClient, ProducerInterceptors, Time)} 中的registerAppInfo * 如果多个应用(也就是多个进程)都没有设置client.id, 使用默认的client.id的规则生成的client.id则重复, 会抛出InstanceAlreadyExistsException * 如果是同一应用(也就是同一进程)创建多个producer, 不设置client.id的话不会抛出InstanceAlreadyExistsException, 因为其内部有一个自动递增的计数器{link ProducerConfig#PRODUCER_CLIENT_ID_SEQUENCE} */publicstaticPropertiesloadProducerConfig(StringvalueSerializer){PropertiesresultnewProperties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.0.102:9093);// 建议设置client.idresult.put(ProducerConfig.CLIENT_ID_CONFIG,SERVER_ID);result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,KafkaJsonSerializer.class.getName());result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,CompressionType.LZ4.name);// 每封邮件消息大小大约20KB, 使用默认配置吞吐量不高下列配置增加kafka的吞吐量// 默认16384 bytes太小了这会导致邮件消息一个一个发送到kafka达不到批量发送的目的不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG,1048576*10);// 默认1048576 bytes限制的是一个batch的大小对于20KB的消息来说消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576*10);// 等10ms, 为了让更多的消息聚合到一个batch中提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG,10);returnresult;}提升吞吐量在实际场景中我们的邮件消息一个大概20KB而batch.size默认是16KB也就是说在不修改该参数的情况下生产者只能一个一个的发消息这会导致我们的吞吐量上不去 所以修改batch.size为10MB只修改这个参数还不行 max.request.size 限制了单次请求的大小默认为1MB也就是说即使batch.size为10MB但是由于一次只能最多发1MB吞吐量也上不去所以这里将max.request.size也改为10MB由于我们将一个批次可发送的数量大大提高所以可以让生产者等一会再发等更多的数据到达。linger.ms默认是为0也就是立刻发送根据实际情况适当增加等待时间发送消息LogpublicclassMessageProducer{publicstaticfinalKafkaProducerString,UserDTOPRODUCERnewKafkaProducer(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));privateMessageFailedServicemessageFailedServicenewMessageFailedService();/** * kafka producer 发送失败时会进行重试相关参数 retries 和 delivery.timeout.ms, 官方建议使用delivery.timeout.ms默认2分钟 * callback函数只有在最后一次重试之后才会调用 如果你想在本地测试Kafka生产者的重试详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm * param userDTO */publicvoidsendMessage(finalUserDTOuserDTO){ProducerRecordString,UserDTOusernewProducerRecord(email,userDTO.getMessageId(),userDTO);try{PRODUCER.send(user,(recordMetadata,e)-{if(Objects.nonNull(e)){log.severe(message has sent failed);MessageFailedEntitymessageFailedEntitynewMessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMappermappernewObjectMapper();try{messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));}catch(JsonProcessingExceptionjsonProcessingException){log.severe(message content json format failed);}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());// 如果sendMessage传进来的是个list也同理不能放到list.foreach外面// 如果放在主线程里由于kafka producer是异步的// kafka producer的执行速度可能慢于主线程可能拿到的值是空的是有问题的例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);}else{log.info(message has sent to topic: recordMetadata.topic(), partition: recordMetadata.partition());}});}catch(TimeoutExceptione){log.info(send message to kafka timeout, message: );// TODO: 自定义逻辑比如发邮件通知kafka管理员}}}对上述代码做几点解释我们使用异步的方式发送如果发送成功打印一条消息关键在于重试callback函数只有在最后一次重试之后才会调用。不会重试多少次就调用多少次callback 这个问题我发邮件问过社区 详情见这里的 邮件想要本地测试或debug重试相关逻辑的话可以将min.insync.replicas改大一点例如我只有一个Kafka节点那么我设置min.insync.replicas为大于1的值并且设置Kafka的Producer的确认设置是all那么这时候发送消息就会看到重试相关日志关闭生产者实现ServletContextListener接口, 然后在web.xml的listener元素中配置publicclassKafkaListenerimplementsServletContextListener{privatestaticfinalListKafkaProducerKAFKA_PRODUCERSnewLinkedList();OverridepublicvoidcontextInitialized(ServletContextEventsce){KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}OverridepublicvoidcontextDestroyed(ServletContextEventsce){KAFKA_PRODUCERS.forEach(KafkaProducer::close);}}?xml version1.0 encodingUTF-8 ?web-appxmlnshttps://jakarta.ee/xml/ns/jakartaeexmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttps://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsdversion6.0listenerlistener-classcom.business.server.listener.KafkaListener/listener-class/listener/web-app结语在实际编码过程中可以参考官方写的Kafka权威指南对应章节书写或者参考各大云服务厂商的Kafak的开发者文档。不过我建议还是看Kafka权威指南 我看了阿里云和华为云的虽然都号称兼容开源Kafka但是发现其版本和开源版本之间存在一定的滞后性许多最佳实践已经过时Kafka生产者端没什么特别的主要是根据业务场景设计消息格式以及如何尽可能的减小消息体积如果你的消息很大比我的场景还大达到了1M以上生产者的吞吐量是个问题消费者的消费速度也是个问题。你要是问我有什么好的想法没有具体场景我确实想不出什么好的方式。我目前能想到的解决方式是减少序列化之后的消息体积例如可以使用Avro或Protobuf不过这两个框架我还没实践过。有相关经验的同学可以分享一下示例源码仓库Github地址项目下business-server module代表生产者运行时IDEA配置如下注意Application context的路径, 启动之后访问端口Application context, 例如http://localhost:8999/business-server下一篇博文将介绍消费者消费消息以及消费者的重要参数配置还有消费逻辑的重试机制等。