注册中心相关组件讨论

eureka 原理

1、鸟瞰


每个eureka客户端都需要添加@EnableDiscoveryClient注解,作用是开启一个DiscoveryClient客户端;具体实现是在加载eureka相关配置类时,调用DiscoveryClient的构造方法完成初始化。

1
2
3
4
5
6
7
8
9
 
/**
 * Type-level, runtime-retained, inherited annotation that imports
 * {@code EnableDiscoveryClientImportSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * Flag that defaults to {@code true}.
     * NOTE(review): presumably controls automatic registration of the local service;
     * confirm against the import selector, which is not shown here.
     */
    boolean autoRegister() default true;
}

看看DiscoveryClient它的注解

构造函数

1
2
3
4
5
6
7
 /**
  * Injectable constructor of DiscoveryClient (excerpt: each "..." line stands for code that
  * this listing leaves out). The only step shown is the call to initScheduledTasks().
  */
 @Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
...
initScheduledTasks();// set up the scheduled tasks (original note lists: register, renew, cancel lease, fetch registry)
...
}

1
2
3
4
5
6
7
8
9
10
/**
 * Sets up the client's scheduled tasks (excerpt: the bodies of both branches are omitted
 * in this listing and marked with "。。。").
 */
private void initScheduledTasks() {
// registry fetch: only when clientConfig.shouldFetchRegistry() is true
if (clientConfig.shouldFetchRegistry()) {
。。。
}
// registration and lease renewal: only when clientConfig.shouldRegisterWithEureka() is true
if (clientConfig.shouldRegisterWithEureka()) {
。。。
}
}

客户端

注册&续约

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
       // Registration / renewal branch: schedules the heartbeat task, creates the
       // InstanceInfoReplicator, wires a status-change listener and starts the replicator.
       if (clientConfig.shouldRegisterWithEureka()) {
// renewal (heartbeat) interval, in seconds
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
// exponential back-off bound handed to the heartbeat task
// (original note: multiplier applied when a heartbeat fails — confirm in TimedSupervisorTask)
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// schedule the heartbeat (lease renewal) task; first run after one renewal interval
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);

// 1. the object that drives registration (original note: it implements Runnable)
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 2. listener for local status changes; every notification triggers an on-demand update
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
// request an immediate update (presumably pushes the instance info to the registry server)
instanceInfoReplicator.onDemandUpdate();
}
};
// the listener is only registered when on-demand status updates are enabled in the config
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// start the replicator with the configured initial delay
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

我们来看看InstanceInfoReplicator 这个类的start和run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  /**
   * Starts the replicator at most once.
   *
   * <p>Only the caller that flips {@code started} from {@code false} to {@code true} marks the
   * instance info dirty and schedules this task; any later call returns without doing anything.
   *
   * @param initialDelayMs delay before the first run; note that it is handed to the scheduler
   *                       with {@code TimeUnit.SECONDS}, so despite its name it is treated
   *                       as a number of seconds
   */
  public void start(int initialDelayMs) {
      boolean firstStart = started.compareAndSet(false, true);
      if (!firstStart) {
          return;
      }
      instanceInfo.setIsDirty(); // for initial register
      scheduledPeriodicRef.set(scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS));
  }

/**
 * One replication pass: refreshes the local instance info and, when it is flagged dirty,
 * calls {@code discoveryClient.register()} and clears the flag for that timestamp.
 *
 * <p>Any {@code Throwable} is logged at warn level and not rethrown. Whatever happens, the
 * task re-schedules itself after {@code replicationIntervalSeconds} seconds.
 */
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();
        Long dirtySince = instanceInfo.isDirtyWithTime();
        if (dirtySince == null) {
            // nothing to push; the finally block below still re-schedules the task
            return;
        }
        discoveryClient.register();
        instanceInfo.unsetIsDirty(dirtySince);
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        scheduledPeriodicRef.set(
                scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS));
    }
}

DiscoveryClient里面的register注册方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Sends the local {@code instanceInfo} through {@code eurekaTransport.registrationClient}.
 *
 * @return {@code true} only when the response status code is 204
 * @throws Throwable any exception raised by the registration call, rethrown after a warn log
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    int status = httpResponse.getStatusCode();
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, status);
    }
    return status == 204;
}

