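How Eureka works
1. Overview
Every Eureka client adds the @EnableDiscoveryClient annotation, which switches on a DiscoveryClient. Concretely, the client is initialized by a call to the DiscoveryClient constructor while the Eureka configuration classes are being loaded.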
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({EnableDiscoveryClientImportSelector.class})
public @interface EnableDiscoveryClient {
    boolean autoRegister() default true;
}
Now look at DiscoveryClient itself. Its constructor carries the @Inject annotation:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    ...
    initScheduledTasks(); // initialize the scheduled tasks (register, renew, cancel, fetch services)
    ...
}
private void initScheduledTasks() { ... }
Client side
Registration and renewal
if (clientConfig.shouldRegisterWithEureka()) { ... }
Let's look at the start and run methods of the InstanceInfoReplicator class
public void start(int initialDelayMs) { ... }
The register method in DiscoveryClient
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
Under the hood, this calls the execute method of RetryableEurekaHttpClient
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    // Retry loop, 3 attempts by default; returns as soon as one attempt succeeds
    for (int retry = 0; retry < numberOfRetries; retry++) {
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                // Get the candidate registry nodes (all known nodes minus the quarantined ones)
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }
            // Only a single node is picked per attempt
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }
        try {
            // Send the registration request
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                // Success: return immediately
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
        }
        // Connection error or 5xx from the server that must be retried on another server
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            // Mark the node as bad
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
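There is too much code to keep following. In essence, the client registers by sending a POST request to a single node of the registry. How the registry handles that request is analyzed later.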
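The failover idea in that loop (try one node, quarantine it on failure, move on to the next) can be sketched in isolation. This is a simplified illustration, not Eureka's code: `FailoverSketch`, the string endpoints, the "2xx means success" check, and the "clear the quarantine once every node is in it" rule are all assumptions made for the example, and the caching of the last working client (`delegate` in the real class) is left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical helper, not part of Eureka: tries endpoints in order and quarantines the ones that fail. */
final class FailoverSketch {
    private final List<String> endpoints;                     // all known server URLs
    private final Set<String> quarantined = new HashSet<>();  // endpoints that failed recently
    private final int maxRetries;

    FailoverSketch(List<String> endpoints, int maxRetries) {
        this.endpoints = new ArrayList<>(endpoints);
        this.maxRetries = maxRetries;
    }

    /** Candidates are all endpoints minus the quarantined ones; if none are left, start over. */
    List<String> candidates() {
        List<String> remaining = new ArrayList<>(endpoints);
        remaining.removeAll(quarantined);
        if (remaining.isEmpty()) {
            quarantined.clear();
            return new ArrayList<>(endpoints);
        }
        return remaining;
    }

    /** Sends the request to one endpoint per attempt; a 2xx status counts as success (an assumption). */
    int execute(Function<String, Integer> request) {
        List<String> hosts = candidates();
        if (hosts.isEmpty()) {
            throw new IllegalStateException("no known server");
        }
        int idx = 0;
        for (int retry = 0; retry < maxRetries; retry++) {
            if (idx >= hosts.size()) {
                throw new IllegalStateException("cannot execute request on any known server");
            }
            String endpoint = hosts.get(idx++);
            try {
                int status = request.apply(endpoint);
                if (status >= 200 && status < 300) {
                    return status;
                }
            } catch (RuntimeException e) {
                // connection-level failure: handled like a bad status below
            }
            quarantined.add(endpoint);
        }
        throw new IllegalStateException("retry limit reached");
    }

    Set<String> quarantined() {
        return quarantined;
    }
}
```

Calling `execute` with a function that maps an endpoint to an HTTP status walks the candidate list until one endpoint answers with a success status or the retry budget runs out.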
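The renew method in DiscoveryClient, which renews the lease, works along the same lines, so I won't analyze it in detail
boolean renew() { ... }
Fetching services
This part is fairly simple, so I won't go into detail; it is easy to follow with the same approach used for registration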
if (clientConfig.shouldFetchRegistry()) {
    // registry cache refresh timer
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    scheduler.schedule(
            new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    // This task does the main work of fetching services
                    new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// Inner class, run as a task
class CacheRefreshThread implements Runnable {
    public void run() {
        // The method that actually does the work
        refreshRegistry();
    }
}
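The `expBackOffBound` argument suggests that the refresh interval backs off exponentially when a run times out. As far as I understand TimedSupervisorTask, the delay doubles after a timeout, is capped at the base interval multiplied by the bound, and resets after a successful run. The sketch below only illustrates that arithmetic under this assumption; `BackoffSketch` and its method names are made up for the example.

```java
/** Hypothetical sketch of a capped exponential back-off; the names are illustrative, not Eureka's. */
final class BackoffSketch {
    private final long baseDelayMs;   // the normal refresh interval
    private final long maxDelayMs;    // cap: base interval * back-off bound
    private long currentDelayMs;

    BackoffSketch(long baseDelayMs, int backOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * backOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** After a successful run, fall back to the base interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    /** After a timed-out run, double the delay without exceeding the cap. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }
}
```

With a 30-second interval and a bound of 10, repeated timeouts would stretch the delay step by step up to 300 seconds, and the first success brings it back to 30 seconds.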
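Server side
- Accepting registrations
How do we find the class that accepts registrations? Let's look at the logs
@Override
...
The register method of the parent class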
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        // First look up the instances registered under appName; if there are none,
        // create a new map and add the registering application's information to it
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        ... // registration logic
    } finally {
        read.unlock();
    }
}
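The registry shown above is a two-level map: application name to a map of instance id to lease. A minimal sketch of that structure follows. It is not the Eureka implementation: `RegistrySketch` is a hypothetical class, a plain `String` stands in for `Lease<InstanceInfo>`, and `computeIfAbsent` is used in place of the `get` / `putIfAbsent` sequence in the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level registry; a String stands in for Lease<InstanceInfo>. */
final class RegistrySketch {
    // appName -> (instanceId -> lease info)
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    /** Creates the per-application map on first use, then stores or overwrites the instance entry. */
    void register(String appName, String instanceId, String info) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(instanceId, info);
    }

    /** Number of instances currently registered under the given application name. */
    int instanceCount(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }
}
```

Registering the same instance id twice overwrites the earlier entry, so the instance count for that application does not grow.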
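Replicating to the other nodes
private void replicateToPeers(Action action, String appName, String id, ...
I won't go through the rest of the source; it follows much the same approach.
How Feign works
Usage
- Add the @EnableFeignClients annotation to the application's main class
- Add @FeignClient("name of the service registered with Eureka") to the service interface
- Add @RequestMapping (the URL of the provider's controller) to the service methods
@SpringBootApplication
...
How it works
- The @EnableFeignClients annotation enables Feign clients: it scans for interfaces annotated with @FeignClient and registers them in the IoC container.
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { ... }
As we know, calling getObject on a factory bean returns the concrete bean instance, not the factory itself
@Override
...
Stepping further in, we arrive at the newInstance method of the ReflectiveFeign class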
@Override
public <T> T newInstance(Target<T> target) {
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();
    for (Method method : target.type().getMethods()) {
        if (method.getDeclaringClass() == Object.class) {
            continue;
        } else if (Util.isDefault(method)) {
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }
    // JDK dynamic proxy
    InvocationHandler handler = factory.create(target, methodToHandler);
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);
    // Bind the default-method handlers to the proxy
    for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
}
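The core of newInstance is a JDK dynamic proxy whose InvocationHandler dispatches each interface method to a pre-built handler. The stripped-down sketch below shows only that mechanism and is not Feign's code: `GreetingApi`, `ProxySketch`, and the use of `Function<Object[], Object>` keyed by method name are assumptions for illustration (Feign keys its handlers by `Feign.configKey` and uses `MethodHandler`).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical client interface used only for this example. */
interface GreetingApi {
    String hello(String name);
    String bye(String name);
}

/** Hypothetical helper: builds a proxy that routes each interface method to its own handler. */
final class ProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T newInstance(Class<T> type, Map<String, Function<Object[], Object>> handlersByName) {
        // Resolve a handler per Method up front, as newInstance does with methodToHandler
        Map<Method, Function<Object[], Object>> methodToHandler = new HashMap<>();
        for (Method method : type.getMethods()) {
            if (method.getDeclaringClass() == Object.class) {
                continue;
            }
            methodToHandler.put(method, handlersByName.get(method.getName()));
        }
        // Every call on the proxy lands here and is forwarded to the matching handler
        InvocationHandler handler = (proxy, method, args) -> {
            Function<Object[], Object> target = methodToHandler.get(method);
            if (target == null) {
                throw new UnsupportedOperationException(method.getName());
            }
            return target.apply(args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }
}
```

In Feign the handler behind each method builds and sends an HTTP request; here a handler is just a function over the call's arguments, which keeps the dispatch step visible.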
The invoke method of SynchronousMethodHandler
@Override
public Object invoke(Object[] argv) throws Throwable {
    // Build the request template
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    // Retry policy (Feign's Retryer.Default allows up to 5 attempts)
    Retryer retryer = this.retryer.clone();
    while (true) {
        try {
            // Send the request
            return executeAndDecode(template);
        } catch (RetryableException e) {
            retryer.continueOrPropagate(e);
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}
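The loop above keeps calling until the retryer decides to rethrow. A minimal sketch of that "continue or propagate" pattern, assuming a fixed attempt limit and no sleep between attempts, might look like this. `RetrySketch` and `RetryableFailure` are hypothetical names, not Feign classes.

```java
import java.util.function.Supplier;

/** Hypothetical bounded-retry helper illustrating the continue-or-propagate pattern. */
final class RetrySketch {
    /** Stand-in for a retryable failure such as a connection error. */
    static final class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    /** Calls the supplier until it succeeds or maxAttempts retryable failures have occurred. */
    static <T> T callWithRetry(Supplier<T> call, int maxAttempts) {
        int attempt = 1;
        while (true) {
            try {
                return call.get();
            } catch (RetryableFailure e) {
                // Propagate once the attempt budget is used up, otherwise loop again
                if (attempt++ >= maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

Only `RetryableFailure` triggers another attempt; any other exception leaves the loop immediately, which mirrors how the original catches only RetryableException.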
- Load balancing
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);
    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }
    Response response;
    long start = System.nanoTime();
    try {
        // Send the request; load balancing is enabled by default, so this calls
        // the execute method of LoadBalancerFeignClient
        response = client.execute(request, options);
        // ensure the request is set. TODO: remove in Feign 10
        response.toBuilder().request(request).build();
    } catch (IOException e) {
        if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
        }
        throw errorExecuting(request, e);
    }
    // Check the response status and close the connection
    ...
}
}
// LoadBalancerFeignClient
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);
        IClientConfig requestConfig = getClientConfig(options, clientName);
        // The core step: execute through the load balancer
        return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                requestConfig).toResponse();
    }
    catch (ClientException e) {
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}
// AbstractLoadBalancerAwareClient
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    try {
        // Built using the command pattern
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}
// LoadBalancerCommand
public Observable<T> submit(final ServerOperation<T> operation) {
    ...
    // The key method is selectServer
    Observable<T> o =
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    ...
                }
}
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                // Delegate the choice to loadBalancerContext
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}
Load balancing as a whole is fairly complex, and there is too much to keep tracing here; you can read the source yourself.
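To give a feel for what "selecting a server" means at the end of that call chain, here is a minimal round-robin chooser. It is only one common strategy and not Ribbon's implementation: `RoundRobinSketch` is a hypothetical class, servers are plain strings, and health checks and zone awareness are ignored.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin chooser; servers are plain strings for illustration. */
final class RoundRobinSketch {
    private final AtomicInteger next = new AtomicInteger(0);

    /** Returns the servers in turn, wrapping around at the end of the list. */
    String choose(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalStateException("no servers available");
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(next.getAndIncrement(), servers.size());
        return servers.get(index);
    }
}
```

The atomic counter lets several threads share one chooser without locking, at the cost of the rotation being only approximately even under contention.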
- Circuit breaking
JWT authorization
Without knowing the specific business requirements, I can only put together a demo.