spring boot学习第八篇:kafka监听消费

news/2024/7/24 1:04:49 标签: spring boot, 学习, kafka

为了实现监听器功能

pom.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hmblogs</groupId>
    <artifactId>hmblogs</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hmblogs</name>
    <description>hmblogs</description>
    <properties>
        <java.version>8</java.version>
        <druid.version>1.2.8</druid.version>
        <log4jdbc.version>1.16</log4jdbc.version>
    </properties>
    <dependencies>
        <!-- druid数据源驱动 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>${druid.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- mybatis -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--Mysql依赖包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--lombok插件-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!--监控sql日志-->
        <dependency>
            <groupId>org.bgee.log4jdbc-log4j2</groupId>
            <artifactId>log4jdbc-log4j2-jdbc4.1</artifactId>
            <version>${log4jdbc.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.9</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.7.2</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 application.yml文件内容如下:

server:
  port: 8081
  servlet.context-path: /

#配置数据源
spring:
  datasource:
    druid:
      db-type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
      url: jdbc:log4jdbc:mysql://${DB_HOST:localhost}:${DB_PORT:3306}/${DB_NAME:eladmin}?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
      username: ${DB_USER:root}
      password: ${DB_PWD:123456}
  redis:
    host: localhost
    port: 6379
    password: heming
    database: 10

logback.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">
    <!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
    <!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
    <!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
    <!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
    <contextName>logback</contextName>
    <property name="log.path" value="logs"></property>
    <property name="Console_Pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%logger{50}] - %msg%n"/>

    <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>${Console_Pattern}</Pattern>
            <!-- 设置字符集 -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <!-- 时间滚动输出 level为 INFO 日志 -->
    <appender name="RollingFileBackend" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/hmblogs.log</file>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%logger{50}] - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 每天日志归档路径以及格式 -->
            <fileNamePattern>${log.path}/hmblogs/log-hmblogs-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录info级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>Info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--additivity:是否继承root节点,默认是true继承。默认情况下子Logger会继承父Logger的appender,
    也就是说子Logger会在父Logger的appender里输出。
    若是additivity设为false,则子Logger只会在自己的appender里输出,而不会在父Logger的appender里输出。-->
    <logger name="org.springframework" level="INFO" additivity="false">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </logger>
    <logger name="org.mybatis" level="INFO"></logger>
    <logger name="org.hibernate.SQL" level="DEBUG"  additivity="false">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </logger>
    <Logger name="org.apache.catalina" level="info"/>
    <Logger name="org.apache.tomcat.util" level="info"/>
    <!-- 从低到高为:All < Trace < Debug < Info < Warn < Error < Fatal < OFF-->
    <root level="Info">
        <appender-ref ref="Console"/>
        <appender-ref ref="RollingFileBackend"/>
    </root>

</configuration>

BackendApplication.java文件内容如下:

package com.hmblogs.backend;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BackendApplication {

    public static void main(String[] args) {
        SpringApplication.run(BackendApplication.class, args);
    }

}

然后添加了kafkaConsumerListenerExample.java文件

package com.hmblogs.backend.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 *
 * @description:  kafka 消费者
 * @copyright: @Copyright (c) 2022
 * @company: hmblogs
 * @author: heming
 * @version: 1.0.0
 * @createTime: 2024-01-18 8:31
 */
@Component
@Slf4j
public class kafkaConsumerListenerExample {

    @KafkaListener(topics = "test", groupId = "0")
    public void consume(ConsumerRecord<?, ?> record) {
        Optional<?> value = Optional.ofNullable(record.value());
        // 进行消息处理逻辑
        log.info("print message: " + value);
    }
}


发到服务器上,启动hmblogs报错,截图如下:

Caused by: java.lang.TypeNotPresentException: Type org.springframework.kafka.listener.CommonErrorHandler not present

java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler

网上搜索资料,大部分讲的都是包冲突,在本地启动也是报这样的错,如下所示:

 


http://www.niftyadmin.cn/n/5333906.html

相关文章

element-ui的el-upload组件实现上传拖拽排序图片顺序(sortablejs)

<template><!-- 省略其他配置 --><el-upload ref"upload" :file-list.sync"fileList"></el-upload></template><script>import Sortable from sortablejs;export default {data() {return {fileList: []};},mounted()…

阿里巴巴为什么要去做中台呢?

2015年底&#xff0c;当大部分企业忙着进行年度总结和下一年的规划的时候&#xff0c;阿里巴巴集团对外宣布全面启动阿里巴巴集团2018年中台战略&#xff0c;构建符合DT时代的更具创新性、灵活性的“大中台、小前台”组织机制和业务机制&#xff0c;即作为前台的一线业务会更敏…

医学图像的图像处理、分割、分类和定位-1

一、说明 本报告全面探讨了应用于医学图像的图像处理和分类技术。开展了四项不同的任务来展示这些方法的多功能性和有效性。任务 1 涉及读取、写入和显示 PNG、JPG 和 DICOM 图像。任务 2 涉及基于定向变化的多类图像分类。此外&#xff0c;我们在任务 3 中包括了胸部 X 光图像…

ReactNative中样式与布局的书写

样式 const styles StyleSheet.create({container: {flex: 1,justifyContent: center,alignItems: center,backgroundColor: #F5FCFF,}, welcome: {fontSize: 20, textAlign: center,margin: 10, }, instructions: {textAlign: center,color: #333333,marginBottom: 5,}, …

基于R语言的NDVI的Sen-MK趋势检验

本实验拟分析艾比湖地区2010年至2020年间的NDVI数据&#xff0c;数据从MODIS遥感影像中提取的NDVI值&#xff0c;在GEE遥感云平台上将影像数据下载下来。代码如下&#xff1a; import ee import geemap geemap.set_proxy(port7890)# 设置全局网络代理 Map geemap.Map()# 指定…

解决Python安装库时出现的Requirement already satisfied问题

uirement already satisfied的问题当我们用pip install 库名时&#xff0c;出现了下面 Requirement already satisfied WARNING: Ignoring invalid distribution -ip 的问题 对于这样的问题&#xff0c;解决办法就是在 pip install 后加 - -target你所要添加的库文件地址(注意…

2018年认证杯SPSSPRO杯数学建模A题(第二阶段)海豚与沙丁鱼全过程文档及程序

2018年认证杯SPSSPRO杯数学建模 基于聚类分析的海豚捕食合作策略 A题 海豚与沙丁鱼 原题再现&#xff1a; 沙丁鱼以聚成大群的方式来对抗海豚的捕食。由于水下光线很暗&#xff0c;所以在距离较远时&#xff0c;海豚只能使用回声定位方法来判断鱼群的整体位置&#xff0c;难…

DNS - 全家桶(114 DNS、阿里DNS、百度DNS 、360 DNS、Google DNS)

DNS是什么&#xff1f; DNS是域名系统,Domain Name System的缩写,是一个服务。 DNS就是把域名解析为IP地址&#xff0c;提供我们上网&#xff0c;我们能够上网最终是找到IP地址。 比如&#xff0c;http://xxxx.com是域名&#xff0c;那么他的IP地址假设是144.144.144.144&am…