news 2026/6/15 16:19:35

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Boot 4.0 + MyBatis-Plus 实战响应式编程的能力实战

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

目前 MyBatis-Plus 官方还不完全支持响应式编程,但我们可以结合 R2DBC 和 MyBatis-Plus 的部分特性来实现。这里提供两种方案:

方案一:使用 MyBatis-Plus 增强 R2DBC (推荐)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 项目依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-r2dbc</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.1</version><!-- 注意:只使用其工具类,不启用SQL执行 --></dependency><!-- 数据库驱动 --><dependency><groupId>io.asyncer</groupId><artifactId>r2dbc-mysql</artifactId><version>1.0.2</version></dependency><!-- 或 PostgreSQL --><!-- <dependency> <groupId>org.postgresql</groupId> <artifactId>r2dbc-postgresql</artifactId> <scope>runtime</scope> </dependency> -->

2. 配置类

importcom.baomidou.mybatisplus.annotation.DbType;importcom.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;importcom.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.data.r2dbc.config.EnableR2dbcAuditing;@Configuration@EnableR2dbcAuditingpublicclassR2dbcMybatisConfig{/** * 只使用 MyBatis-Plus 的分页插件 */@BeanpublicMybatisPlusInterceptormybatisPlusInterceptor(){MybatisPlusInterceptorinterceptor=newMybatisPlusInterceptor();interceptor.addInnerInterceptor(newPaginationInnerInterceptor(DbType.MYSQL));returninterceptor;}}

