刚在现在org.springframework.cloud.netflix.eureka.server.InstanceRegistry的每个方法都打了一个断点,而且EurekaServer已经开始调试运行状态,那我们就随便找一个被@EnableEurekaClient的微服务启动微服务来有用吧,直接运行,下面我们就来说一说关于eureka底层原理?我们一起去了解并探讨一下这个问题吧!

eureka底层原理(从源码层面让你认识Eureka工作流程和运作机制)

eureka底层原理

还原还原
  1. Eureka Server 提供服务注册服务,各个节点启动后,会在 Eureka Server 中进行,这样 Eureka Server 中的服务注册表中会启用所有服务节点的信息,服务节点的信息可以在界面中直观的看到。
  2. Eureka Client 是一个 Java 客户端,用于简化与 Eureka Server 的交互,客户端同时也具备一个内置的、使用轮询浏览器的负载均衡器。
  3. 在应用启动后,将向Eureka Server发送心跳(默认周期为30秒),如果Eureka Server在多个默认心跳周期(3个心跳周期=90秒)没有收到某个节点的心跳,Eureka Server将会从服务注册表中把这个服务节点移除。
  4. 高可用情况下的:Eureka Server 之间将通过复制的方式完成数据的同步;
  5. Eureka Client有检查的机制,即使所有的Eureka Server都挂掉的话,客户端依然可以利用中的信息消费其他服务的API;
EurekaServer启动流程分析EurekaServer 处理服务注册、消息数据复制EurekaClient 是如何注册到 EurekaServer 的?

刚在现在org.springframework.cloud.netflix.eureka.server.InstanceRegistry的每个方法都打了一个断点,而且EurekaServer已经开始调试运行状态,那我们就随便找一个被@EnableEurekaClient的微服务启动微服务来有用吧,直接运行。

实例注册方法

InstanceRegistry.register( final InstanceInfo info, final boolean isReplication) 方法进断点了。

应用资源类

主要是处理接收 Http 的服务请求。

@POST @Consumes({ "application/json" , "application/xml" }) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug( "注册实例{} (replication={})" , info.getId(), isReplication); // 验证 instanceinfo 是否包含所有必需的字段 if (isBlank(info.getId())) { return Response.status( 400 ).entity( "Missing instanceId" ).build(); } else if (isBlank(info.getHostName())) { return Response.status( 400 ).entity( "Missing hostname" ).build(); } else if (isBlank(info.getAppName())) { return Response.status( 400 ).entity( "Missing appName" ).build(); } else if (!appName.equals(info.getAppName())) { return Response.status( 400 ).entity( "不匹配的appName,期望" appName " 但是是 " info.getAppName()).build( ); } else if (info.getDataCenterInfo() == null ) { return Response.status( 400 ).entity( "Missing dataCenterInfo" ) .build (); } else if (info.getDataCenterInfo().getName() == null ) { return Response.status( 400 ).entity( "Missing dataCenterInfo Name" ) .build (); } // 处理客户端可能使用错误的 DataCenterInfo 注册并丢失数据的情况 DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); 如果(isBlank(dataCenterInfoId)){ 布尔实验 = "真" .equalsIgnoreCase(serverConfig.getExperimental( "registration.validation.dataCenterInfoId" )); 如果(实验){ String entity = "DataCenterInfo of type " dataCenterInfo.getClass() " 必须包含有效的 id" ; 返回Response.status( 400 ).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; 字符串有效 ID = amazonInfo。获取(AmazonInfo.MetaDataKey.instanceId); 如果(有效 ID == null){ amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } }其他{ logger.warn( "在没有合适的 id 的情况下注册 {} 类型的 DataCenterInfo" , dataCenterInfo.getClass()); } } } registry.register(info, "true" .equals(isReplication)); 返回Response.status( 204 ).build(); // 204 向后兼容 }

@Override public void register (最终InstanceInfo信息,最终 布尔值isReplication) { handleRegistration(信息,resolveInstanceLeaseDuration(信息),isReplication); super .register(info, isReplication); }

私人 无效 handleRegistration (InstanceInfo信息,INT leaseDuration, boolean isReplication) { log ( "register " info.getAppName() ", vip " info.getVIPAddress() ",leaseDuration" leaseDuration ", isReplication " isReplication); 发布事件(新EurekaInstanceRegisteredEvent(这个,信息,leaseDuration, isReplication)); }

服务户厕机制