实际上它是调用了 RetryableEurekaHttpClient 的execute 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * Runs the request, retrying on another endpoint when an attempt fails.
 *
 * Each iteration uses the client cached in `delegate` when there is one; otherwise it takes
 * the next endpoint from the candidate list and builds a new client for it. A response
 * accepted by serverStatusEvaluator is returned immediately and its client is cached in
 * `delegate`. After a failed attempt the cached client is cleared and, if an endpoint was
 * picked in this iteration, that endpoint is added to quarantineSet.
 *
 * Throws TransportException when the candidate list is empty, when all candidates have been
 * used up, or when numberOfRetries attempts have failed.
 */
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
// retry loop bounded by numberOfRetries (original note: default is 3 — not visible here); returns on first success
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
// load the candidate registry endpoints once per call
// (original note: derived from the full list and the invalid/quarantined set — see getHostCandidates())
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// take exactly one endpoint for this attempt
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
// send the request through the chosen client
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
// success: return right away
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
// remember the endpoint that just failed
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}

代码太多就不继续跟进了,实质是向注册中心的某一个节点发送POST请求完成注册。那么注册中心是怎么处理这个请求的呢?后面有分析。

同理我们看看DiscoveryClient里面的renew续约方法,不再具体分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Sends one heartbeat for the local instance.
 *
 * <p>A 404 response increments {@code REREGISTER_COUNTER} and falls back to
 * {@code register()}, whose result is returned. Any {@code Throwable} is logged at error
 * level and reported as {@code false}.
 *
 * @return {@code true} when the heartbeat is answered with status 200, or when the
 *         re-registration triggered by a 404 returns {@code true}
 */
boolean renew() {
    try {
        EurekaHttpResponse<InstanceInfo> httpResponse = eurekaTransport.registrationClient
                .sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        int status = httpResponse.getStatusCode();
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, status);
        if (status != 404) {
            return status == 200;
        }
        // 404: register again instead of renewing
        REREGISTER_COUNTER.increment();
        logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
        return register();
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}

服务获取

这个比较简单,也不再赘述,按照注册思路可以很容易看明白

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Registry-fetch branch: schedules the "cacheRefresh" supervisor task when fetching is enabled.
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// the Runnable that performs the actual registry fetch
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
/**
 * Runnable whose only job is to delegate to {@code refreshRegistry()}.
 */
class CacheRefreshThread implements Runnable {

    /** Performs one registry refresh by calling {@code refreshRegistry()}. */
    @Override
    public void run() {
        refreshRegistry();
    }
}

服务端

  • 接受注册
    怎么找到接受注册的类呢,我们看看日志

1
2
3
4
5
6
7
8
9
10
11
/**
 * Registers the instance via the superclass and then replicates the action to peers.
 *
 * <p>The lease duration comes from the instance's lease info when that is present and
 * positive; otherwise {@code Lease.DEFAULT_DURATION_IN_SECS} is used.
 *
 * @param info          the instance being registered
 * @param isReplication forwarded unchanged to the superclass call and to replicateToPeers
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    final boolean hasOwnDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasOwnDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // local registration first (superclass), then fan out to the other nodes
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

父类的register方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Registers an instance in the in-memory registry while holding the read lock
 * (excerpt: the "..." line below stands for the part of the method this listing omits).
 *
 * <p>The visible part looks up the per-application map by app name and, when none exists,
 * installs a new ConcurrentHashMap with putIfAbsent, so that concurrent callers end up
 * sharing a single map.
 *
 * @param registrant    the instance being registered
 * @param leaseDuration lease duration; only used in the omitted part
 * @param isReplication passed to the REGISTER counter
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the lock BEFORE entering try: if lock() itself failed, the finally block
    // must not call unlock() on a lock this thread never obtained.
    read.lock();
    try {
        // look up the instances registered under this app name; create the map on first use
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                // no other thread won the race, so the map created here is the one in the registry
                gMap = gNewMap;
            }
        }
        // the remaining registration steps are omitted in this excerpt (elision marker kept as-is)
... 注册过程

    } finally {
        read.unlock();
    }
}

同步到其他节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    /**
     * Replicates a registry action to peer nodes (excerpt: "..." stands for omitted code,
     * which includes whatever declares `node` and opens the blocks closed below).
     */
    private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
...
// key step: forward the action to the peer `node`
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}