3. 实体类 (使用 MyBatis-Plus 注解)

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.annotation.*;importlombok.Data;importorg.springframework.data.annotation.CreatedDate;importorg.springframework.data.annotation.LastModifiedDate;importorg.springframework.data.relational.core.mapping.Table;importjava.time.LocalDateTime;@Data@Table("users")publicclassUser{@TableId(type=IdType.AUTO)privateLongid;@TableField("username")privateStringusername;@TableField("email")privateStringemail;@TableField("password")privateStringpassword;@TableField("age")privateIntegerage;@TableLogic@TableField("deleted")privateIntegerdeleted=0;@TableField(value="version",fill=FieldFill.INSERT)@VersionprivateIntegerversion=1;@CreatedDate@TableField("create_time")privateLocalDateTimecreateTime;@LastModifiedDate@TableField("update_time")privateLocalDateTimeupdateTime;// 响应式编程友好的构造方法publicstaticMono<User>of(Stringusername,Stringemail){Useruser=newUser();user.setUsername(username);user.setEmail(email);returnMono.just(user);}}

4. Repository 接口 (R2DBC)

importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.r2dbc.repository.Query;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceUserR2dbcRepositoryextendsR2dbcRepository<User,Long>{Mono<User>findByUsername(Stringusername);Mono<User>findByEmail(Stringemail);Flux<User>findByAgeGreaterThan(Integerage);@Query("SELECT * FROM users WHERE username LIKE :keyword OR email LIKE :keyword")Flux<User>searchUsers(Stringkeyword);@Query("UPDATE users SET age = :age WHERE id = :id")Mono<Integer>updateAgeById(Longid,Integerage);}

5. Service 层 (结合 MyBatis-Plus 工具)

importcom.baomidou.mybatisplus.core.conditions.query.QueryWrapper;importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.Pageable;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Service;importorg.springframework.transaction.annotation.Transactional;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importreactor.core.scheduler.Schedulers;importjava.time.Duration;importjava.util.Map;@Service@RequiredArgsConstructorpublicclassReactiveUserService{privatefinalUserR2dbcRepositoryuserRepository;privatefinalDatabaseClientdatabaseClient;publicMono<User>createUser(Useruser){returnuserRepository.save(user);}publicMono<User>getUserById(Longid){returnuserRepository.findById(id).switchIfEmpty(Mono.error(newRuntimeException("User not found")));}publicFlux<User>getAllUsers(){returnuserRepository.findAll().delayElements(Duration.ofMillis(100))// 模拟流处理.subscribeOn(Schedulers.boundedElastic());}publicMono<User>updateUser(Longid,Useruser){returnuserRepository.findById(id).flatMap(existing->{existing.setUsername(user.getUsername());existing.setEmail(user.getEmail());existing.setAge(user.getAge());returnuserRepository.save(existing);});}@TransactionalpublicMono<Void>deleteUser(Longid){returnuserRepository.deleteById(id).then(Mono.fromRunnable(()->System.out.println("User deleted: "+id)));}/** * 使用 MyBatis-Plus 的 QueryWrapper 构建查询条件 * 然后转换为 R2DBC 查询 */publicFlux<User>queryUsers(Map<String,Object>params){// 使用 MyBatis-Plus 的 QueryWrapper 构建条件QueryWrapper<User>queryWrapper=newQueryWrapper<>();if(params.containsKey("username")){queryWrapper.like("username",params.get("username"));}if(params.containsKey("email")){queryWrapper.like("email",params.get("email"));}if(params.containsKey("minAge")){queryWrapper.ge("age",params.get("minAge"));}if(params.containsKey("maxAge")){queryWrapper.le("age",params.get("maxAge"));}queryWrapper.orderByDesc("create_time");// 将 QueryWrapper 转换为 SQLStringsql=buildQueryWrapperSql(queryWrapper);// 执行响应式查询returndatabaseClient.sql(sql).fetch().all().map(row->{Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;});}/** * 响应式分页查询 */publicMono<Page<User>>getUsersPage(Pageablepageable){// 使用 MyBatis-Plus 的 Page 对象Page<User>mybatisPage=newPage<>(pageable.getPageNumber(),pageable.getPageSize());// 计算总数Mono<Long>countMono=databaseClient.sql("SELECT COUNT(*) FROM users").map(row->row.get(0,Long.class)).one();// 查询数据Flux<User>usersFlux=databaseClient.sql("SELECT * FROM users ORDER BY create_time DESC LIMIT :limit OFFSET :offset").bind("limit",pageable.getPageSize()).bind("offset",pageable.getOffset()).fetch().all().map(this::mapRowToUser);returnMono.zip(countMono,usersFlux.collectList()).map(tuple->{mybatisPage.setTotal(tuple.getT1());mybatisPage.setRecords(tuple.getT2());returnmybatisPage;});}privateStringbuildQueryWrapperSql(QueryWrapper<User>queryWrapper){// 简化示例,实际需要更复杂的转换return"SELECT * FROM users WHERE "+queryWrapper.getTargetSql();}privateUsermapRowToUser(Map<String,Object>row){Useruser=newUser();user.setId((Long)row.get("id"));user.setUsername((String)row.get("username"));user.setEmail((String)row.get("email"));user.setAge((Integer)row.get("age"));returnuser;}/** * 批量保存 */publicFlux<User>saveAll(Flux<User>users){returnuserRepository.saveAll(users).onErrorContinue((error,user)->System.err.println("Error saving user: "+error.getMessage()));}/** * 流式查询 */publicFlux<User>streamUsers(){returndatabaseClient.sql("SELECT * FROM users").fetch().all().delayElements(Duration.ofMillis(50))// 控制流速度.map(this::mapRowToUser);}}

6. Controller 层

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.extension.plugins.pagination.Page;importlombok.RequiredArgsConstructor;importorg.springframework.data.domain.PageRequest;importorg.springframework.http.HttpStatus;importorg.springframework.http.MediaType;importorg.springframework.http.ResponseEntity;importorg.springframework.web.bind.annotation.*;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;importjavax.validation.Valid;importjava.time.Duration;importjava.util.Map;@RestController@RequestMapping("/api/reactive/users")@RequiredArgsConstructorpublicclassReactiveUserController{privatefinalReactiveUserServiceuserService;@PostMapping@ResponseStatus(HttpStatus.CREATED)publicMono<ResponseEntity<User>>create(@Valid@RequestBodyUseruser){returnuserService.createUser(user).map(saved->ResponseEntity.status(HttpStatus.CREATED).body(saved)).onErrorResume(e->Mono.just(ResponseEntity.badRequest().build()));}@GetMapping("/{id}")publicMono<ResponseEntity<User>>getById(@PathVariableLongid){returnuserService.getUserById(id).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@GetMappingpublicFlux<User>getAll(){returnuserService.getAllUsers();}@GetMapping("/search")publicFlux<User>search(@RequestParamMap<String,Object>params){returnuserService.queryUsers(params);}@GetMapping("/page")publicMono<Page<User>>getPage(@RequestParam(defaultValue="0")intpage,@RequestParam(defaultValue="10")intsize){PageRequestpageRequest=PageRequest.of(page,size);returnuserService.getUsersPage(pageRequest);}@PutMapping("/{id}")publicMono<ResponseEntity<User>>update(@PathVariableLongid,@Valid@RequestBodyUseruser){returnuserService.updateUser(id,user).map(ResponseEntity::ok).defaultIfEmpty(ResponseEntity.notFound().build());}@DeleteMapping("/{id}")publicMono<ResponseEntity<Void>>delete(@PathVariableLongid){returnuserService.deleteUser(id).then(Mono.just(ResponseEntity.noContent().<Void>build())).defaultIfEmpty(ResponseEntity.notFound().build());}/** * Server-Sent Events (SSE) 流式接口 */@GetMapping(value="/stream",produces=MediaType.TEXT_EVENT_STREAM_VALUE)publicFlux<User>stream(){returnuserService.streamUsers();}/** * WebFlux WebSocket 支持 */@MessageMapping("users.chat")publicFlux<UserMessage>userChat(Flux<UserMessage>messages){returnmessages.doOnNext(message->System.out.println("Received: "+message.getContent())).map(message->newUserMessage("Server: "+message.getContent())).delayElements(Duration.ofSeconds(1));}/** * 批量操作 */@PostMapping("/batch")publicFlux<User>batchCreate(@RequestBodyFlux<User>users){returnuserService.saveAll(users);}}

7. 自定义响应式 Repository

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

importcom.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;importorg.springframework.data.r2dbc.repository.R2dbcRepository;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importorg.springframework.r2dbc.core.DatabaseClient;importorg.springframework.stereotype.Repository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;@RepositorypublicinterfaceCustomReactiveRepository{/** * 使用 MyBatis-Plus 的 Lambda 查询 */Flux<User>findUsersByCondition(LambdaQueryWrapper<User>wrapper);/** * 响应式分页查询 */Mono<Page<User>>findPage(Page<User>page,LambdaQueryWrapper<User>wrapper);}

8. 响应式事务配置

importorg.springframework.context.annotation.Configuration;importorg.springframework.transaction.ReactiveTransactionManager;importorg.springframework.transaction.reactive.TransactionalOperator;@ConfigurationpublicclassReactiveTransactionConfig{@BeanpublicTransactionalOperatortransactionalOperator(ReactiveTransactionManagertransactionManager){returnTransactionalOperator.create(transactionManager);}}

方案二:使用 MyBatis-Plus 响应式扩展 (第三方)

有一些第三方项目正在尝试为 MyBatis-Plus 添加响应式支持:
Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

1. 添加依赖

<!-- 第三方响应式扩展 --><dependency><groupId>com.github.yulichang</groupId><artifactId>mybatis-plus-join</artifactId><version>1.4.6</version></dependency>

2. 自定义响应式 Mapper

importorg.apache.ibatis.annotations.SelectProvider;importorg.springframework.data.repository.reactive.ReactiveCrudRepository;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicinterfaceReactiveBaseMapper<T>{Mono<Integer>insertReactive(Tentity);Mono<Integer>updateByIdReactive(Tentity);Mono<T>selectByIdReactive(Serializableid);Flux<T>selectListReactive(Wrapper<T>queryWrapper);Mono<Integer>deleteByIdReactive(Serializableid);}

重要提示

  1. MyBatis-Plus 官方还不完全支持响应式,上述方案是结合 R2DBC 和 MyBatis-Plus 的工具类
  2. 真正的响应式编程需要使用 R2DBC 或 MongoDB Reactive
  3. 如果需要复杂 SQL 查询,可以使用 DatabaseClient 或 R2DBC Entity Callbacks
  4. 生产环境建议使用成熟的响应式数据库驱动

完整配置 application.yml

Spring Cloud全栈实战:手撸企业级项目,从入门到架构师!

spring:r2dbc:url:r2dbc:mysql://localhost:3306/reactive_dbusername:rootpassword:passwordpool:initial-size:5max-size:20max-idle-time:30mwebflux:base-path:/apistatic-path-pattern:/static/**codec:max-in-memory-size:10MBlogging:level:org.springframework.r2dbc:DEBUGio.r2dbc:DEBUG

这种架构结合了 MyBatis-Plus 的便利性和 R2DBC 的响应式能力,适合需要复杂查询的场景。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/15 0:39:27

DuiLib_Ultimate:Windows桌面应用开发的终极解决方案

DuiLib_Ultimate&#xff1a;Windows桌面应用开发的终极解决方案 【免费下载链接】DuiLib_Ultimate DuiLib_Ultimate 是深耕 Windows 软件UI开发的利器&#xff0c; 以轻量化、高性能、易扩展 为核心&#xff0c;专为 Windows 平台打造极致桌面应用体验而生。 项目地址: http…

作者头像 李华
网站建设 2026/6/15 5:46:31

Windows11系统文件SettingSyncHost.exe丢失 下载修复

在使用电脑系统时经常会出现丢失找不到某些文件的情况&#xff0c;由于很多常用软件都是采用 Microsoft Visual Studio 编写的&#xff0c;所以这类软件的运行需要依赖微软Visual C运行库&#xff0c;比如像 QQ、迅雷、Adobe 软件等等&#xff0c;如果没有安装VC运行库或者安装…

作者头像 李华
网站建设 2026/6/15 6:59:43

56、Linux 系统常见问题及解决方法

Linux 系统常见问题及解决方法 在 Linux 系统的使用过程中,我们会遇到各种各样的问题。以下是一些常见问题及对应的解决方法。 一、基本建议 加入或创建 Linux 用户组 :如果所在地区有 Linux 用户组,可加入其中;若没有,则可以自行创建。通过与他人交流,接触不同的问题…

作者头像 李华
网站建设 2026/6/15 6:56:10

一年输送旅客数千万次,浦东国际机场的效率秘密藏在这个智能体里

秋冬旅游旺季&#xff0c;浦东机场将再次迎来百万客流大考&#xff1a;51万平方公里内&#xff0c;日均数十万旅客穿梭&#xff0c;找登机口、找证件、找车位……任何一次“找不到”都可能演变为航班延误、旅客投诉甚至安全事件。 在便利性、舒适度与效率需求不断走高的当下&a…

作者头像 李华
网站建设 2026/6/10 14:56:28

SpeedTree树模型制作软件:从零开始掌握专业植被建模

SpeedTree树模型制作软件&#xff1a;从零开始掌握专业植被建模 【免费下载链接】SpeedTree树模型制作软件的下载与安装指南 SpeedTree是一款业界领先的植被建模软件&#xff0c;特别适用于游戏开发和影视制作。它提供了两款主要软件&#xff1a;SpeedTree Modeler UE4 Subscri…

作者头像 李华
网站建设 2026/6/15 11:35:45

项目分享|AP2:让智能体学会安全支付的开源标准

引言 当AI智能体&#xff08;Agent&#xff09;从简单的问答工具演变为能自主执行复杂任务的“数字雇员”时&#xff0c;一个核心问题随之浮现&#xff1a;它们如何安全、可靠地代表我们完成涉及真金白银的交易&#xff1f;谷歌近期开源的 Agent Payments Protocol (AP2) &…

作者头像 李华