Multi-Tenant Data Isolation in Practice (Part 3)


Continuing from the previous part, the export thread pool is wrapped with TtlExecutors so that the companyId held in the thread-local context follows tasks onto the pooled threads:

```java
@Bean(name = "exportDataExecutorPool")
public Executor exportDataExecutorPool() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(CPU_NUM);
    threadPoolTaskExecutor.setMaxPoolSize(CPU_NUM * 2);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setQueueCapacity(100);
    threadPoolTaskExecutor.setThreadNamePrefix("ExportData Thread-");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    threadPoolTaskExecutor.initialize();
    // Wrap the executor with TTL so TransmittableThreadLocal values (such as companyId) reach the pooled threads
    return TtlExecutors.getTtlExecutor(threadPoolTaskExecutor);
}
```

This way, the threads in the pool can always read the correct companyId.
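The TTL wrapper only helps if CompanyContext is backed by Alibaba's TransmittableThreadLocal. The class itself is not shown in this part; the following is a minimal sketch of what it presumably looks like, inferred from how it is used in the code below (the constructor stores the value, the static getter reads it):

```java
import com.alibaba.ttl.TransmittableThreadLocal;

// Minimal sketch only -- the real CompanyContext comes from the earlier parts of this series
public class CompanyContext {

    private static final TransmittableThreadLocal<String> COMPANY_ID = new TransmittableThreadLocal<>();

    // Matches the usage below: new CompanyContext(companyId) stores the value for the current thread
    public CompanyContext(String companyId) {
        COMPANY_ID.set(companyId);
    }

    public static String getCompanyId() {
        return COMPANY_ID.get();
    }

    // Clearing after a request or message is handled avoids leaking tenant context between pooled threads
    public static void clear() {
        COMPANY_ID.remove();
    }
}
```

With this in place, any task submitted to exportDataExecutorPool can simply call CompanyContext.getCompanyId() and see the tenant of the request that submitted it.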
So, does that complete the refactoring?
Not yet.
Why not?
Within a single JVM this is indeed sufficient, but what about calls that cross JVMs?
A reasonably complex system is usually split into modules by business domain, and a single module may also be deployed as multiple instances. These modules and instances typically exchange data through remote calls or a message queue. So the question becomes: how do we pass the companyId between different modules or instances?
In our system, remote calls currently go through RestTemplate and the message queue is Kafka, so we need a uniform way to carry the companyId over both.
Refactoring remote calls (RestTemplate)

1. On the sending side, a ClientHttpRequestInterceptor puts the companyId into the request header before every call:
```java
@Slf4j
public class BearerTokenHeaderInterceptor implements ClientHttpRequestInterceptor {

    public BearerTokenHeaderInterceptor() {
    }

    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                                        ClientHttpRequestExecution execution) throws IOException {
        // The interceptor uniformly puts the companyId into the request header
        String companyId = CompanyContext.getCompanyId();
        log.info("companyId={}", companyId);
        if (!StringUtils.isEmpty(companyId)) {
            request.getHeaders().set("companyId", companyId);
        }
        return execution.execute(request, body);
    }
}
```

Note that the interceptor has to be registered when the RestTemplate is created:
```java
@Bean
@LoadBalanced
public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder) {
    final RestTemplate restTemplate = restTemplateBuilder
            .setConnectTimeout(Duration.ofMillis(getConnectTimeout()))
            .setReadTimeout(Duration.ofMillis(getReadTimeout()))
            .requestFactory(() -> httpRequestFactory())
            .build();
    List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
    if (interceptors == null) {
        interceptors = Collections.emptyList();
    }
    interceptors = new ArrayList<>(interceptors);
    // Make sure the interceptor is registered exactly once
    interceptors.removeIf(BearerTokenHeaderInterceptor.class::isInstance);
    interceptors.add(new BearerTokenHeaderInterceptor());
    restTemplate.setInterceptors(interceptors);
    return restTemplate;
}
```
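With the interceptor registered, every outbound call carries the tenant automatically. A hypothetical call site (the service URL and DTO are made up for illustration):

```java
// Hypothetical downstream call; BearerTokenHeaderInterceptor adds the companyId header transparently
OrderDto order = restTemplate.getForObject(
        "http://order-service/api/orders/{id}", OrderDto.class, orderId);
```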
2. On the receiving side, a filter reads the companyId from the header and stores it in the thread-local context:
```java
@Slf4j
public class TokenParseAndLoginFilter extends OncePerRequestFilter {

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        String accessToken = null;   // token parsing / login logic is omitted in this excerpt
        String companyId = null;
        try {
            // Read the companyId from the header and store it in the thread-local context
            companyId = request.getHeader("companyId");
            new CompanyContext(companyId);
            filterChain.doFilter(request, response);
        } catch (Exception e) {
            log.error("request error:", e);
            response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
            response.setStatus(500);
            response.getWriter().write(e.getMessage());
            response.getWriter().close();
        }
    }
}
```
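The original does not show how this filter is wired in; a minimal sketch, assuming a plain Spring Boot FilterRegistrationBean (URL pattern and order are illustrative):

```java
@Bean
public FilterRegistrationBean<TokenParseAndLoginFilter> tokenParseAndLoginFilterRegistration() {
    FilterRegistrationBean<TokenParseAndLoginFilter> registration = new FilterRegistrationBean<>();
    registration.setFilter(new TokenParseAndLoginFilter());
    registration.addUrlPatterns("/*");                    // apply to all incoming requests
    registration.setOrder(Ordered.HIGHEST_PRECEDENCE);    // populate the tenant context before business filters run
    return registration;
}
```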
Refactoring the message queue (Kafka)

1. When sending a message, we uniformly put the companyId into the Kafka message headers:
```java
/**
 * Send a message, propagating the companyId in the record headers
 */
public void sendMsg(String topic, Object value, Map<String, String> headers) {
    RecordHeaders kafkaHeaders = new RecordHeaders();
    headers.forEach((k, v) -> {
        RecordHeader recordHeader = new RecordHeader(k, v.getBytes());
        kafkaHeaders.add(recordHeader);
    });
    RecordHeader recordHeader = new RecordHeader("companyId", CompanyContext.getCompanyId().getBytes());
    kafkaHeaders.add(recordHeader);
    // Let Kafka pick the partition (partition and key are null)
    ProducerRecord<String, String> producerRecord =
            new ProducerRecord<String, String>(topic, null, null, JsonUtil.toJson(value), kafkaHeaders);
    kafkaTemplate.send(producerRecord);
}
```
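A hypothetical call site (topic name, payload and the enclosing sender bean are made up); the companyId of the current thread travels with the record without the caller having to do anything extra:

```java
// Hypothetical usage of the sender above
kafkaMessageSender.sendMsg("export-task-topic", exportTask,
        Collections.singletonMap("source", "export-service"));
```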
2. On the consuming side, we read the companyId from the Kafka message headers and set the thread-local context:
```java
/**
 * Listener container factory with manual ack handling
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(3000);
    // RetryingAcknowledgingMessageListenerAdapter
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(recoveryCallback());
    factory.setRecordFilterStrategy(consumerRecord -> {
        // getHead(...) is a project helper that reads a header value from the record;
        // the key must match the one written by the producer ("companyId")
        String companyId = getHead(consumerRecord, "companyId");
        // Store the companyId in the thread-local context before the listener runs
        new CompanyContext(companyId);
        logger.info("Getting the company from kafka message header : {}", companyId);
        if (needRequestId) {
            String requestId = getHead(consumerRecord, KafkaHeadEnum.REQUEST_ID.getKey());
            new RequestIdContext(requestId);
        }
        // Never filter the record out; the strategy is only used here to populate the context
        return false;
    });
    return factory;
}
```
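For completeness, a hypothetical listener that uses this factory (topic and method name are illustrative). Because the record filter strategy above runs on the same consumer thread right before the listener is invoked, CompanyContext is already populated when the business code executes:

```java
@KafkaListener(topics = "export-task-topic",
               containerFactory = "kafkaManualAckListenerContainerFactory")
public void onExportTask(ConsumerRecord<String, String> record, Acknowledgment ack) {
    // The tenant was stored in the thread-local context by the record filter strategy
    log.info("consuming message for companyId={}", CompanyContext.getCompanyId());
    // ... business logic ...
    ack.acknowledge();  // manual ack, matching AckMode.MANUAL configured above
}
```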

