news 2026/6/15 2:53:38

Kafka 和springboot 整合Logback日志

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka 和springboot 整合Logback日志

Spring Boot 整合 Kafka 进行日志处理是一个常见的任务,可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南,帮助你完成这个整合。

本文介绍了如何在SpringBoot应用中整合Kafka,利用Logback收集日志并发送到Kafka。详细步骤包括添加Kafka相关依赖,配置logback.xml,自定义KafkaAppender以及提供测试。

首先安装kafka.。之前已经安装过了,我们这里不再讲。,可以参考前面讲的类容:

https://blog.csdn.net/lchmyhua88/article/details/155640953?spm=1001.2014.3001.5502

接着我们按照下面的步骤开始:

  • 1.pom依赖
  • 2.logback.xml配置
  • 3.配置 kafka消费者
  • 4.监听消费消息&测试代码

1. 添加依赖

<!-- Spring Boot Starter Kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!-- Logback Kafka Appender --> <dependency> <groupId>com.github.danielwegener</groupId> <artifactId>logback-kafka-appender</artifactId> <version>0.2.0-RC2</version> </dependency> <!-- Kafka Clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency>

2.配置 Logback

<configuration> <!-- 控制台输出appender --> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> </appender> <!-- Kafka appender --> <appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> </encoder> <!-- Kafka topic名称 --> <topic>log-topic</topic> <!-- Kafka bootstrap servers --> <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" /> <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" /> <!-- Kafka producer配置 --> <producerConfig>bootstrap.servers=192.168.33.10:9092</producerConfig> <producerConfig>acks=0</producerConfig> <producerConfig>linger.ms=1000</producerConfig> <producerConfig>max.block.ms=0</producerConfig> <!-- 过滤器设置(可选) --> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <!-- 根logger配置 --> <root level="INFO"> <appender-ref ref="CONSOLE"/> <appender-ref ref="KAFKA"/> </root> </configuration>

3.配置 kafka消费者,为了方便测试我们把生产者消费者都配置上

spring.kafka.bootstrap-servers=192.168.33.10:9092 #spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=true spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonDeserializer

4.注入kafka bean ,方便后面调试生产者。

@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public ProducerFactory<String, Object> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }

5.监听消费消息

@Component public class LogConsumer { @KafkaListener(topics = "log-topic",groupId = "my-group") public void listen(String message) { System.out.println("收到日志消息: " + message); } }

6.添加日志打印

@RestController @RequestMapping("/kafka") public class LogController { @Autowired private KafkaProducerServiceImpl producerService; private static final Logger logger = (Logger) LoggerFactory.getLogger(LogController.class); @GetMapping("/log") public String log() { //producerService.sendLogMessage("This is a test log message"); logger.info("这是一条测试日志消息"); logger.warn("这是一条警告日志消息"); logger.error("这是一条错误日志消息"); return "Log message sent"; } }

下面我们开始测试,通过调用接口:

控制台可以看到我写3条日志,kafka消费3条消息

这样我们就通过logback把日志收集到kafka里面。方便数据分析。当然你也可以写入es进行分析画像。

我们可以通过kafka manger控制台来监控消息。

下载kafka-manager,下载地址: https://github.com/yahoo/kafka-manager

解压后配置application.conf kafka地址:

保存后,指定8888端口启动:

bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8888

启动后打开控制面板,可以看到kafka集群节点信息和分区,消费偏移等信息:

http://192.168.33.10:8888/

详情可以去官网看看介绍:

https://github.com/yahoo/CMAK

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 12:30:41

ansible-hoc 模块使用

文章目录ansible模块ansible工具ansible代码验收ansible-hoc 常用模块命令说明命令执行的模块常用模块练习file模块练习copy模块练习yum_repository模块练习yum模块练习service模块练习systemd模块练习cron模块练习user模块练习group模块练习fetch模块练习get_url模块练习unarc…

作者头像 李华
网站建设 2026/6/15 10:28:50

ansible部署nfs

文章目录实验环境安装ansible部署nfs网络文件系统1、创建系统用户和组2、在nfs上创建共享目录3、编辑nfs配置文件4、开启nfs服务5、进行挂载实验环境 主机IP【配置静态IP地址】主机名字&#xff08;身份&#xff09;10.0.0.61m01&#xff08;管理节点&#xff09;10.0.0.31nfs…

作者头像 李华
网站建设 2026/6/15 14:33:29

29、Ubuntu多媒体工具与其他发行版使用指南

Ubuntu多媒体工具与其他发行版使用指南 1. 多媒体工具介绍 Ubuntu拥有丰富的多媒体工具,以下为你详细介绍几个实用的工具。 1.1 Rosegarden音乐制作 Rosegarden是一款强大的音乐制作软件。除了亲自试用其各项功能来了解用途外,你还可以通过帮助菜单找到完整的在线手册链接…

作者头像 李华
网站建设 2026/6/15 12:55:05

探索多虚拟电厂联合调度优化模型:集中式算法的实践

&#xff08;集中式算法&#xff09;多虚拟电厂联合调度优化模型&#xff0c;包括电动汽车&#xff0c;柔性负荷等&#xff0c;有建模文件&#xff0c;代码根据文件编写&#xff0c;注释清晰。 可扩展改写性强&#xff0c; 运行平台&#xff1a;matlabyalmipcplex 代码一经&…

作者头像 李华
网站建设 2026/6/15 13:06:35

C++ 相对 C 的语法补充:解决痛点,让代码更简洁安全

C 语言作为结构化编程的经典&#xff0c;但在大型项目、代码灵活性和安全性上有不少短板 —— 比如名字冲突、指针难用、函数传参死板等。C 作为 C 的超集&#xff0c;不仅兼容所有 C 语法&#xff0c;还新增了多个特性精准解决这些问题。今天用 “痛点 方案 极简代码” 的方…

作者头像 李华
网站建设 2026/6/15 13:28:52

22、正则表达式全解析

正则表达式全解析 正则表达式是一种用于匹配文本模式的强大工具。除了普通字符外,正则表达式还包含元字符,用于指定更复杂的匹配规则。 正则表达式元字符 正则表达式的元字符如下: ^ $ . [ ] { } - ? * + ( ) | \除了这些元字符,其他字符都被视为普通字符。不过,反斜…

作者头像 李华