简介

OpenFeign 是 Spring Cloud 家族的一个成员,它最核心的作用是为 HTTP 形式的 Rest API 提供了非常简洁高效的 RPC 调用方式。它并不是真正意义上的 RPC 通信,因为它是通过封装代理来实现的。

简单使用

<!-- Spring Cloud OpenFeign starter. No <version> is given here; presumably it is managed by the Spring Cloud BOM (confirm). -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

/**
 * Declarative Feign client for the service registered as
 * {@code springcloud-product-provider}.
 */
@FeignClient(name = "springcloud-product-provider")
public interface ProductService {

    /**
     * Issues {@code GET /product/get/info} against the target service.
     *
     * @return the response body mapped to a {@code Result}
     */
    @GetMapping("/product/get/info")
    Result getInfo();
}

工作原理

主程序入口添加@EnableFeignClients注解,开启对FeignClient的扫描加载处理。根据Feign Client的开发规范,定义接口并加上@FeignClient注解。

当程序启动时,会进行包扫描,扫描所有带@FeignClient注解的类,并且将这些信息注入Spring IOC容器中。当定义的Feign接口中的方法被调用时,通过JDK动态代理的方式来生成具体的RequestTemplate。当生成代理时,Feign会为每个接口方法创建一个RequestTemplate对象,该对象封装了HTTP请求需要的全部信息,如请求参数名、请求方法等信息都是在这个过程中确定的。

然后由RequestTemplate生成Request,再把Request交给Client去处理。这里的Client可以是JDK原生的URLConnection、Apache的HttpClient,也可以是OkHttp。最后Client被封装到LoadBalancerFeignClient类,这个类结合Ribbon负载均衡发起服务之间的调用。

springcloud 注册与发现代码(springCloud-openfeign源码解析)(1)

源码解析

启动类中加入了@EnableFeignClients注解,这个注解的作用其实就是开启了FeignClient的扫描,@Import(FeignClientsRegistrar.class)跟bean的动态装载有关。

OpenFeign Configuration :

public static class FeignClientConfiguration { // 日志 private Logger.Level loggerLevel; // 连接超时 private Integer connectTimeout; private Integer readTimeout; //重试 private Class<Retryer> retryer; //解码 private Class<ErrorDecoder> errorDecoder; private List<Class<RequestInterceptor>> requestInterceptors; // 编码 private Boolean decode404; private Class<Decoder> decoder; private Class<Encoder> encoder; // 解析 private Class<Contract> contract; private ExceptionPropagationPolicy exceptionPropagationPolicy; }

@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(FeignClientsRegistrar.class) public @interface EnableFeignClients { } /** *org.springframework.context.annotation#ImportBeanDefinitionRegistrar接口创建自定义注入bean */ class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware { @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { //注册@EnableFeignClients中定义defaultConfiguration属性下的类,包装成FeignClientSpecification,注册到Spring容器。  //在@FeignClient中有一个属性:configuration,这个属性是表示各个FeignClient自定义的配置类, //后面也会通过调用registerClientConfiguration方法来注册成FeignClientSpecification到容器。  //所以,这里可以完全理解在@EnableFeignClients中配置的是做为兜底的配置,在各个@FeignClient配置的就是自定义的情况。 registerDefaultConfiguration(metadata, registry); //该方法负责读取@EnableFeignClients的属性,获取需要扫描的包名, //然后扫描指定的所有包名下的被@FeignClient注解注释的接口, //将扫描出来的接口调用registerFeignClient方法注册到spring容器。 registerFeignClients(metadata, registry); } /** *注册默认配置 */ private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { //获取注解中默认的配置 Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true); if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) { String name; if (metadata.hasEnclosingClass()) { name = "default." metadata.getEnclosingClassName(); } else { name = "default." metadata.getClassName(); } //注册FeignClientSpecification类型的Bean registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration")); } } private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) { BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(configuration); //注册BeanDefinition registry.registerBeanDefinition(name "." 
FeignClientSpecification.class.getSimpleName(), builder.getBeanDefinition()); } } //注册registerFeignClients public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { //候选BeanDeinition集合 LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>(); Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName()); final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients"); //判断是否为空,如果为空获取注解@EnableFeignClients参数 if (clients == null || clients.length == 0) { // ClassPath的条件扫描组件提供者 ClassPathScanningCandidateComponentProvider scanner = getScanner(); scanner.setResourceLoader(this.resourceLoader); //获取注解的扫描包路径 scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class)); Set<String> basePackages = getBasePackages(metadata); for (String basePackage : basePackages) { candidateComponents.addAll(scanner.findCandidateComponents(basePackage)); } } else { for (Class<?> clazz : clients) { candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz)); } } for (BeanDefinition candidateComponent : candidateComponents) { if (candidateComponent instanceof AnnotatedBeanDefinition) { // verify annotated class is an interface AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent; AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface"); Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes(FeignClient.class.getCanonicalName()); String name = getClientName(attributes); //注册被调用客户端配置 //注册(微服务名).FeignClientSpecification类型的bean //beanname: order-center.FeignClientSpecification registerClientConfiguration(registry, name, attributes.get("configuration")); //注册 FeignClient registerFeignClient(registry, annotationMetadata, attributes); } } } // 注册 
FeignClient,组装BeanDefinition,实质是一个FeignClientFactoryBean,然后注册到Spring IOC容器。 private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); Class clazz = ClassUtils.resolveClassName(className, null); ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory ? (ConfigurableBeanFactory) registry : null; String contextId = getContextId(beanFactory, attributes); String name = getName(attributes); //FeignClientFactoryBean FeignClientFactoryBean factoryBean = new FeignClientFactoryBean(); factoryBean.setBeanFactory(beanFactory); factoryBean.setName(name); factoryBean.setContextId(contextId); factoryBean.setType(clazz); factoryBean.setRefreshableClient(isClientRefreshEnabled()); BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> { factoryBean.setUrl(getUrl(beanFactory, attributes)); factoryBean.setPath(getPath(beanFactory, attributes)); factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404")))); Object fallback = attributes.get("fallback"); if (fallback != null) { factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback : ClassUtils.resolveClassName(fallback.toString(), null)); } Object fallbackFactory = attributes.get("fallbackFactory"); if (fallbackFactory != null) { factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? 
(Class<?>) fallbackFactory : ClassUtils.resolveClassName(fallbackFactory.toString(), null)); } return factoryBean.getObject(); }); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); definition.setLazyInit(true); validate(attributes); AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className); beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean); // has a default, won't be null boolean primary = (Boolean) attributes.get("primary"); beanDefinition.setPrimary(primary); String[] qualifiers = getQualifiers(attributes); if (ObjectUtils.isEmpty(qualifiers)) { qualifiers = new String[] { contextId "FeignClient" }; } BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers); BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); registerOptionsBeanDefinition(registry, contextId); }