/**
 * Forwards a single registry action to one peer node.
 *
 * <p>Sets the current request version to {@code Version.V2}, then dispatches on
 * {@code action} to the matching method of {@code node}. For Heartbeat, StatusUpdate and
 * DeleteStatusOverride the instance is first looked up again with
 * {@code getInstanceByAppAndId}. Any {@code Throwable} is logged at error level and
 * not rethrown.
 *
 * @param action    which operation to replicate
 * @param appName   application name of the instance
 * @param id        instance id
 * @param info      instance info, used for Register
 * @param newStatus status, used for StatusUpdate
 * @param node      the peer that receives the action
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
        String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: {
                node.cancel(appName, id);
                break;
            }
            case Heartbeat: {
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, current, overriddenStatus, false);
                break;
            }
            case Register: {
                node.register(info);
                break;
            }
            case StatusUpdate: {
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, current);
                break;
            }
            case DeleteStatusOverride: {
                InstanceInfo current = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, current);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

其他源码不讲了,大致差不多的思路。

feign 机制原理

使用

  • 在启动类上新增@EnableFeignClients注解
  • 在service接口上添加@FeignClient("注册到eureka的服务名")
  • service方法上添加 @RequestMapping(提供者controller的url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
 * Entry point of the consumer demo application. The class carries the Spring Boot,
 * REST controller, Eureka client and Feign clients annotations.
 */
@SpringBootApplication
@RestController
@EnableEurekaClient
@EnableFeignClients
public class EurekaConsumerApplication {
/**
 * Boots the application by passing this class and the command-line arguments
 * to SpringApplication.run.
 */
public static void main(String[] args) {

SpringApplication.run(EurekaConsumerApplication.class, args);

}
}

/**
 * Feign client interface declared with the name "produce-service-eureka"
 * (per the usage notes in this document, the name the provider is registered under in Eureka).
 */
@FeignClient("produce-service-eureka")
public interface FeignClientService {

/** Mapped to the path "ek/provider"; returns the response as a String. */
@RequestMapping("ek/provider")
String consumer();
}

原理

  • 通过@EnableFeignClients注解开启FeignClient,它会扫描带有@FeignClient注解的接口,并注册到IOC容器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Scans the base packages for candidate components and registers each annotated candidate
 * as a Feign client (excerpt: "..." stands for omitted code, which is where basePackages
 * gets its value).
 *
 * For every candidate that is an AnnotatedBeanDefinition, the method asserts that the
 * annotated type is an interface, reads the @FeignClient attributes, registers the client
 * configuration and then registers the client itself.
 */
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);

Set<String> basePackages;
// read the attributes of the @EnableFeignClients annotation from the given metadata
Map<String, Object> attrs = metadata.getAnnotationAttributes(EnableFeignClients.class.getName());
...
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner
.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(),
"@FeignClient can only be specified on an interface");
Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(FeignClient.class.getCanonicalName());
String name = getClientName(attributes);
registerClientConfiguration(registry, name, attributes.get("configuration"));
// register the Feign client for this interface
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
}

/**
 * Registers one Feign client as a bean definition (excerpt: "。。。" stands for omitted code,
 * which is where `holder` is built).
 */
private void registerFeignClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
String className = annotationMetadata.getClassName();
// key step: the bean definition is built for FeignClientFactoryBean, i.e. a factory bean
// is what ends up registered in the container
BeanDefinitionBuilder definition = BeanDefinitionBuilder.genericBeanDefinition(FeignClientFactoryBean.class);
。。。
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

