芋道源码解读之多租户
博主和芋道源码作者及其官方开发团队无任何关联
一、概述
租户(Tenant)是系统中的一个逻辑隔离的单元,代表一个独立使用系统的组织(如企业、高校等),在多租户系统中,不同租户共享相同的应用程序和基础设施,但各自拥有独立的数据、配置、组织架构及用户等。
芋道是一个支持多租户的系统,对多租户功能的组件和框架封装的代码位于yudao-spring-boot-starter-biz-tenant模块中,对于读写数据库和Redis,消息队列中消息的生产消费以及定时任务派发,调用异步方法等都分别实现了租户隔离,实现原理都是利用线程ThreadLocal进行租户标识传递和线程内共享,处理租户业务的线程(例如WebApi的HTTP请求线程,定时任务执行线程,消息消费回调线程)开始执行时首先获取具体场景下的租户ID,存到当前线程的ThreadLocal中,后续基于该线程执行或调用的各种方法中如遇到读写数据库或Redis以及发送消息和调用异步方法的操作时,便能从ThreadLocal获取租户ID再执行进一步操作。
项目通过一个TenantContextHolder类来封装ThreadLocal进而实现不同场景下基于线程的租户隔离,为每个执行带有租户隔离逻辑代码的线程都绑定一个TENANT_ID对象来储存和共享当前场景的租户ID,同时还绑定了一个布尔类型的IGNORE用于标识当前线程即将要执行的代码是否需要处理租户。
只有深入正确理解TenantContextHolder中ThreadLocal的原理,才能真正理解多租户的实现原理
cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder
public class TenantContextHolder { /** * 当前租户编号 */ private static final ThreadLocal<Long> TENANT_ID = new TransmittableThreadLocal<>(); /** * 是否忽略租户 */ private static final ThreadLocal<Boolean> IGNORE = new TransmittableThreadLocal<>(); /** * 获得租户编号 * * @return 租户编号 */ public static Long getTenantId() { return TENANT_ID.get(); } /** * 获得租户编号。如果不存在,则抛出 NullPointerException 异常 * * @return 租户编号 */ public static Long getRequiredTenantId() { Long tenantId = getTenantId(); if (tenantId == null) { throw new NullPointerException("TenantContextHolder 不存在租户编号!可参考文档:" + DocumentEnum.TENANT.getUrl()); } return tenantId; } public static void setTenantId(Long tenantId) { TENANT_ID.set(tenantId); } public static void setIgnore(Boolean ignore) { IGNORE.set(ignore); } /** * 当前是否忽略租户 * * @return 是否忽略 */ public static boolean isIgnore() { return Boolean.TRUE.equals(IGNORE.get()); } public static void clear() { TENANT_ID.remove(); IGNORE.remove(); }}多租户还需要考虑忽略租户和指定租户的情况:
调用某些方法时,租户应当被忽略,例如超级管理员获取系统全部数据、项目启动后获取全部数据去创建全局静态缓存等,因此该项目也提供了租户忽略的实现方案,对于某段需要忽略租户执行的代码,提供了忽略租户去执行某个代码块的公共方法,对于整个方法需要忽略租户的情况,则通过AOP处理自定义注解的方式,对某个方法标记忽略租户,该方法内执行的代码便不再对多租户的情况进行处理。
调用某些方法时,应当以指定的某个租户ID去执行,而不是采用当前登录用户的租户ID,例如超管新建了一个租户,并为新租户一并创建管理员用户以及基本的角色,菜单和权限等数据时,这些数据的租户ID应该是新建的租户的ID,针对这种情况,项目也实现了一个按照指定租户ID执行某个代码块的公共方法。
二、数据库的租户隔离
2.1 数据库的租户隔离实现方案
数据库中租户的隔离方案有三种:
- 库隔离:每个租户拥有独立的数据库实例
- 表隔离:每个租户建属于自己的一套数据表
- 记录隔离:表中使用租户标识字段(
tenant_id)区分不同租户的数据
三种方案对比:
| 库隔离 | 表隔离 | 记录隔离 | |
|---|---|---|---|
| 隔离性 | 最高 | 较高 | 最低 |
| 性能 | 高 | 中 | 低,受租户数量影响 |
| 备份还原难度 | 简单 | 中 | 困难 |
| 硬件成本 | 高 | 中 | 低 |
| 维护成本 | 高,开租户就要建库 | 中 | 低 |
芋道采用了记录隔离的方式来实现数据库的租户隔离。
2.2 实现原理和源码解读
租户本质上也是一种特殊的数据权限,不同于数据权限的是对于涉及租户的表的增、删、改、查四种操作,都需要对SQL语句进行处理,实现原理是执行SQL前进行拦截,并获取要执行的SQL,然后解析SQL语句中的表,遇到需要租户隔离的表就要进行处理,对于查询、删除和更新的场景,就在现有的SQL条件中追加一个tenant_id = ?的条件,获取当前操作的用户或要执行的某种任务所属的租户ID赋值给tenant_id,对于添加操作,则是将tenant_id字段加入到INSERT列表中并赋值。
芋道采用MyBatis-Plus的插件拦截机制实现数据库的记录级别的租户隔离,这和数据权限的实现原理是完全一样的,实现租户隔离的插件是TenantLineInnerInterceptor,该类也像数据权限插件一样继承了用于解析和追加条件的BaseMultiTableInnerInterceptor类来实现表的解析和租户ID过滤条件的追加,实现原理具体见:TenantLineInnerInterceptor源码解读
TenantLineInnerInterceptor需要一个TenantLineHandler类型的租户处理器,TenantLineHandler是一个接口,用于给TenantLineInnerInterceptor判断某个表是否需要租户隔离,以及获取租户ID值表达式、租户字段名,我们需要实现这个接口并在回调方法中将这些信息封装好后返回。
com.baomidou.mybatisplus.extension.plugins.handler.TenantLineHandler
public interface TenantLineHandler { /** * 获取租户 ID 值表达式,只支持单个 ID 值 * <p> * * @return 租户 ID 值表达式 */ Expression getTenantId(); /** * 获取租户字段名 * <p> * 默认字段名叫: tenant_id * * @return 租户字段名 */ default String getTenantIdColumn() { return "tenant_id"; } /** * 根据表名判断是否忽略拼接多租户条件 * <p> * 默认都要进行解析并拼接多租户条件 * * @param tableName 表名 * @return 是否忽略, true:表示忽略,false:需要解析并拼接多租户条件 */ default boolean ignoreTable(String tableName) { return false; } /** * 忽略插入租户字段逻辑 * * @param columns 插入字段 * @param tenantIdColumn 租户 ID 字段 * @return */ default boolean ignoreInsert(List<Column> columns, String tenantIdColumn) { return columns.stream().map(Column::getColumnName).anyMatch(i -> i.equalsIgnoreCase(tenantIdColumn)); }}TenantDatabaseInterceptor就是芋道项目的租户处理器实现类,创建时构造方法内会读取项目配置文件,将所有需要忽略租户的表保存起来,用于执行ignoreTable()方法时判断当前表是否需要租户隔离。通过getTenantId()返回当前执行mapper方法和数据库交互的线程所绑定的租户ID,直接从TenantContextHolder获取即可。租户标识的字段名统一都叫”tenant_id”,getTenantIdColumn()直接从接口的默认方法中继承不需重写。
cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor
public class TenantDatabaseInterceptor implements TenantLineHandler { private final Set<String> ignoreTables = new HashSet<>(); public TenantDatabaseInterceptor(TenantProperties properties) { // 不同 DB 下,大小写的习惯不同,所以需要都添加进去 properties.getIgnoreTables().forEach(table -> { ignoreTables.add(table.toLowerCase()); ignoreTables.add(table.toUpperCase()); }); // 在 OracleKeyGenerator 中,生成主键时,会查询这个表,查询这个表后,会自动拼接 TENANT_ID 导致报错 ignoreTables.add("DUAL"); } @Override public Expression getTenantId() { return new LongValue(TenantContextHolder.getRequiredTenantId()); } @Override public boolean ignoreTable(String tableName) { return TenantContextHolder.isIgnore() // 情况一,全局忽略多租户 || CollUtil.contains(ignoreTables, SqlParserUtils.removeWrapperSymbol(tableName)); // 情况二,忽略多租户的表 }}最后,将TenantDatabaseInterceptor包装进TenantLineInnerInterceptor,注入MyBatis-Plus插件队列中,根据MyBatis-Plus插件机制在执行SQL前调用
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { ...... @Bean public TenantLineInnerInterceptor tenantLineInnerInterceptor(TenantProperties properties, MybatisPlusInterceptor interceptor) { TenantLineInnerInterceptor inner = new TenantLineInnerInterceptor(new TenantDatabaseInterceptor(properties)); // 添加到 interceptor 中 // 需要加在首个,主要是为了在分页插件前面。这个是 MyBatis Plus 的规定 MyBatisUtils.addInterceptor(interceptor, inner, 0); return inner; } ......}三、Redis的租户隔离
Redis和MySQL这样的关系型数据库不同,Redis采用KV键值对存储,因此芋道采用的办法是如果当前Redis读写需要处理租户隔离,则将KEY字符串的最后追加一个冒号和租户ID进行标识,需要注意的是,项目仅仅针对SpringCache方式操作Redis的情况进行了处理,如要使用RedisTemplate,需要自行实现KEY追加租户ID的逻辑。
Redis读写的租户隔离实现还是非常简单的,具体原理和源码解读如下:
1.实现一个TenantRedisCacheManager类继承TimeoutRedisCacheManager来拓展原有类的租户功能,操作Redis时,如需要处理租户,则在键名后面追加租户ID,再将数据保存到Redis或从Redis读出:
cn.iocoder.yudao.framework.tenant.core.redis.TenantRedisCacheManager
public class TenantRedisCacheManager extends TimeoutRedisCacheManager { private final Set<String> ignoreCaches; public TenantRedisCacheManager(RedisCacheWriter cacheWriter, RedisCacheConfiguration defaultCacheConfiguration, Set<String> ignoreCaches) { super(cacheWriter, defaultCacheConfiguration); this.ignoreCaches = ignoreCaches; } @Override public Cache getCache(String name) { // 如果开启多租户,则 name 拼接租户后缀 if (!TenantContextHolder.isIgnore() && TenantContextHolder.getTenantId() != null && !CollUtil.contains(ignoreCaches, name)) { name = name + ":" + TenantContextHolder.getTenantId(); } // 继续基于父方法 return super.getCache(name); }}2.将TenantRedisCacheManager注入Spring容器,并添加注解@Primary,只要引用了租户模块,TenantRedisCacheManager就是主Bean,代替TimeoutRedisCacheManager类提供带租户隔离功能的Redis客户端:
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { ...... @Bean @Primary // 引入租户时,tenantRedisCacheManager 为主 Bean public RedisCacheManager tenantRedisCacheManager(RedisTemplate<String, Object> redisTemplate, RedisCacheConfiguration redisCacheConfiguration, YudaoCacheProperties yudaoCacheProperties, TenantProperties tenantProperties) { // 创建 RedisCacheWriter 对象 RedisConnectionFactory connectionFactory = Objects.requireNonNull(redisTemplate.getConnectionFactory()); RedisCacheWriter cacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.scan(yudaoCacheProperties.getRedisScanBatchSize())); // 创建 TenantRedisCacheManager 对象 return new TenantRedisCacheManager(cacheWriter, redisCacheConfiguration, tenantProperties.getIgnoreCaches()); } ......}⚠️如果不加@Primary注解,模块yudao-spring-boot-starter-redis中定义的不带租户隔离功能的TimeoutRedisCacheManager就会生效:
cn.iocoder.yudao.framework.redis.core.TimeoutRedisCacheManager
@Beanpublic RedisCacheManager redisCacheManager(RedisTemplate<String, Object> redisTemplate, RedisCacheConfiguration redisCacheConfiguration, YudaoCacheProperties yudaoCacheProperties) { // 创建 RedisCacheWriter 对象 RedisConnectionFactory connectionFactory = Objects.requireNonNull(redisTemplate.getConnectionFactory()); RedisCacheWriter cacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(connectionFactory, BatchStrategies.scan(yudaoCacheProperties.getRedisScanBatchSize())); // 创建 TenantRedisCacheManager 对象 return new TimeoutRedisCacheManager(cacheWriter, redisCacheConfiguration);}四、Web请求的租户隔离
Web访问作为整个系统对外提供功能的入口,用户登录系统时需要选择以哪个组织(租户)的成员身份使用系统,每次调用需要租户隔离的接口,都要传入租户ID放进TenantContextHolder并存在于整个http请求线程的生命周期中供各种需要租户的场景读取使用,如果访问的URL在忽略租户列表中,则标记整个线程生命周期忽略租户,如果访问者访问的URL是必须登录才能访问的资源,还需要校验登录用户所属的租户和用户传入的租户是否一致防止越权访问。
Web请求的租户隔离的处理逻辑由TenantContextWebFilter和TenantSecurityWebFilter两个类来实现,TenantContextWebFilter用于维护租户ID在整个http请求线程,TenantSecurityWebFilter则用于鉴权,两个都是原生过滤器,直接注册到Servlet容器中。
TenantContextWebFilter优先执行,用于从请求头获得传过来的租户编号,并存放在TenantContextHolder中,请求完成后,再把租户编号从TenantContextHolder移除,这样在整个访问过程中,涉及到租户隔离的逻辑代码都能从TenantContextHolder中获取当前操作所属的租户ID:
cn.iocoder.yudao.framework.tenant.core.web.TenantContextWebFilter
public class TenantContextWebFilter extends OncePerRequestFilter { @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException { // 设置 Long tenantId = WebFrameworkUtils.getTenantId(request); if (tenantId != null) { TenantContextHolder.setTenantId(tenantId); } try { chain.doFilter(request, response); } finally { // 清理 TenantContextHolder.clear(); } }}仅仅将请求的租户ID用于业务操作是不可以的,对于请求的租户是否合规还要进一步检验,先要判断租户ID是不是用户随意传的,如果租户可以随意指定,访问一些接口时就可能发生越权,故对于登录的用户,还需要TenantSecurityWebFilter校验当前登录的用户是否属于传入的租户,如果不一致直接报错。
接下来还要判断访问的URL是否必须进行租户隔离,如果URL不在忽略租户的URL配置中当前请求却没传租户ID就直接报错,如果传递了租户则继续校验租户可用状态是否正常,如正常就放行请求,如URL在忽略列表则直接将整个http线程标记为忽略租户,然后放行:
cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter
public class TenantSecurityWebFilter extends ApiRequestFilter { private final TenantProperties tenantProperties; private final AntPathMatcher pathMatcher; private final GlobalExceptionHandler globalExceptionHandler; private final TenantFrameworkService tenantFrameworkService; public TenantSecurityWebFilter(TenantProperties tenantProperties, WebProperties webProperties, GlobalExceptionHandler globalExceptionHandler, TenantFrameworkService tenantFrameworkService) { super(webProperties); this.tenantProperties = tenantProperties; this.pathMatcher = new AntPathMatcher(); this.globalExceptionHandler = globalExceptionHandler; this.tenantFrameworkService = tenantFrameworkService; } @Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException { Long tenantId = TenantContextHolder.getTenantId(); // 1. 登陆的用户,校验是否有权限访问该租户,避免越权问题。 LoginUser user = SecurityFrameworkUtils.getLoginUser(); if (user != null) { // 如果获取不到租户编号,则尝试使用登陆用户的租户编号 if (tenantId == null) { tenantId = user.getTenantId(); TenantContextHolder.setTenantId(tenantId); // 如果传递了租户编号,则进行比对租户编号,避免越权问题 } else if (!Objects.equals(user.getTenantId(), TenantContextHolder.getTenantId())) { log.error("[doFilterInternal][租户({}) User({}/{}) 越权访问租户({}) URL({}/{})]", user.getTenantId(), user.getId(), user.getUserType(), TenantContextHolder.getTenantId(), request.getRequestURI(), request.getMethod()); ServletUtils.writeJSON(response, CommonResult.error(GlobalErrorCodeConstants.FORBIDDEN.getCode(), "您无权访问该租户的数据")); return; } } // 如果非允许忽略租户的 URL,则校验租户是否合法 if (!isIgnoreUrl(request)) { // 2. 如果请求未带租户的编号,不允许访问。 if (tenantId == null) { log.error("[doFilterInternal][URL({}/{}) 未传递租户编号]", request.getRequestURI(), request.getMethod()); ServletUtils.writeJSON(response, CommonResult.error(GlobalErrorCodeConstants.BAD_REQUEST.getCode(), "请求的租户标识未传递,请进行排查")); return; } // 3. 校验租户是合法,例如说被禁用、到期 try { tenantFrameworkService.validTenant(tenantId); } catch (Throwable ex) { CommonResult<?> result = globalExceptionHandler.allExceptionHandler(request, ex); ServletUtils.writeJSON(response, result); return; } } else { // 如果是允许忽略租户的 URL,若未传递租户编号,则默认忽略租户编号,避免报错 if (tenantId == null) { TenantContextHolder.setIgnore(true); } } // 继续过滤 chain.doFilter(request, response); } private boolean isIgnoreUrl(HttpServletRequest request) { // 快速匹配,保证性能 if (CollUtil.contains(tenantProperties.getIgnoreUrls(), request.getRequestURI())) { return true; } // 逐个 Ant 路径匹配 for (String url : tenantProperties.getIgnoreUrls()) { if (pathMatcher.match(url, request.getRequestURI())) { return true; } } return false; }}注册两个过滤器,并指定优先级:
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { // ========== WEB ========== @Bean public FilterRegistrationBean<TenantContextWebFilter> tenantContextWebFilter() { FilterRegistrationBean<TenantContextWebFilter> registrationBean = new FilterRegistrationBean<>(); registrationBean.setFilter(new TenantContextWebFilter()); registrationBean.setOrder(WebFilterOrderEnum.TENANT_CONTEXT_FILTER);// -104 return registrationBean; } // ========== Security ========== @Bean public FilterRegistrationBean<TenantSecurityWebFilter> tenantSecurityWebFilter(TenantProperties tenantProperties, WebProperties webProperties, GlobalExceptionHandler globalExceptionHandler, TenantFrameworkService tenantFrameworkService) { FilterRegistrationBean<TenantSecurityWebFilter> registrationBean = new FilterRegistrationBean<>(); registrationBean.setFilter(new TenantSecurityWebFilter(tenantProperties, webProperties, globalExceptionHandler, tenantFrameworkService)); registrationBean.setOrder(WebFilterOrderEnum.TENANT_SECURITY_FILTER);// -99 return registrationBean; }}五、消息队列的租户隔离
对于消息的租户隔离,芋道是分为发送和消费两种场景分别处理的
发送消息:如当前线程绑定了租户,取出当前线程绑定的租户ID,在发送消息时将租户ID设置在“消息头”,和消息内容一并发送到消息中间件。
消费消息:从消息中间件推送过来的消息中,如消息头带着租户ID则先行取出消息头中的租户ID,与消息消费回调线程绑定在一起,再执行消息消费的回调方法。
芋道对采用多种常见消息中间件产品发送和消费消息的场景都支持了租户隔离。
5.1 Redis PubSub/Stream
Redis除了缓存,还是一个轻量级的消息中间件产品,它的Pub/Sub机制和Stream数据结构均能实现消息中间件功能,芋道对这两种方式的消息都提供了多租户的支持。
yudao-spring-boot-starter-redis模块在整合Redis时,将RedisTemplate进行了封装,对发送消息和消费消息都拓展出了前置后置操作功能,租户模块的TenantRedisMessageInterceptor就通过实现RedisMessageInterceptor接口来实现租户隔离。发送消息时从线程中取出绑定的租户ID,添加到“消息头”,发送给Redis,消费时,因为消费消息的方法运行在单独的线程,因此从消息的头取出消息所属的租户ID直接绑定在消息消费的回调线程,供线程中用到租户的场景使用,因为消费线程通常是通过线程池复用的,消费完成后,要把租户ID从消费线程中移除。
cn.iocoder.yudao.framework.tenant.core.mq.redis.TenantRedisMessageInterceptor
public class TenantRedisMessageInterceptor implements RedisMessageInterceptor { @Override public void sendMessageBefore(AbstractRedisMessage message) { Long tenantId = TenantContextHolder.getTenantId(); if (tenantId != null) { message.addHeader(HEADER_TENANT_ID, tenantId.toString()); } } @Override public void consumeMessageBefore(AbstractRedisMessage message) { String tenantIdStr = message.getHeader(HEADER_TENANT_ID); if (StrUtil.isNotEmpty(tenantIdStr)) { TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr)); } } @Override public void consumeMessageAfter(AbstractRedisMessage message) { // 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况 TenantContextHolder.clear(); }}5.2 RocketMQ
RocketMQ支持多租户采用的是Spring的BeanPostProcessor对注入的Bean进行修改,将RocketMQ发送和消费消息相关的两个类:DefaultRocketMQListenerContainer和RocketMQTemplate设置了可以执行后置和前置操作的“Hook”,当发送和消费消息时,同Redis一样的将租户ID和当前线程进行绑定。
cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQInitializer
public class TenantRocketMQInitializer implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DefaultRocketMQListenerContainer) { DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; initTenantConsumer(container.getConsumer()); } else if (bean instanceof RocketMQTemplate) { RocketMQTemplate template = (RocketMQTemplate) bean; initTenantProducer(template.getProducer()); } return bean; } private void initTenantProducer(DefaultMQProducer producer) { if (producer == null) { return; } DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl(); if (producerImpl == null) { return; } producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook()); } private void initTenantConsumer(DefaultMQPushConsumer consumer) { if (consumer == null) { return; } DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl(); if (consumerImpl == null) { return; } consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook()); }}cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQConsumeMessageHook
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook { @Override public String hookName() { return getClass().getSimpleName(); } @Override public void consumeMessageBefore(ConsumeMessageContext context) { // 校验,消息必须是单条,不然设置租户可能不正确 List<MessageExt> messages = context.getMsgList(); Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size()); // 设置租户编号 String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID); if (StrUtil.isNotEmpty(tenantId)) { TenantContextHolder.setTenantId(Long.parseLong(tenantId)); } } @Override public void consumeMessageAfter(ConsumeMessageContext context) { TenantContextHolder.clear(); }}cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQSendMessageHook
public class TenantRocketMQSendMessageHook implements SendMessageHook { @Override public String hookName() { return getClass().getSimpleName(); } @Override public void sendMessageBefore(SendMessageContext sendMessageContext) { Long tenantId = TenantContextHolder.getTenantId(); if (tenantId == null) { return; } sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString()); } @Override public void sendMessageAfter(SendMessageContext sendMessageContext) { }}5.3 RabbitMQ
RabbitMQ处理方法和RockerMQ类似,将消息发送的RabbitTemplate进行前置操作,设置租户ID到消息头。
cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer
public class TenantRabbitMQInitializer implements BeanPostProcessor { @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof RabbitTemplate) { RabbitTemplate rabbitTemplate = (RabbitTemplate) bean; rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor()); } return bean; }}cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQMessagePostProcessor
public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor { @Override public Message postProcessMessage(Message message) throws AmqpException { Long tenantId = TenantContextHolder.getTenantId(); if (tenantId != null) { message.getMessageProperties().getHeaders().put(HEADER_TENANT_ID, tenantId); } return message; }}5.4 Kafka
针对Kafka的消息发送适配租户,实现了一个TenantKafkaEnvironmentPostProcessor用于当Spring环境(Environment)准备好了之后,将自己实现的TenantKafkaProducerInterceptor类加入到spring.kafka.producer.properties.interceptor.classes变量中,如果变量没有值就直接赋值,如果变量中已经有了别的值就追加上,TenantKafkaProducerInterceptor将在发送消息到Kafka前,将线程中绑定的租户ID取出设置在消息头上,然后发送到Kafka。
cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
@Slf4jpublic class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor { private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes"; @Override public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { // 添加 TenantKafkaProducerInterceptor 拦截器 try { String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES); if (StrUtil.isEmpty(value)) { value = TenantKafkaProducerInterceptor.class.getName(); } else { value += "," + TenantKafkaProducerInterceptor.class.getName(); } environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value); } catch (NoClassDefFoundError ignore) { // 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖 } }}cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaProducerInterceptor
public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> { @Override public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) { Long tenantId = TenantContextHolder.getTenantId(); if (tenantId != null) { Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射 headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes()); } return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}5.5 InvocableHandlerMethod
Spring整合Kafka和RabbitMQ的过程中,很难通过一些常规方式(设置拦截器等)在消息消费回调方法即将执行前获取消息所属租户ID绑定在MQ消费回调线程上,因此作者把Spring整合Kafka和RabbitMQ的源码中的InvocableHandlerMethod类进行了重写并放置在yudao-spring-boot-starter-biz-tenant/src/main/java下代替原有的类来实现这一功能,InvocableHandlerMethod类的invoke()会对收到的消息进一步执行doInvoke()来调用消费方法,所以对invoke()方法实现进行了修改,如果发来的消息中存在租户ID,则将源码修改为return TenantUtils.execute(tenantId, () -> doInvoke(args));将租户ID绑定在线程上。
作者采用这种办法可能是一种无奈之举,权宜之计,因此如果要自行更改spring版本,须先查看新版本的本类实现和当前版本是否有出入,不能随意更改版本。
org.springframework.messaging.handler.invocation.InvocableHandlerMethod
⚠️ 重写的类和spring-messaging-*.jar包中的原类肯定是重复的,JVM加载类时总是先加载到的类优先。打包时,spring-boot会将主启动模块(yudao-server)代码编译为class文件放入
BOOT-INF/classes/,runtime依赖的jar包放入BOOT-INF/lib/,启动时,spring-boot自定义的类加载器LaunchedURLClassLoader会先加载BOOT-INF/classes/下的类,后加载BOOT-INF/lib/下jar包中的类,重写的InvocableHandlerMethod类所在的yudao-spring-boot-starter-biz-tenant也是以jar包形式引入主启动模块,因此会和spring-messaging-*.jar一样编译打包为jar包打入BOOT-INF/lib/,jar包中的类是按jar包文件顺序加载,但无法保证打包时jar文件排列顺序,因此打包后如果重写的类不生效,我认为可以把它迁移到主启动模块中,编译打包到BOOT-INF/classes/目录,以得到优先加载。使用IDEA启动项目时,会按照jar包出现在-cp参数列表中的顺序加载,这个顺序也无法保证,如不生效可同样办法处理。
/* * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.springframework.messaging.handler.invocation;import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;import org.springframework.core.DefaultParameterNameDiscoverer;import org.springframework.core.MethodParameter;import org.springframework.core.ParameterNameDiscoverer;import org.springframework.core.ResolvableType;import org.springframework.lang.Nullable;import org.springframework.messaging.Message;import org.springframework.messaging.handler.HandlerMethod;import org.springframework.util.ObjectUtils;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.lang.reflect.Type;import java.util.Arrays;import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;/** * Extension of {@link HandlerMethod} that invokes the underlying method with * argument values resolved from the current HTTP request through a list of * {@link HandlerMethodArgumentResolver}. * * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中 * TODO 芋艿:持续跟进,看看有没新的拓展点 * * @author Rossen Stoyanchev * @author Juergen Hoeller * @since 4.0 */public class InvocableHandlerMethod extends HandlerMethod { ......... @Nullable public Object invoke(Message<?> message, Object... providedArgs) throws Exception { Object[] args = getMethodArgumentValues(message, providedArgs); if (logger.isTraceEnabled()) { logger.trace("Arguments: " + Arrays.toString(args)); } // 注意:如下是本类的改动点!!! // 情况一:无租户编号的情况 Long tenantId= parseTenantId(message); if (tenantId == null) { return doInvoke(args); } // 情况二:有租户的情况下 return TenantUtils.execute(tenantId, () -> doInvoke(args)); } private Long parseTenantId(Message<?> message) { Object tenantId = message.getHeaders().get(HEADER_TENANT_ID); if (tenantId == null) { return null; } if (tenantId instanceof Long) { return (Long) tenantId; } if (tenantId instanceof Number) { return ((Number) tenantId).longValue(); } if (tenantId instanceof String) { return Long.parseLong((String) tenantId); } if (tenantId instanceof byte[]) { return Long.parseLong(new String((byte[]) tenantId)); } throw new IllegalArgumentException("未知的数据类型:" + tenantId); } .........}5.6 集成
对于Redis/RocketMQ/RabbitMQ的实现,直接注入容器即可,对于Kafka的EnvironmentPostProcessor实现,采用配置在spring.factories的方式
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { ......... @Bean public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() { return new TenantRedisMessageInterceptor(); } @Bean @ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate") public TenantRabbitMQInitializer tenantRabbitMQInitializer() { return new TenantRabbitMQInitializer(); } @Bean @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate") public TenantRocketMQInitializer tenantRocketMQInitializer() { return new TenantRocketMQInitializer(); } .........}yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
org.springframework.boot.env.EnvironmentPostProcessor=\ cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor六、定时任务Quartz的租户隔离
芋道中集成了定时任务功能,需要注意的是它的定时任务功能是给超级管理员用的,定义和下发任务功能不对使用系统的租户开放。如果任务的执行需要租户隔离,那么执行定时任务的过程中要以租户为单位进行数据隔离。
定时任务的租户隔离实现原理是自定义一个方法级别的注解@TenantJob,并为加了注解的job方法实现一个环绕通知的TenantJobAspect,当一个任务即将执行时首先查出系统中所有租户的ID到tenantIds,让tenantIds列表通过parallelStream().forEach()实现每次遍历都是不同的线程。每次遍历又去调用TenantUtils.execute()(具体见8.2 TenantUtils)并传入当次遍历到的租户ID和执行joinPoint.proceed();的匿名类,把每次遍历到的租户ID绑定到这次遍历使用的线程上,并用该线程执行加了@TenantJob注解的目标方法,这样当一个任务下发执行时,系统中的每个租户都派发一个绑定了自己租户ID的线程执行一次目标方法,实现一个任务被每个租户在自己的数据范围内各自执行一次,数据相互隔离。
cn.iocoder.yudao.framework.tenant.core.job.TenantJob
@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)public @interface TenantJob {}cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect
@Aspect@RequiredArgsConstructor@Slf4jpublic class TenantJobAspect { private final TenantFrameworkService tenantFrameworkService; @Around("@annotation(tenantJob)") public String around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) { // 获得租户列表 List<Long> tenantIds = tenantFrameworkService.getTenantIds(); if (CollUtil.isEmpty(tenantIds)) { return null; } // 逐个租户,执行 Job Map<Long, String> results = new ConcurrentHashMap<>(); tenantIds.parallelStream().forEach(tenantId -> { // TODO 芋艿:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况 TenantUtils.execute(tenantId, () -> { try { Object result = joinPoint.proceed(); results.put(tenantId, StrUtil.toStringOrEmpty(result)); } catch (Throwable e) { log.error("[execute][租户({}) 执行 Job 发生异常", tenantId, e); results.put(tenantId, ExceptionUtil.getRootCauseMessage(e)); } }); }); return JsonUtils.toJsonString(results); }}cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { ...... @Bean public TenantJobAspect tenantJobAspect(TenantFrameworkService tenantFrameworkService) { return new TenantJobAspect(tenantFrameworkService); } ......}七、Spring @Async方法的租户隔离
之前提到,芋道的多租户实现原理都是租户ID绑定在当前线程,但是如果一个绑定了租户ID的线程在一些情况下调用了加了@Async注解的方法异步执行一些逻辑时,如果异步执行的逻辑需要租户隔离,就会导致出错,因为都不在一个线程上了,租户ID无法传递和共享,因此如有异步调用的情况,应当对多租户情况做特殊处理。
对异步方法执行的配置定义在yudao-spring-boot-starter-job模块的YudaoAsyncAutoConfiguration类下,通过设置executor.setTaskDecorator(TtlRunnable::get);实现芋道项目中所有的异步方法执行时,被派发执行异步方法的线程会继承派发它的线程ThreadLocal中的元素,因此一个绑定了租户ID的线程调用了异步方法时,异步方法会直接继承租户ID保存在自己的线程上下文,同样可以执行一些需要租户隔离的业务代码,这是依靠阿里巴巴的TransmittableThreadLocal组件实现的,具体实现原理以后再进一步研究。
cn.iocoder.yudao.framework.quartz.config.YudaoAsyncAutoConfiguration
@AutoConfiguration@EnableAsyncpublic class YudaoAsyncAutoConfiguration { @Bean public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() { return new BeanPostProcessor() { @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if (!(bean instanceof ThreadPoolTaskExecutor)) { return bean; } // 修改提交的任务,接入 TransmittableThreadLocal ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) bean; executor.setTaskDecorator(TtlRunnable::get); return executor; } }; }}八、忽略和指定租户
针对系统中一些需要忽略租户或指定租户的场景进行特殊处理
⚠️需要注意的是有时忽略租户对Redis和MQ而言是没有意义的,因为它们不像数据库那样通过表组织存储数据,数据库可以通过在过滤条件中不追加租户ID字段的方式忽略租户,但是Redis和MQ不可以。
8.1 @TenantIgnore
采用自定义注解的形式对一个方法标记忽略租户,定义一个注解并通过AOP类TenantIgnoreAspect为加了注解的方法进行环绕增强,执行前先获取当前线程的忽略标记到oldIgnore进行备份,然后将当前线程标记为忽略租户,目标方法执行完成后,再将执行前的忽略标记恢复到线程上下文,这样是一个比较严谨的设计,因为忽略标记是整个线程的生命周期内共享的,如果方法嵌套调用,且都加了这个注解,仅是简单的将当前方法先设置忽略后取消忽略可能导致外层的方法标记的忽略标识被覆盖。
cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore
/** * 忽略租户,标记指定方法不进行租户的自动过滤 * * 注意,只有 DB 的场景会过滤,其它场景暂时不过滤: * 1、Redis 场景:因为是基于 Key 实现多租户的能力,所以忽略没有意义,不像 DB 是一个 column 实现的 * 2、MQ 场景:有点难以抉择,目前可以通过 Consumer 手动在消费的方法上,添加 @TenantIgnore 进行忽略 * * @author 芋道源码 */@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)@Inheritedpublic @interface TenantIgnore {}cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect
/** * 忽略多租户的 Aspect,基于 {@link TenantIgnore} 注解实现,用于一些全局的逻辑。 * 例如说,一个定时任务,读取所有数据,进行处理。 * 又例如说,读取所有数据,进行缓存。 * * 整体逻辑的实现,和 {@link TenantUtils#executeIgnore(Runnable)} 需要保持一致 * * @author 芋道源码 */@Aspect@Slf4jpublic class TenantIgnoreAspect { @Around("@annotation(tenantIgnore)") public Object around(ProceedingJoinPoint joinPoint, TenantIgnore tenantIgnore) throws Throwable { Boolean oldIgnore = TenantContextHolder.isIgnore(); try { TenantContextHolder.setIgnore(true); // 执行逻辑 return joinPoint.proceed(); } finally { TenantContextHolder.setIgnore(oldIgnore); } }}cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
@AutoConfiguration@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户@EnableConfigurationProperties(TenantProperties.class)public class YudaoTenantAutoConfiguration { ......... @Bean public TenantIgnoreAspect tenantIgnoreAspect() { return new TenantIgnoreAspect(); } .........}8.2 TenantUtils
TenantUtils是一个工具类,可以指定租户或忽略租户执行某段代码,原理和TenantIgnoreAspect类似
cn.iocoder.yudao.framework.tenant.core.util.TenantUtils
public class TenantUtils { /** * 使用指定租户,执行对应的逻辑 * * 注意,如果当前是忽略租户的情况下,会被强制设置成不忽略租户 * 当然,执行完成后,还是会恢复回去 * * @param tenantId 租户编号 * @param runnable 逻辑 */ public static void execute(Long tenantId, Runnable runnable) { Long oldTenantId = TenantContextHolder.getTenantId(); Boolean oldIgnore = TenantContextHolder.isIgnore(); try { TenantContextHolder.setTenantId(tenantId); TenantContextHolder.setIgnore(false); // 执行逻辑 runnable.run(); } finally { TenantContextHolder.setTenantId(oldTenantId); TenantContextHolder.setIgnore(oldIgnore); } } /** * 使用指定租户,执行对应的逻辑 * * 注意,如果当前是忽略租户的情况下,会被强制设置成不忽略租户 * 当然,执行完成后,还是会恢复回去 * * @param tenantId 租户编号 * @param callable 逻辑 */ public static <V> V execute(Long tenantId, Callable<V> callable) { Long oldTenantId = TenantContextHolder.getTenantId(); Boolean oldIgnore = TenantContextHolder.isIgnore(); try { TenantContextHolder.setTenantId(tenantId); TenantContextHolder.setIgnore(false); // 执行逻辑 return callable.call(); } catch (Exception e) { throw new RuntimeException(e); } finally { TenantContextHolder.setTenantId(oldTenantId); TenantContextHolder.setIgnore(oldIgnore); } } /** * 忽略租户,执行对应的逻辑 * * @param runnable 逻辑 */ public static void executeIgnore(Runnable runnable) { Boolean oldIgnore = TenantContextHolder.isIgnore(); try { TenantContextHolder.setIgnore(true); // 执行逻辑 runnable.run(); } finally { TenantContextHolder.setIgnore(oldIgnore); } } .........}结束语
设计一个支持多租户的系统,需要考虑的点很多,首先要考虑访问时对用户连同租户一并进行鉴权,防止越权。其次对于操作数据库、Redis、消息队列等涉及存取信息的操作必然要考虑数据隔离,对于定时任务也要确保在自己租户范围内执行,还要考虑到如执行异步方法要透传租户ID保证租户功能在子线程不失效,在一些场景下还需要忽略和指定租户,对于不同场景下的租户隔离,芋道考虑的还是比较全面,通过阅读租户模块的实现源码,了解到了设计一个多租户系统的大致思路和实现原理。