进入PeerAwareInstanceRegistryImpl类的register(final InstanceInfo info, final boolean isReplication)方法;

@覆盖 公共 无效 寄存器(最终InstanceInfo信息,最终 布尔isReplication) { //注释:续约时间,默认时间是常量值90秒 INT leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // 自己:续约时间,也可以从配置文件中取出来,所以说续约时间值也是可以让我们自定义配置的 if (info.getLeaseInfo() != null && info.getLeaseInfo()。 getDurationInSecs() > 0 ) { LeaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 注释:将注册方的信息写入 EurekaServer 的注册表,父类为 AbstractInstanceRegistry super .register(info,leasedDuration,isReplication); // 注释:EurekaServer节点之间的数据同步,复制到其他 PeerreplicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null , isReplication); }

进入super.register(info,leasedDuration,isReplication),如何写入EurekaServer的注册表的,进入AbstractInstanceRegistry.register(InstanceInfo registrant, inthaleDuration, boolean isReplication)方法。

公共 无效 注册(InstanceInfo 注册人,int租赁持续时间,布尔值是复制) { 尝试{ 读。锁(); // 这个变量,就是我们的注册表注释,保存在内存中的; Map<String, Lease<InstanceInfo>> gMap = registry. 获取(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null ) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null ) { gMap = gNewMap; } } 租约<InstanceInfo> existingLease = gMap。获取(registrant.getId()); // 保留最后一个脏时间戳而不覆盖它,如果已经有租约 if (existingLease != null && (existingLease.getHolder() != null )) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug( "现有租约找到 (existing={},provided={}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn( "有一个现有租约,并且现有租约的脏时间戳 {} 大于" " 比正在注册的那个 {}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn( "使用现有的 instanceInfo 而不是新的 instanceInfo 作为注册人" ); 注册人 = existingLease.getHolder(); } } else { // 租约不存在,因此它是一个新的注册 synchronized ( lock ) { if ( this .expectedNumberOfRenewsPerMin > 0 ) { // 由于客户端想要取消它,降低阈值 // (1 // 30 秒,2 一分钟) this .expectedNumberOfRenewsPerMin = this .expectedNumberOfRenewsPerMin 2 ; 这个.nu​mberOfRenewsPerMinThreshold = ( int ) ( this .expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug( "没有找到以前的租约信息;这是新注册的" ); } 租约<InstanceInfo>租约=新租约<InstanceInfo>(注册人,租用持续时间); if (existingLease != null ) { Lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), 租约); 同步(最近注册队列){ 最近注册队列。添加( new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() "(" registrant.getId() ")" )); } //这是其中重载状态初始状态转移发生 ,如果(!InstanceStatus.UNKNOWN。等于(registrant.getOverriddenStatus())){ logger.debug( "Found overridden status {} for instance {}. 检查是否需要添加到 " "overrides" , registrant.getOverriddenStatus(), registrant.getId()); 如果(!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info( "未找到覆盖的 id {} 并因此添加它" , registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap。获取(registrant.getId()); if (overriddenStatusFromMap != null ) { logger.info( "从地图存储覆盖状态{}",overriddenStatusFromMap ); registrant.setOverriddenStatus(overriddenStatusFromMap); } // 根据覆盖的状态规则设置状态 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); //如果租赁与UP状态,集租赁服务了时间戳注册 ,如果(InstanceStatus.UP。等于(registrant.getStatus())){ 租赁.serviceUp(); } registrant.setActionType(ActionType.ADDED); 最近更改队列。添加(新最近更改的项目(租赁)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info( "注册实例 {}/{} 状态为 {} (replication={})" , registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); }最后{ 读。解锁(); } }

集群之间的复制

replicaToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication) 的这个方法。

私人无效replicateToPeers(行动行动,字符串appName,字符串ID, InstanceInfo 信息/* 可选 */ , InstanceStatus newStatus /* optional */ , boolean isReplication) { Stopwatch tracer = action.getTimer().start(); 尝试{ 如果(isReplication){ numberOfReplicationsLastMin.increment(); } // 如果已经是复制,不要再复制,因为这会造成毒复制 // 注释:如果已经复制过,就不再复制 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return ; } // 遍历Eureka Server 集群中的所有节点,进行复制操作 for ( final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // 如果url代表本主机,则不要复制到自己。 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue ; } // 没有复制过,遍历Eureka Server中的节点节点,操作,包括取消、注册、状态、状态更新等。 replicaInstanceActionsToPeers(action, appName, id, info, newStatus, node); } }最后{ tracer.stop(); } }

private void replicationInstanceActionsToPeers(Action action, String appName, 字符串 id、InstanceInfo 信息、InstanceStatus newStatus、 PeerEurekaNode 节点) { 试试{ InstanceInfo infoFromRegistry = null ; CurrentRequestVersion.set(Version.V2); 开关(动作){ case 取消: node.cancel(appName, id); 打破; 案例 心跳: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false ); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false ); 打破; 案例 注册: 节点注册(信息); 打破; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false ); node.statusUpdate(appName, id, newStatus, infoFromRegistry); 打破; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false ); node.deleteStatusOverride(appName, id, infoFromRegistry); 打破; } }捕捉(可扔的 t){ logger.error( "无法将信息复制到 {} 以进行操作 {}" , node.getServiceUrl(), action.name(), t); } }