工厂Bean我们都知道通过getObject方法获取的是具体的Bean实例,不是工厂本身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Builds the object this factory bean exposes.
 *
 * <p>Without an explicit {@code url}, a URL is derived from {@code name} (prefixed with
 * {@code http://} unless it already starts with "http") and the target goes through
 * {@code loadBalance(...)}. With an explicit {@code url}, the same prefix rule is applied
 * to it; a configured {@code LoadBalancerFeignClient} is unwrapped to its delegate, and the
 * target is produced by the {@code Targeter}.
 *
 * @return the object produced by {@code loadBalance(...)} or {@code Targeter.target(...)}
 * @throws Exception declared by the factory contract
 */
@Override
public Object getObject() throws Exception {
    FeignContext context = applicationContext.getBean(FeignContext.class);
    Feign.Builder builder = feign(context);

    if (!StringUtils.hasText(this.url)) {
        // no fixed URL: address the client by name and let the load-balancing path resolve it
        String base = this.name.startsWith("http") ? this.name : "http://" + this.name;
        String balancedUrl = base + cleanPath();
        return loadBalance(builder, context,
                new HardCodedTarget<>(this.type, this.name, balancedUrl));
    }

    if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {
        this.url = "http://" + this.url;
    }
    String fixedUrl = this.url + cleanPath();

    Client client = getOptional(context, Client.class);
    if (client != null) {
        // not load balancing because we have a url,
        // but ribbon is on the classpath, so unwrap
        Client effective = client instanceof LoadBalancerFeignClient
                ? ((LoadBalancerFeignClient) client).getDelegate()
                : client;
        builder.client(effective);
    }

    // the Targeter produces the final client instance
    // (this document's notes: ultimately a JDK dynamic proxy)
    Targeter targeter = get(context, Targeter.class);
    return targeter.target(this, builder, context,
            new HardCodedTarget<>(this.type, this.name, fixedUrl));
}

再一步步跟进去会发现是ReflectiveFeign 类的newInstance方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   /**
    * Creates a JDK dynamic proxy implementing {@code target.type()}.
    *
    * <p>Methods declared by {@code Object} are skipped. Default methods get a
    * {@code DefaultMethodHandler}; every other method is mapped to the handler found under
    * its {@code Feign.configKey}. After the proxy exists, each default-method handler is
    * bound to it.
    *
    * @param target describes the interface to proxy
    * @return the proxy instance
    */
   @Override
   public <T> T newInstance(Target<T> target) {
       Map<String, MethodHandler> handlersByKey = targetToHandlersByName.apply(target);
       Map<Method, MethodHandler> dispatch = new LinkedHashMap<Method, MethodHandler>();
       List<DefaultMethodHandler> defaultHandlers = new LinkedList<DefaultMethodHandler>();

       for (Method method : target.type().getMethods()) {
           if (method.getDeclaringClass() == Object.class) {
               continue;
           }
           if (Util.isDefault(method)) {
               DefaultMethodHandler defaultHandler = new DefaultMethodHandler(method);
               defaultHandlers.add(defaultHandler);
               dispatch.put(method, defaultHandler);
               continue;
           }
           String configKey = Feign.configKey(target.type(), method);
           dispatch.put(method, handlersByKey.get(configKey));
       }

       // JDK dynamic proxy: every interface call is routed through the invocation handler
       InvocationHandler handler = factory.create(target, dispatch);
       T proxy = (T) Proxy.newProxyInstance(
               target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

       // default-method handlers need the proxy instance, so bind them once it exists
       for (DefaultMethodHandler defaultHandler : defaultHandlers) {
           defaultHandler.bindTo(proxy);
       }
       return proxy;
   }

SynchronousMethodHandler的invoke方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 /**
  * Builds the request template from the call arguments and executes it, retrying for as
  * long as the per-call {@code Retryer} allows.
  *
  * <p>On a {@code RetryableException} the retryer decides whether to continue or to
  * propagate; when it continues, the retry is logged unless the log level is NONE.
  *
  * @param argv arguments of the intercepted method call
  * @return the decoded result of {@code executeAndDecode}
  * @throws Throwable whatever {@code executeAndDecode} or the retryer propagates
  */
 @Override
 public Object invoke(Object[] argv) throws Throwable {
     RequestTemplate template = buildTemplateFromArgs.create(argv);
     // work on a clone of the configured retryer for this invocation
     // (original note: 5 attempts by default — not visible here)
     Retryer callRetryer = this.retryer.clone();
     for (;;) {
         try {
             return executeAndDecode(template);
         } catch (RetryableException e) {
             callRetryer.continueOrPropagate(e);
             if (logLevel != Logger.Level.NONE) {
                 logger.logRetry(metadata.configKey(), logLevel);
             }
         }
     }
 }

  • 负载均衡
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
       /**
        * Executes the request built from {@code template} through {@code client}
        * (excerpt: the "···" line stands for the omitted status handling and cleanup).
        *
        * <p>Fix: the original called {@code response.toBuilder().request(request).build()} and
        * threw the result away, so the response kept whatever request it had before. The
        * rebuilt response is now assigned back, which is what the "ensure the request is set"
        * comment asks for.
        *
        * @param template the request template for this call
        * @throws Throwable an I/O failure is wrapped by {@code errorExecuting} and thrown
        */
       Object executeAndDecode(RequestTemplate template) throws Throwable {
           Request request = targetRequest(template);
           if (logLevel != Logger.Level.NONE) {
               logger.logRequest(metadata.configKey(), logLevel, request);
           }
           Response response;
           long start = System.nanoTime();
           try {
               // send the request (original note: load balancing is on by default, so this
               // reaches LoadBalancerFeignClient.execute)
               response = client.execute(request, options);
               // ensure the request is set. TODO: remove in Feign 10
               response = response.toBuilder().request(request).build();
           } catch (IOException e) {
               if (logLevel != Logger.Level.NONE) {
                   logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
               }
               throw errorExecuting(request, e);
           }
           // response status handling and connection cleanup are omitted in this excerpt
    ···
       }
    }

    /**
     * Executes the request through the load balancer client named after the URL's host.
     *
     * <p>The host part of the request URL is used as the client name; the request is wrapped
     * in a {@code RibbonRequest} whose URI has that host removed by {@code cleanUrl}.
     *
     * @param request the Feign request
     * @param options request options, used to obtain the client config
     * @return the response converted back with {@code toResponse()}
     * @throws IOException the I/O cause of a {@code ClientException}, if one is found;
     *                     otherwise the {@code ClientException} is wrapped in a RuntimeException
     */
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            String clientName = URI.create(request.url()).getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest =
                    new FeignLoadBalancer.RibbonRequest(this.delegate, request, uriWithoutHost);
            IClientConfig requestConfig = getClientConfig(options, clientName);
            // core step: hand the call to the load balancer client
            return lbClient(clientName)
                    .executeWithLoadBalancer(ribbonRequest, requestConfig)
                    .toResponse();
        } catch (ClientException e) {
            IOException io = findIOException(e);
            if (io == null) {
                throw new RuntimeException(e);
            }
            throw io;
        }
    }

    /**
     * Submits the request as a {@code LoadBalancerCommand} and blocks for its single result.
     *
     * <p>For the server handed to the operation, the request URI is rebuilt with
     * {@code reconstructURIWithServer} and the request is executed against it; an exception
     * from that execution is turned into an error {@code Observable}.
     *
     * @param request       the request to execute
     * @param requestConfig per-request client configuration
     * @return the single response emitted by the command
     * @throws ClientException the cause of the failure when it is a {@code ClientException},
     *                         otherwise a new {@code ClientException} wrapping the failure
     */
    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        // the unit of work run against the chosen server
        ServerOperation<T> perServerCall = new ServerOperation<T>() {
            @Override
            public Observable<T> call(Server server) {
                URI finalUri = reconstructURIWithServer(server, request.getUri());
                S requestForServer = (S) request.replaceUri(finalUri);
                try {
                    return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                } catch (Exception e) {
                    return Observable.error(e);
                }
            }
        };

        try {
            return command.submit(perServerCall).toBlocking().single();
        } catch (Exception e) {
            Throwable cause = e.getCause();
            if (cause instanceof ClientException) {
                throw (ClientException) cause;
            }
            throw new ClientException(e);
        }
    }



    /**
     * Builds the observable that runs the operation on a server (excerpt: each "。。。" line
     * stands for omitted code, so the braces in this listing do not balance).
     */
    public Observable<T> submit(final ServerOperation<T> operation) {
    。。。
    // key step: when no server was given up front (server == null), selectServer() picks one
    Observable<T> o =
    (server == null ? selectServer() : Observable.just(server))
    .concatMap(new Func1<Server, Observable<T>>() {
    @Override
    // Called for each server being selected
    public Observable<T> call(Server server) {
    context.setServer(server);
    。。。

    }
    }

    /**
     * Returns an observable that, on subscription, asks {@code loadBalancerContext} for a
     * server, emits it and completes. An exception during the lookup is delivered through
     * {@code onError}.
     */
    private Observable<Server> selectServer() {
        OnSubscribe<Server> emitChosenServer = new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // the choice itself is delegated to loadBalancerContext
                    next.onNext(loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey));
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        };
        return Observable.create(emitChosenServer);
    }

整个负载还是比较复杂的,继续跟踪下去太多,大家可以自行看源码。

  • 熔断

jwt权限

不知道具体业务,只能做个demo

eureka 示例优化