Spring容器启动,调用AbstractApplicationContext#refresh方法,

在refresh方法内部调用finishBeanFactoryInitialization方法对单例bean进行初始化,

finishBeanFactoryInitialization方法调用getBean获取name对应的bean实例,如果不存在,则创建一个,即调用doGetBean方法。

doGetBean调用createBean方法,createBean方法调用doCreateBean方法。

doCreateBean()方法主要是根据 beanName、mbd、args,使用对应的策略创建 bean 实例,并返回包装类 BeanWrapper。

doCreateBean方法中调用populateBean对 bean 进行属性填充;其中,可能存在依赖于其他 bean 的属性,则会递归初始化依赖的 bean 实例。

在初始化FeignContext时,会把容器中的configurations放入FeignContext中。configurations的来源就是前面registerFeignClients方法中注册的@FeignClient的configuration配置。

接着,构建Feign.Builder,在构建时会向FeignContext获取配置的Encoder、Decoder等各种信息。FeignContext在上文中已经提到,会为每个Feign客户端分配一个容器,它们的父容器就是Spring容器。

配置完Feign.Builder之后,再判断是否需要LoadBalance,如果需要,则通过LoadBalance的方法来设置。实际上它们最终调用的都是Feign.Builder的target()方法。

public <T> T target(Target<T> target) { return build().newInstance(target); } public Feign build() { SynchronousMethodHandler.Factory synchronousMethodHandlerFactory = new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger, logLevel, decode404, closeAfterDecode, propagationPolicy); ParseHandlersByName handlersByName = new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder, errorDecoder, synchronousMethodHandlerFactory); return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder); } } //ReflectiveFeign public <T> T newInstance(Target<T> target) { Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target); Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>(); List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class) { continue; } else if (Util.isDefault(method)) { DefaultMethodHandler handler = new DefaultMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } InvocationHandler handler = factory.create(target, methodToHandler); T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[] {target.type()}, handler); for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; }

SynchronousMethodHandler @Override public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Options options = findOptions(argv); Retryer retryer = this.retryer.clone(); while (true) { try { //执行查询 return executeAndDecode(template, options); } catch (RetryableException e) { try { retryer.continueOrPropagate(e); } catch (RetryableException th) { Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }

//SynchronousMethodHandler 获取请求结果后,解码 Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { response = client.execute(request, options); // ensure the request is set. TODO: remove in Feign 12 response = response.toBuilder() .request(request) .requestTemplate(template) .build(); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); if (decoder != null) return decoder.decode(response, metadata.returnType()); CompletableFuture<Object> resultFuture = new CompletableFuture<>(); asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response, metadata.returnType(), elapsedTime); try { if (!resultFuture.isDone()) throw new IllegalStateException("Response handling not done"); return resultFuture.join(); } catch (CompletionException e) { Throwable cause = e.getCause(); if (cause != null) throw cause; throw e; } }

获取负载均衡服务并执行查询

public class LoadBalancerFeignClient implements Client { @Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName) .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } } // public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }

//客户端调用fegin 接口时,拦截器 static class FeignInvocationHandler implements InvocationHandler { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if ("equals".equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null; return equals(otherHandler); } catch (IllegalArgumentException e) { return false; } } else if ("hashCode".equals(method.getName())) { return hashCode(); } else if ("toString".equals(method.getName())) { return toString(); } //@FeginClent注解下的方法 启动的时候已经维护 return dispatch.get(method).invoke(args); } } //选择定义的服务地址 //@FeignClient(value = "openfeign-server") openfegin-server查询对应的服务地址 Server svc = lb.chooseServer(loadBalancerKey); 调用 ZoneAwareLoadBalancer#chooseServer PredicateBasedRule#choose方法 public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); } /** * 默认获取 第几个服务器数 * Referenced from RoundRobinRule * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}. * * @param modulo 所有服务器个数 * @return The next value. */ private final AtomicInteger nextIndex = new AtomicInteger(); private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextIndex.get(); int next = (current 1) % modulo; if (nextIndex.compareAndSet(current, next) && current < modulo) return current; } }

springcloud 注册与发现代码(springCloud-openfeign源码解析)(2)

springcloud 注册与发现代码(springCloud-openfeign源码解析)(3)

springcloud 注册与发现代码(springCloud-openfeign源码解析)(4)

springcloud 注册与发现代码(springCloud-openfeign源码解析)(5)

springcloud 注册与发现代码(springCloud-openfeign源码解析)(6)

设计模式

动态代理

,