同级之间的复制机制

PeerEurekaNode.register(final InstanceInfo info) 方法,一窥究竟如何同步数据。

public void register ( final InstanceInfo) throws Exception { // 注释:任务到期时间给任务服务处理,默认时间 偏移当前时间 30 秒long expiryTime = System.currentTimeMillis() getLeaseRenewalOf(info); batchingDispatcher.process( taskId( "register" , info), new InstanceReplicationTask(targetHost, Action.Register, info, null , true ) { public EurekaHttpResponse<Void> execute () { return replicationClient.register(info); } }, 到期时间 ); }

公共 静态<ID,T> TaskDispatcher <ID,T> createBatchingTaskDispatcher (串ID, INT MAXBUFFERSIZE, INT workloadSize, INT workerCount, 长maxBatchingDelay, 长congestionRetryDelayMs, 长networkFailureRetryMs, TaskProcessor<T> taskProcessor) { final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>( id、maxBufferSize、workloadSize、maxBatchingDelay、congestionRetryDelayMs、networkFailureRetryMs ); final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor); return new TaskDispatcher<ID, T>() { @Override public void process (ID id, T task, long expiryTime) { acceptorExecutor.process(id, task, expiryTime); } @Override public void shutdown () { acceptorExecutor.shutdown(); taskExecutor.shutdown(); } }; }

static <ID, T> TaskExecutors<ID, T> batchExecutors ( final String name, int workerCount, final TaskProcessor<T> 处理器, final AcceptorExecutor<ID, T> acceptorExecutor) { final AtomicBoolean isShutdown = new AtomicBoolean(); 最终的TaskExecutorMetrics 指标 =新的TaskExecutorMetrics(name); 返回 新的TaskExecutors<>( newWorkerRunnableFactory <ID,T>(){ @覆盖 公共WorkerRunnable <ID,T>创建(INT IDX) { 返回 新BatchWorkerRunnable <>(“TaskBatchingWorker-” 名称 ' - ' IDX,isShutdown,指标,处理器,acceptorExecutor ); } }, workerCount, isShutdown); }

