快捷搜索: 长连接 前端 源码 pan

Spring Cloud 负载均衡解析

1、负载均衡的自动配置类

Spring Cloud 的负载均衡的自动注入类,注意这个类是spring-cloud-commons 模块下面的,org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration,而不是spring-cloud-loadbalancer模块下面的org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration,这两个同名

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {
          
   
    @LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

这个自动注入依赖 RestTemplate 和 LoadBalancerClient,毕竟负载均衡是为RestTemplate 提供的能力。

类中注入了一个RestTemplate 集合,且用 @LoadBalanced 标注,这个注解只是一个标记注解,被 @Qualifier 注解标注,做了一个过滤,并没有处理@LoadBalanced的逻辑。

@Target({
          
    ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
          
   

}

接下来的代码 生成了 LoadBalancerRequestFactory,loadBalancerInterceptor 和restTemplateCustomizer

@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
		final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
          
   
	return () -> restTemplateCustomizers.ifAvailable(customizers -> {
          
   
		for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
          
   
			for (RestTemplateCustomizer customizer : customizers) {
          
   
				customizer.customize(restTemplate);
			}
		}
	});
}

@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(
      LoadBalancerClient loadBalancerClient) {
          
   
   return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}

@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
static class LoadBalancerInterceptorConfig {
          
   

   @Bean
   public LoadBalancerInterceptor loadBalancerInterceptor(
         LoadBalancerClient loadBalancerClient,
         LoadBalancerRequestFactory requestFactory) {
          
   
      return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
   }

   @Bean
   @ConditionalOnMissingBean
   public RestTemplateCustomizer restTemplateCustomizer(
         final LoadBalancerInterceptor loadBalancerInterceptor) {
          
   
      return restTemplate -> {
          
   
         List<ClientHttpRequestInterceptor> list = new ArrayList<>(
               restTemplate.getInterceptors());
         list.add(loadBalancerInterceptor);
         restTemplate.setInterceptors(list);
      };
   }

}

2、LoadBalancerClient 接口

public interface LoadBalancerClient extends ServiceInstanceChooser {
          
   

	//用服务实例执行请求
   <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

   <T> T execute(String serviceId, ServiceInstance serviceInstance,
         LoadBalancerRequest<T> request) throws IOException;

	//构建URI、
   URI reconstructURI(ServiceInstance instance, URI original);

}
public interface ServiceInstanceChooser {
          
   
	//根据serviceId 选择服务实例
	ServiceInstance choose(String serviceId);

}

3、LoadBalancerInterceptor

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
          
   

   private LoadBalancerClient loadBalancer;

   private LoadBalancerRequestFactory requestFactory;

   public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
         LoadBalancerRequestFactory requestFactory) {
          
   
      this.loadBalancer = loadBalancer;
      this.requestFactory = requestFactory;
   }

   public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
          
   
      // for backwards compatibility
      this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
   }

   @Override
   public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
         final ClientHttpRequestExecution execution) throws IOException {
          
   
      final URI originalUri = request.getURI();
      String serviceName = originalUri.getHost();
      Assert.state(serviceName != null,
            "Request URI does not contain a valid hostname: " + originalUri);
      return this.loadBalancer.execute(serviceName,
            this.requestFactory.createRequest(request, body, execution));
   }

}

LoadBalancerInterceptor 中有LoadBalancerClient ,LoadBalancerRequestFactory在构造LoadBalancerInterceptor 时会设置这两个成员。

当LoadBalancerInterceptor 拦截http请求时,执行intercept 方法,这是执行loadBalancer.execute方法,根据serviceName选择服务实例进行请求。

4、LoadBalancerRequestFactory

public class LoadBalancerRequestFactory {
          
   

   private LoadBalancerClient loadBalancer;

   private List<LoadBalancerRequestTransformer> transformers;

   public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,
         List<LoadBalancerRequestTransformer> transformers) {
          
   
      this.loadBalancer = loadBalancer;
      this.transformers = transformers;
   }

   public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {
          
   
      this.loadBalancer = loadBalancer;
   }

   public LoadBalancerRequest<ClientHttpResponse> createRequest(
         final HttpRequest request, final byte[] body,
         final ClientHttpRequestExecution execution) {
          
   
      return instance -> {
          
   
         HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
               this.loadBalancer);
         if (this.transformers != null) {
          
   
            for (LoadBalancerRequestTransformer transformer : this.transformers) {
          
   
               serviceRequest = transformer.transformRequest(serviceRequest,
                     instance);
            }
         }
         return execution.execute(serviceRequest, body);
      };
   }

}

LoadBalancerRequestFactory会具体执行request请求。

5、接上篇restTemplate,看一下执行流程。

//InterceptingClientHttpRequest.InterceptingRequestExecution#execute
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
          
   
    //创建一个拦截器执行器
   InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
    //执行
   return requestExecution.execute(this, bufferedOutput);
}
//org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
          
   
      //执行拦截器的方法
      if (this.iterator.hasNext()) {
          
   
         ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
         return nextInterceptor.intercept(request, body, this);
      }
      else {
          
   
         HttpMethod method = request.getMethod();
         Assert.state(method != null, "No standard HTTP method");
         ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
         request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
         if (body.length > 0) {
          
   
            if (delegate instanceof StreamingHttpOutputMessage) {
          
   
               StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
               streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
          
   
               StreamUtils.copy(body, delegate.getBody());
            }
         }
         return delegate.execute();
      }
   }
}
经验分享 程序员 微信小程序 职场和发展