OpenFeign 是 Spring Cloud 家族的一个成员, 它最核心的作用是为 HTTP 形式的 Rest API 提供了非常简洁高效的 RPC 调用方式.它并不是真正意义上的RPC通信,因为他是通过封装代理来实现的。
简单实用- pom.xml文件添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
- 主启动类加上@EnableFeginClients注解
- 新增提供者api接口
@FeignClient(name="springcloud-product-provider")
public interface ProductService{
@GetMapping("/product/get/info")
public Result getInfo();
}
主程序入口添加了@EnableFeignClients注解开启对FeignClient扫描加载处理。根据Feign Client的开发规范,定义接口并加@FeignClientd注解。
当程序启动时,会进行包扫描,扫描所有@FeignClients的注解的类,并且将这些信息注入Spring IOC容器中,当定义的的Feign接口中的方法被调用时,通过JDK动态代理方式,来生成具体的RequestTemplate。当生成代理时,Feign会为每个接口方法创建一个RequestTemplate对象,该对象封装可HTTP请求需要的全部信息,如请求参数名,请求方法等信息都是在这个过程中确定的。
然后RequestTemplate生成Request,然后把Request交给Client去处理,这里指的Client可以是JDK原生的URLConnection、Apache的HttpClient、也可以是OKhttp,最后Client被封装到LoadBalanceClient类,这个类结合Ribbon负载均衡发器服务之间的调用。
源码解析
启动类中加入了@EnableFeignClients注解,这个注解的作用其实就是开启了FeignClient的扫描,@Import(FeignClientRegistrar.class)跟bean的动态装载有关。
OpenFeign Configuration :
public static class FeignClientConfiguration {
// 日志
private Logger.Level loggerLevel;
// 连接超时
private Integer connectTimeout;
private Integer readTimeout;
//重试
private Class<Retryer> retryer;
//解码
private Class<ErrorDecoder> errorDecoder;
private List<Class<RequestInterceptor>> requestInterceptors;
// 编码
private Boolean decode404;
private Class<Decoder> decoder;
private Class<Encoder> encoder;
// 解析
private Class<Contract> contract;
private ExceptionPropagationPolicy exceptionPropagationPolicy;
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients {
}
/**
*org.springframework.context.annotation#ImportBeanDefinitionRegistrar接口创建自定义注入bean
*/
class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware {
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
//注册@EnableFeignClients中定义defaultConfiguration属性下的类,包装成FeignClientSpecification,注册到Spring容器。
//在@FeignClient中有一个属性:configuration,这个属性是表示各个FeignClient自定义的配置类,
//后面也会通过调用registerClientConfiguration方法来注册成FeignClientSpecification到容器。
//所以,这里可以完全理解在@EnableFeignClients中配置的是做为兜底的配置,在各个@FeignClient配置的就是自定义的情况。
registerDefaultConfiguration(metadata, registry);
//该方法负责读取@EnableFeignClients的属性,获取需要扫描的包名,
//然后扫描指定的所有包名下的被@FeignClient注解注释的接口,
//将扫描出来的接口调用registerFeignClient方法注册到spring容器。
registerFeignClients(metadata, registry);
}
/**
*注册默认配置
*/
private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
//获取注解中默认的配置
Map<String, Object> defaultAttrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." metadata.getEnclosingClassName();
}
else {
name = "default." metadata.getClassName();
}
//注册FeignClientSpecification类型的Bean
registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration"));
}
}
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FeignClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
//注册BeanDefinition
registry.registerBeanDefinition(name "." FeignClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
}
//注册registerFeignClients
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
//候选BeanDeinition集合
LinkedHashSet<BeanDefinition> candidateComponents = new LinkedHashSet<>();
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients");
//判断是否为空,如果为空获取注解@EnableFeignClients参数
if (clients == null || clients.length == 0) {
// ClassPath的条件扫描组件提供者
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
//获取注解的扫描包路径
scanner.addIncludeFilter(new AnnotationTypeFilter(FeignClient.class));
Set<String> basePackages = getBasePackages(metadata);
for (String basePackage : basePackages) {
candidateComponents.addAll(scanner.findCandidateComponents(basePackage));
}
}
else {
for (Class<?> clazz : clients) {
candidateComponents.add(new AnnotatedGenericBeanDefinition(clazz));
}
}
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(FeignClient.class.getCanonicalName());
String name = getClientName(attributes);
//注册被调用客户端配置
//注册(微服务名).FeignClientSpecification类型的bean
//beanname: order-center.FeignClientSpecification
registerClientConfiguration(registry, name, attributes.get("configuration"));
//注册 FeignClient
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
// 注册 FeignClient,组装BeanDefinition,实质是一个FeignClientFactoryBean,然后注册到Spring IOC容器。
private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata,
Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
Class clazz = ClassUtils.resolveClassName(className, null);
ConfigurableBeanFactory beanFactory = registry instanceof ConfigurableBeanFactory
? (ConfigurableBeanFactory) registry : null;
String contextId = getContextId(beanFactory, attributes);
String name = getName(attributes);
//FeignClientFactoryBean
FeignClientFactoryBean factoryBean = new FeignClientFactoryBean();
factoryBean.setBeanFactory(beanFactory);
factoryBean.setName(name);
factoryBean.setContextId(contextId);
factoryBean.setType(clazz);
factoryBean.setRefreshableClient(isClientRefreshEnabled());
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(clazz, () -> {
factoryBean.setUrl(getUrl(beanFactory, attributes));
factoryBean.setPath(getPath(beanFactory, attributes));
factoryBean.setDecode404(Boolean.parseBoolean(String.valueOf(attributes.get("decode404"))));
Object fallback = attributes.get("fallback");
if (fallback != null) {
factoryBean.setFallback(fallback instanceof Class ? (Class<?>) fallback
: ClassUtils.resolveClassName(fallback.toString(), null));
}
Object fallbackFactory = attributes.get("fallbackFactory");
if (fallbackFactory != null) {
factoryBean.setFallbackFactory(fallbackFactory instanceof Class ? (Class<?>) fallbackFactory
: ClassUtils.resolveClassName(fallbackFactory.toString(), null));
}
return factoryBean.getObject();
});
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.setLazyInit(true);
validate(attributes);
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.setAttribute(FactoryBean.OBJECT_TYPE_ATTRIBUTE, className);
beanDefinition.setAttribute("feignClientsRegistrarFactoryBean", factoryBean);
// has a default, won't be null
boolean primary = (Boolean) attributes.get("primary");
beanDefinition.setPrimary(primary);
String[] qualifiers = getQualifiers(attributes);
if (ObjectUtils.isEmpty(qualifiers)) {
qualifiers = new String[] { contextId "FeignClient" };
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, qualifiers);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
registerOptionsBeanDefinition(registry, contextId);
}
Spring容器启动,调用AbstractApplicationContext#refresh方法,
在refresh方法内部调用finishBeanFactoryInitialization方法对单例bean进行初始化,
finishBeanFactoryInitialization方法调用getBean获取name对应的bean实例,如果不存在,则创建一个,即调用doGetBean方法。
doGetBean调用createBean方法,createBean方法调用doCreateBean方法。
doCreateBean()方法主要是根据 beanName、mbd、args,使用对应的策略创建 bean 实例,并返回包装类 BeanWrapper。
doCreateBean方法中调用populateBean对 bean 进行属性填充;其中,可能存在依赖于其他 bean 的属性,则会递归初始化依赖的 bean 实例。
在初始化FeignContext时,会把configurations在容器中放入FeignContext中。configurations 的来源就是在前面registerFeignClients方法中将@FeignClient的配置 configuration。
接着,构建feign.builder,在构建时会向FeignContext获取配置的Encoder,Decoder等各种信息。FeignContext在上文中已经提到会为每个Feign客户端分配了一个容器,它们的父容器就是spring容器
配置完Feign.Builder之后,再判断是否需要LoadBalance,如果需要,则通过LoadBalance的方法来设置。实际上他们最终调用的是Target.target()方法。
public <T> T target(Target<T> target) {
return build().newInstance(target);
}
public Feign build() {
SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
logLevel, decode404, closeAfterDecode, propagationPolicy);
ParseHandlersByName handlersByName =
new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
errorDecoder, synchronousMethodHandlerFactory);
return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}
}
//ReflectiveFeign
public <T> T newInstance(Target<T> target) {
Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
for (Method method : target.type().getMethods()) {
if (method.getDeclaringClass() == Object.class) {
continue;
} else if (Util.isDefault(method)) {
DefaultMethodHandler handler = new DefaultMethodHandler(method);
defaultMethodHandlers.add(handler);
methodToHandler.put(method, handler);
} else {
methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
}
}
InvocationHandler handler = factory.create(target, methodToHandler);
T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
new Class<?>[] {target.type()}, handler);
for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
defaultMethodHandler.bindTo(proxy);
}
return proxy;
}
SynchronousMethodHandler
@Override
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
Retryer retryer = this.retryer.clone();
while (true) {
try {
//执行查询
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
continue;
}
}
}
//SynchronousMethodHandler 获取请求结果后,解码
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
Request request = targetRequest(template);
if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}
Response response;
long start = System.nanoTime();
try {
response = client.execute(request, options);
// ensure the request is set. TODO: remove in Feign 12
response = response.toBuilder()
.request(request)
.requestTemplate(template)
.build();
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
if (decoder != null)
return decoder.decode(response, metadata.returnType());
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
metadata.returnType(),
elapsedTime);
try {
if (!resultFuture.isDone())
throw new IllegalStateException("Response handling not done");
return resultFuture.join();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause != null)
throw cause;
throw e;
}
}
获取负载均衡服务并执行查询
public class LoadBalancerFeignClient implements Client {
@Override
public Response execute(Request request, Request.Options options) throws IOException {
try {
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
IClientConfig requestConfig = getClientConfig(options, clientName);
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}
//
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
try {
return command.submit(
new ServerOperation<T>() {
@Override
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
} catch (Exception e) {
Throwable t = e.getCause();
if (t instanceof ClientException) {
throw (ClientException) t;
} else {
throw new ClientException(e);
}
}
}
//客户端调用fegin 接口时,拦截器
static class FeignInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("equals".equals(method.getName())) {
try {
Object otherHandler =
args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
return equals(otherHandler);
} catch (IllegalArgumentException e) {
return false;
}
} else if ("hashCode".equals(method.getName())) {
return hashCode();
} else if ("toString".equals(method.getName())) {
return toString();
}
//@FeginClent注解下的方法 启动的时候已经维护
return dispatch.get(method).invoke(args);
}
}
//选择定义的服务地址
//@FeignClient(value = "openfeign-server") openfegin-server查询对应的服务地址
Server svc = lb.chooseServer(loadBalancerKey);
调用 ZoneAwareLoadBalancer#chooseServer
PredicateBasedRule#choose方法
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
/**
* 默认获取 第几个服务器数
* Referenced from RoundRobinRule
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo 所有服务器个数
* @return The next value.
*/
private final AtomicInteger nextIndex = new AtomicInteger();
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo)
return current;
}
}
设计模式
动态代理
,