@Override public void run() { try { while (!isShutdown.get()) { // 注释:获取信号量释放batchWorkRequests.release(),返回任务集合列表 List<TaskHolder<ID, T>> holder = getWork(); 指标.registerExpiryTimes(持有人); List<T> tasks = getTasksOf(holders); //注释:将完成任务打包请求Peer节点 ProcessingResult 结果 = processor.process(tasks); switch (result) { case Success: break ; 案例 拥塞: 案例 瞬态错误: taskDispatcher.reprocess(持有人,结果); 打破; case PermanentError: logger.warn( "由于永久错误而放弃 {} 的 {} 个任务" ,holder.size(), workerName); } metrics.registerTaskResult(result, tasks.size()); } } catch (InterruptedException e) { // 忽略 } catch (Throwable e) { // 安全保护,所以我们永远不会以不受控制的方式退出这个循环。 logger.warn( "发现工作线程错误" , e); } }

@Override public ProcessingResult process(List<ReplicationTask> tasks) { ReplicationList list = createReplicationListOf(tasks); try { // 注释:这里通过 JerseyReplicationClient 客户端对象直接发送列表请求数据 EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list); int statusCode = response.getStatusCode(); if (!isSuccess(statusCode)) { if (statusCode == 503 ) { logger.warn( "Server busy (503) HTTP status code received from peer {};延迟后重新调度任务" , peerId); 返回ProcessingResult.Congestion; } else { // 从服务器返回的意外错误。理想情况下,这应该永远不会发生。 logger.error( "批量更新失败,HTTP 状态码为{};放弃{}复制任务" , statusCode, tasks.size()); 返回ProcessingResult.PermanentError; } }其他{ handleBatchResponse(tasks, response.getEntity().getResponseList()); } } catch (Throwable e) { if (isNetworkConnectException(e)) { logNetworkErrorSample( null , e); 返回ProcessingResult.TransientError; }其他{ logger.error( "不重试这个异常,因为它似乎不是网络异常" , e); 返回ProcessingResult.PermanentError; } } 返回ProcessingResult.Success; }

@覆盖 公共EurekaHttpResponse <ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList){ ClientResponse 响应 = null ; 试试{ 响应 = jerseyApacheClient.resource(serviceUrl) // 注释:这才是重点,请求相对路径,peerreplication/batch/ .path(PeerEurekaNode.BATCH_URL_PATH) .accept(MediaType.APPLICATION_JSON_TYPE) .type(MediaType.APPLICATION_JSON_TYPE) .POST(ClientResponse类,replicationList); 如果(isSuccess(response.getStatus())!){ 回报anEurekaHttpResponse(response.getStatus(),ReplicationListResponse。类).build(); } ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse。类); 返回anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build(); }最后{ 如果(响应!=空){ response.close(); } } }

@Path( "batch" ) @POST public Response batchReplication(ReplicationList replicationList) { try { ReplicationListResponse batchResponse = new ReplicationListResponse(); // 实例:这里将收到的任务列表,方法循环解析处理,主要核心在调度方法中。 for (ReplicationInstanceInfo : replicationList.getReplicationList()) { try { batchResponse.addResponse(dispatch(instanceInfo)); }捕获(异常 e){ batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null )); logger.error(instanceInfo.getAction() "请求处理批处理失败" instanceInfo.getAppName() '/' instanceInfo.getId(), e); } } 返回Response.ok(batchResponse).build(); } catch (Throwable e) { logger.error( "无法执行批量请求" , e); 返回Response.status(Status.INTERNAL_SERVER_ERROR).build(); } }

私有ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { ApplicationResource applicationResource = createApplicationResource(instanceInfo); InstanceResource 资源 = createInstanceResource(instanceInfo, applicationResource); String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); String instanceStatus = toString(instanceInfo.getStatus()); Builder singleResponseBuilder = new Builder(); switch (instanceInfo.getAction()) { case 注册: singleResponseBuilder = handleRegister(instanceInfo, applicationResource); 打破; 案例 心跳: singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); 打破; 案例 取消: singleResponseBuilder = handleCancel(resource); 打破; 案例状态更新 : singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); 打破; 案例 删除状态覆盖: singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); 打破; } 返回singleResponseBuilder.build(); }

private static Builder handleRegister (ReplicationInstance instanceInfo, ApplicationResource applicationResource) { // 注释:private static final String REPLICATION = "true"; 定义一个常量值,而且还是方法描述ApplicationResource.addInstance applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION); return new Builder().setStatusCode(Status.OK.getStatusCode()); }

EurekaClient启动流程分析调换运行模式运行discovery-eureka服务,调试运行provider-user服务,先观察日志先;

2017-10-23 19 :43 :07.688 INFO 1488 --- [main] o .s .c .support .DefaultLifecycleProcessor :在阶段0 中启动 bean 2017-10-23 19 :43 :07.694 INFO --- [main] 1488 ] ø .S .C .N .eureka .InstanceInfoFactory :设置初始实例状态为:STARTING 2017年10月23日19 :43 :07.874 INFO 1488 --- [主要] Ç .N .D .provider .DiscoveryJerseyProvider :使用 JSON 编码 的编解码器 LegacyJacksonJson 2017年10月23日 19 :43 :07.874 INFO 1488 --- [主要] Ç .N .D .provider .DiscoveryJerseyProvider :使用 JSON 解码 编 解码器LegacyJacksonJson 2017-10-23 19 :43 :07.971 INFO 1488 --- [main] c.N .D .provider .DiscoveryJerseyProvider :使用 XML 编码 的编解码器 XStreamXml 2017年10月23日 19 :43 :07.971 INFO 1488 --- [主要] Ç .N .D .provider .DiscoveryJerseyProvider :使用 XML 解码 的编解码器 XStreamXml 2017-10 -23 19 :43 :08.134 INFO 1488 --- [主要] ç .N .D .S .R .aws .ConfigClusterResolver :解决 尤里卡 端点 经由 配置 2017年10月23日 19 :43 :08.344 INFO 1488 --- [主要] COM .netflix夏卓.DiscoveryClient :禁用 增量 属性:假 2017年10月23日 19 :43 :08.344 INFO 1488 - -- [main] com .netflix .discovery .DiscoveryClient :单一 vip 注册表 刷新 属性:空 2017年10月23日 19 :43 :08.344 INFO 1488 --- [主] COM .netflix夏卓.DiscoveryClient :部队 全面 注册表 获取:假 2017年10月23日 19 :43 :08.344 INFO 1488 --- [主] com .netflix .discovery .DiscoveryClient : Application is null : false 2017-10-23 19 :43 :08.344 INFO 1488 --- [主要] COM .netflix夏卓.DiscoveryClient :注册 应用程序 大小 是 零:真 2017年10月23日 19 :43 :08.344 INFO 1488 --- [主要] COM .netflix夏卓.DiscoveryClient :应用 版本 是 -1:真 2017年10月23日 19 :43 :08.345 INFO 1488 --- [主要] COM .netflix夏卓.DiscoveryClient :获取 的所有 实例 的注册表 信息 从 该 尤里卡 服务器 2017年10月23日 19 :43 :08.630 INFO 1488 --- [主] COM .netflix夏卓.DiscoveryClient :该 响应 状态 是 200 2017年10月23日 19 :43 :08.631 信息 1488 --- [主要] com .netflix .discovery .DiscoveryClient :开始 心跳 执行:更新 间隔 是:30 2017年10月23日 19 :43 :08.634 INFO 1488 --- [主要] Ç .N夏卓.InstanceInfoReplicator :InstanceInfoReplicator 按需 更新 允许 速率 每 分钟 是 4 2017年10月23日 19 :43 :08.637 信息 1488 --- [主要] com .netflix .discovery .DiscoveryClient :发现 客户端 初始化 在 时间戳 1508758988637 与 初始 实例 计数:0 2017年10月23日 19 :43 :08.657 INFO 1488 --- [主要] Ç .N .E .EurekaDiscoveryClientConfiguration:注册 应用 springms提供商用户 与 尤里卡 与 状态 UP 2017-10-23 19 :43 :08.658 INFO 1488 --- [主要] com .netflix夏卓.DiscoveryClient :锯 本地 状态 变化 事件 StatusChangeEvent [时间戳= 1508758988658,电流= UP,以前= STARTING] 2017年10月23日 19 :43 :08.659 INFO 1488 --- [nfoReplicator-0] COM .netflix夏卓。 DiscoveryClient : DiscoveryClient_SPRINGMS-PROVIDER-USER / springms-provider-user :192.168.3.101 :7900 :注册 服务... 2017-10-23 19 :43 :08.768 INFO 1488 --- [main] s .b .c .e .t .TomcatEmbeddedServletContainer:Tomcat 在端口(s)上启动 :7900 (http) 2017-10-23 19 :43 :08.768 INFO 1488 --- [main] c . n .e .EurekaDiscoveryClientConfiguration:将端口更新为7900 2017-10-23 19 :43 :08.773 INFO 1488 --- [main] c .s .cloud .MsProviderUserApplication :在882 .1秒内启动 ProviderApplication (JVM 运行10.398)

服务提供方主体加载流程EnableEurekaClient 组件。

@Target (ElementType.TYPE) @Retention (RetentionPolicy.RUNTIME) @Documented @Inherited @EnableDiscoveryClient public @interface EnableEurekaClient {}

@EnableEurekaClient这个注解类自己也使用了注解@EnableDiscoveryClient,那么我们有必要去这个注解类看看。

@target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) 公共@interface EnableDiscoveryClient {}

@EnableDiscoveryClient

这个注解类有个比较特殊的 注解@Import ,由此我们猜想,这里的大多数逻辑是不是都写 在这个EnableDiscoveryClientImportSelector 类呢?

启用DiscoveryClientImportSelector

@Order(Ordered.LOWEST_PRECEDENCE - 100) 公共 类 EnableDiscoveryClientImportSelector 延伸 SpringFactoryImportSelector < EnableDiscoveryClient > { @覆盖 保护 布尔 的IsEnabled () { 返回 新。RelaxedPropertyResolver(getEnvironment())的getProperty( “spring.cloud.discovery.enabled”,Boolean.class , Boolean.TRUE); } @Override protected boolean hasDefaultFactory () { return true ; } }

EnableDiscoveryClientImportSelector 类继承了 SpringFactoryImportSelector 类,但是重写了一个 isEnabled() 方法,默认值返回 true,为什么会返回 true。

/** *选择并返回了其中的类(ES)的名称应是基于对进口 *在{@link AnnotationMetadata}中的进口@ {@链路配置}类。 */ @Override public String [] selectImports(AnnotationMetadata metadata) { if (!isEnabled()) { return new String [ 0 ]; } AnnotationAttributes 属性 = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes( this .annotationClass.getName(), true )); Assert.notNull(attributes, "No " getSimpleName() " attributes found. Is " metadata.getClassName() " @" getSimpleName() "?" ); // 查找所有可能的自动配置类,过滤重复项 List < String > factory = new ArrayList<>( new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(这个.annotationClass,这个.beanClassLoader))); if (factories.isEmpty() && !hasDefaultFactory()) { throw new IllegalStateException( "Annotation @" getSimpleName() " 找到了,但没有实现。你忘记包含一个启动器了吗?" ); } if (factories.size() > 1 ) { // 应该只有一个 DiscoveryClient,但可能不止 一个工厂 log.warn( "More than one implementation" "of @" getSimpleName() "(现在依靠@Conditionals 选择一个):" 工厂); } return factory.toArray( new String [factories.size()]); }

EnableDiscoveryClientImportSelector.selectImports首先通过注解获取一些属性,然后加载了一些类名称,我们进入loadFactoryNames方法看看。

public static List < String > loadFactoryNames(Class<?> factoryClass, ClassLoader classLoader) { String factoryClassName = factoryClass.getName(); try { // 注释:public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories"; //注释:这个jar包下的一个配置文件 Enumeration<URL> urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); List < String > result = new ArrayList< String >(); 而(urls.hasMoreElements()) { URL url = urls.nextElement(); Properties properties = PropertiesLoaderUtils.loadProperties( new UrlResource(url)); String factoryClassNames = properties.getProperty(factoryClassName); result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames))); } 返回结果; } catch (IOException ex) { throw new IllegalArgumentException( "无法加载 [" factoryClass.getName() "] 工厂 [" FACTORIES_RESOURCE_LOCATION "]" , ex); } }

加载了一个配置文件,配置文件里面写了呢?打开SpringFactoryImportSelector该文件属于什么jar包的spring.factories文件一看。

# AutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration= \ org.springframework.cloud.client.CommonsClientAutoConfiguration, \ org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration, \ org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration , \ org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration, \ org.springframework.cloud.commons.util.UtilAutoConfiguration # 环境后处理器 org.springframework.boot.env.EnvironmentPostProcessor= \ org.springframework.cloud.client.HostInfoEnvironmentPostProcessor

都是一些配置后缀的类名,所以这些都是加载的工作堆的配置文件类。

工厂对象里面只有一个类名路径为 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration 。

EurekaDiscoveryClientConfiguration

@Configuration @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(值= “eureka.client.enabled”,matchIfMissing =真) @CommonsLog 公共 类 EurekaDiscoveryClientConfiguration 器具 SmartLifecycle,有序{ @覆盖 公共无效启动(){ //仅集如果nonSecurePort端口设置为0和this.port!= 0 ,如果(这个.port。获得()!= 0 &&这.instanceConfig.getNonSecurePort()== 0){ 这个.instanceConfig.setNonSecurePort(此.port。获得()); } //初始化只有当nonSecurePort大于0且尚未运行 ,因为containerPortInitializer的//下面 如果(!此.running。获得()&&这个.instanceConfig.getNonSecurePort()> 0){ 也许InitializeClient(); 如果(log.isInfoEnabled()){ log.info( "注册应用程序" this .instanceConfig.getAppname() “带有状态的尤里卡” 这个.instanceConfig.getInitialStatus()); } 这个.applicationInfoManager .setInstanceStatus( this .instanceConfig.getInitialStatus()); if ( this .healthCheckHandler != null ) { this .eurekaClient.registerHealthCheck( this .healthCheckHandler); } 这个.context.publishEvent( new InstanceRegisteredEvent<>( this , this .instanceConfig)); 这个.running。设置(真); } } }

// ApplicationInfoManager.setInstanceStatus 的方法 public synchronized void setInstanceStatus (InstanceStatus status) { // 打上断点 InstanceStatus prev = instanceInfo.setStatus(status); if (prev != null ) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify( new StatusChangeEvent(prev, status)); }捕获(异常 e){ logger.warn( "通知监听失败:{}" , listener.getId(), e); } } } }

public void registerStatusChangeListener (StatusChangeListener listener) { // 打上断点 listeners.put(listener.getId(), listener); }

果不方法其然,EurekaDiscoveryClientConfiguration.start被调用了,紧接着this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus())也进入断点,然后在往下走,又进入的

DiscoveryClient.initScheduledTasks 方法中的通知位置处。

DiscoveryClient 通过@Inject 注解过的构造方法。

@进样 DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig配置,DiscoveryClientOptionalArgs指定参数时,提供<BackupRegistry> backupRegistryProvider) { 如果(参数=!空){ 此.healthCheckHandlerProvider = args.healthCheckHandlerProvider; 这个.healthCheckCallbackProvider = args.healthCheckCallbackProvider; 这个.eventListeners.addAll(args.getEventListeners()); } else { this .healthCheckCallbackProvider = null ; 这个.healthCheckHandlerProvider = null ; } 这个.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); 客户端配置 = 配置; staticClientConfig = clientConfig; 运输配置 = config.getTransportConfig(); 实例信息 = 我的信息; 如果(我的信息!= null){ appPathIdentifier = instanceInfo.getAppName() "/" instanceInfo.getId(); }其他{ logger.warn( "将 instanceInfo 设置为传入的空值" ); } 这个.backupRegistryProvider = backupRegistryProvider; this .urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); 本地区域应用程序。设置(新应用程序()); fetchRegistryGeneration = new AtomicLong( 0 ); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef =新的AtomicReference <>(remoteRegionsToFetch。得到()==空?空:remoteRegionsToFetch得到().split( “”)); if (config.shouldFetchRegistry()) { this .registryStalenessMonitor = new ThresholdLevelsMetric( this , METRIC_REGISTRY_PREFIX "lastUpdateSec_" , new long []{ 15 L, 30 L, 60 L, 120 L, 240 L, 480 L); } else { 这个.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } 如果(config.shouldRegisterWithEureka()){ 此.heartbeatStalenessMonitor =新ThresholdLevelsMetric(此,METRIC_REGISTRATION_PREFIX “lastHeartbeatSec_” ,新 长[] { 15 L,30 L,60 L,120 L,240 L,480 L}); } else { 这个.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info( "客户端配置为既不注册也不查询数据。" ); 调度程序=空; heartbeatExecutor = null ; cacheRefreshExecutor = null ; eurekaTransport = null ; instanceRegionChecker = new InstanceRegionChecker( new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // 这是一个小技巧,允许使用 DiscoveryManager.getInstance() 的现有代码 // 与 DI 的 DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient( this ); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info( "Discovery Client 在时间戳 {} 初始化,初始实例计数:{}" , initTimestampMs,这个.getApplications().size()); 返回; // 不需要设置网络任务,我们就完成了 } try { // 注释:定时任务调度准备 scheduler = Executors.newScheduledThreadPool( 3 , new ThreadFactoryBuilder() .setNameFormat( "DiscoveryClient-%d" ) .setDaemon(真) 。建造()); // 注释:实例化心跳定时任务线程池 heartbeatExecutor = new ThreadPoolExecutor( 1 , clientConfig.getHeartbeatExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat( "DiscoveryClient-HeartbeatExecutor-%d" ) .setDaemon(真) 。建造() ); // 使用直接切换 // 注释:实例化刷新定时任务线程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1 ,clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat( "DiscoveryClient-CacheRefreshExecutor-%d" ) .setDaemon(真) 。建造() ); // 使用直接切换 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); azToRegionMapper azToRegionMapper; 如果(clientConfig.shouldUseDnsForFetchingServiceUrls()){ azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); }其他{ azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } 如果(空!= remoteRegionsToFetch。获得()){ azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch。获得().split( “”)); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException( "初始化DiscoveryClient失败!" , e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry( false )) { fetchRegistryFromBackup(); } // 注释:初始化调度任务 initScheduledTasks(); 试试{ Monitors.registerObject( this ); } catch (Throwable e) { logger.warn( "无法注册定时器" , e); } // 这是一个小技巧,允许使用 DiscoveryManager.getInstance() 的现有代码 // 与 DI 的 DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient( this ); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info( "Discovery Client 在时间戳 {} 初始化,初始实例计数:{}" , initTimestampMs,这个.getApplications().size()); }

private void initScheduledTasks () { if (clientConfig.shouldFetchRegistry()) { // 注册表缓存刷新定时器 // 注释:间隔时间去拉取服务注册信息,默认时间 30秒 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //注释:定时任务,每间隔30秒去拉取一次服务注册信息 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh" , 调度程序, 缓存刷新执行器, registryFetchIntervalSeconds, 时间单位.秒, expBackOffBound, 新的缓存刷新线程() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { // 注释:间隔发送一次心跳约,默认间隔时间 30 秒 int refreshIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info( "启动心跳执行器:" "更新间隔为:" refreshIntervalInSecs); // 心跳定时器 // 注释:定时任务,每间隔 30 秒去想 EurekaServer 发送一次心跳周约 scheduler.schedule( new TimedSupervisorTask( "heartbeat" , 调度程序, 心跳执行器, 更新IntervalInSecs, 时间单位.秒, expBackOffBound, 新的心跳线程() ), 更新IntervalInSecs, TimeUnit.SECONDS); // InstanceInfo 复制器 // 注释:实例信息复制器,定时刷新dataCenterInfo 数据中心信息,默认30instanceInfoReplicator = new InstanceInfoReplicator( this , 实例信息, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2 ); // 突发大小 // 注释:实例化状态变化监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId () { return "statusChangeListener" ; } @Override public void notify (StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // 如果涉及 DOWN, 则以警告级别记录logger.warn( "Saw local status change event {}" , statusChangeEvent); }其他{ logger.info( "看到本地状态变化事件{}" , statusChangeEvent); } // 状态有变化的话,会说这个方法 instanceInfoReplicator.onDemandUpdate(); } }; // 注释:注册状态变化监听器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); }其他{ logger.info( "没有在每个配置中注册尤里卡服务器" ); } }

public boolean onDemandUpdate () { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { scheduler.submit( new Runnable() { @Override public void run () { logger.debug( "执行本地InstanceInfo的按需更新" ); 未来latestPeriodic = schedulePeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug( "取消最新的预定更新,按需更新结束时重新调度" ); latestPeriodic.cancel( false ); } // 注释:这里进行了实例信息刷新和注册 InstanceInfoReplicator。这个.run(); } }); 返回 真; }其他{ logger.warn( "由于速率限制器而忽略按需更新" ); 返回 假; } }

公共 无效运行(){ 尝试{ discoveryClient.refreshInstanceInfo(); 长dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null ) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } }捕捉(可扔的 t){ logger.warn( "实例信息复制器出现问题" , t); }最后{ Future next = scheduler.schedule( this , replicationIntervalSeconds, TimeUnit.SECONDS); schedulePeriodicRef.set( next ); } }

布尔 寄存器() 抛出Throwable { logger.info(PREFIX appPathIdentifier ": 注册服务..." ); EurekaHttpResponse<Void> httpResponse; 试试{ httpResponse = eurekaTransport.registrationClient.register(instanceInfo); }捕获(异常 e){ logger.warn( "{} - 注册失败 {}" , PREFIX appPathIdentifier, e.getMessage(), e); 扔e; } 如果(logger.isInfoEnabled()){ logger.info( "{} - 注册状态:{}" , PREFIX appPathIdentifier, httpResponse.getStatusCode()); } 返回httpResponse.getStatusCode() == 204 ; }

@覆盖 公共EurekaHttpResponse <空隙>寄存器(InstanceInfo信息){ String urlPath = "apps/" info.getAppName(); ClientResponse 响应 = null ; 试试{ Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); 响应 = 资源生成器 .header( "Accept-Encoding" , "gzip" ) .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) //注释:打包带上当前应用的所有信息的信息 .POST(ClientResponse类,信息); 返回anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); }最后{ 如果(logger.isDebugEnabled()) { logger.debug( "Jersey HTTP POST {}/{} with instance {}; statusCode={}" , serviceUrl, urlPath, info.getId(), 响应 ==空?"N/A" : response.getStatus()); } 如果(响应!= null){ response.close(); } } }

,