文章有点长,亲,要慢慢看!
1. 概述
1.1 注册中心作用
- 在Dubbo中,注册中心为核心模块,Dubbo通过注册中心实现各个服务之间的注册与发现等功能,而本次源码的分析为registry模块的api和zookeeper的实现。
- 服务的提供者和消费者都需要把自己注册到注册中心,提供者让消费者感知到服务存在,从而消费者发起远程调用,也让服务治理中心感知到有服务提供者上线;消费者则是让服务治理中心可以发现自己。
1.2 Zookeeper
- Zookeeper是一个提供分布式协调服务的开源软件,常用于解决分布式应用中经常遇到的一些数据管理问题。Zookeeper功能非常强大,可以实现如分布式应用配置管理、统一命名服务、状态同步服务、集群管理等功能。关于Zookeeper,大家如果想了解可以关注一下自行去搜索一下。
1.3 registry模块
-
整个registry下的模块
api是注册中心所有的API和抽象类实现
default是注册中心的内存实现
zookeeper、redis、nacos就是基于不同的组件的实现
multicast是通过广播实现
1.4 注册中心工作流程
这张图相信只要是用过的都不陌生,挂在dubbo.io的官网挂了很久很久了。那么这个流程主要是说了什么呢?
- 0.是生产者(服务提供方)初始化,就好比你写了个服务实现然后启动起来。
- 1.是服务提供方向启动器起来过后,就会向注册中心提交自己的服务信息
- 2.是消费者(服务消费方)向注册中心提交订阅请求。就是你写了一个业务需要用到一个生产者服务,这个时候你需要提前打招呼,我需要它,有它的消息的时候让注册中心告诉你他的信息。
- 3.这个时候当服务提供者离开或者是有新的服务提供者加入,注册中心就会将变化的信息发送给消费者。
- 4.消费者知道了生产者的信息,要用的时候就直接调用,注意这里的调用是不经过注册中心的,而是直接同步的网络调用。
2. dubbo-registry-api
- api层主要是注册中心所有API的抽象实现类,并不是实际提供服务的组件。
-
模块关系图
-
类关系图
-
目录结构
2.1 Registry的相关实现
- 由类的关系图科看到Registry的实现关系,我们接下来就分析下各个接口和这个类
2.1.1 RegistryService
- 注册中心模块的服务接口:提供了注册、取消注册、订阅、取消订阅、查询符合条件的已注册数据。
- 虽然官方有解释这个的地方但是还是复制一下方法解释如下,官方地址是:http://dubbo.apache.org/zh-cn/docs/dev/impls/registry.html
public interface RegistryService {
/**
* 注册服务.
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 取消注册服务.
* @param url 注册信息,不允许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 订阅服务.
* @param listener 变更事件监听器,不允许为空
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅服务.
* @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查询注册列表,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
*
* @see org.apache.dubbo.registry.NotifyListener#notify(List)
* @param url 查询条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 已注册信息列表,可能为空,含义同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。
*/
List<URL> lookup(URL url);
}
2.1.2 Node (不在api中定义,在common模块中)
- 节点的接口 里面声明了一些关于节点的操作方法
public interface Node {
/**
* 获取节点Url
*/
URL getUrl();
/**
* 是否可用
*/
boolean isAvailable();
/**
* 销毁节点
*/
void destroy();
}
2.1.2 Registry
- 这个接口其实就是把节点以及注册中心服务的方法放在了一起
public interface Registry extends Node, RegistryService {
}
2.1.3 AbstractRegistry
- AbstractRegistry实现了Registry接口,为减轻注册中心的压力,在该类中实现了把本地URL缓存到property文件中的机制,并且实现了注册中心的注册、订阅等方法。
- 看下类图
- 首先是抽象类的属性
// url地址分隔符,用于文件缓存,服务提供程序url分隔
private static final char URL_SEPARATOR = ' ';
// URL地址分隔的正则表达式,用于分析文件缓存中的服务提供程序URL列表
private static final String URL_SPLIT = "\\s+";
// 日志输出
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 本地磁盘缓存,其中的特殊key为registies是记录注册表中心列表,其他是服务提供者的李彪
private final Properties properties = new Properties();
// 文件缓存写入执行器 提供一个线程的线程池
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件标志
private final boolean syncSaveFile;
// 这个是缓存的版本号
private final AtomicLong lastCacheChanged = new AtomicLong();
// 这个是已经注册的URL集合,不仅仅是服务提供者的,也可以是服务消费者的
private final Set<URL> registered = new ConcurrentHashSet<URL>();
// 已订阅的url 值为url的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 消费者或服务治理服务获取注册信息后的缓存对象
// 内存中服务器缓存的notified对象是ConcurrentHashMap里面嵌套了一个Map,
// 外层Map的Key是消费者的URL,
// 内层的Map的key是分类,包括provider,consumer,routes,configurators四种,
// value则对应服务列表,没有服务提供者提供服务的URL,会以一个特别的empty://前缀开头
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// 注册中心的URL
private URL registryUrl;
// 本地磁盘缓存文件保存的是注册中心的数据
private File file;
2.1.3.1 构造方法
public AbstractRegistry(URL url) {
// 设置注册中心的地址URL
setUrl(url);
// 从URL参数中获取是否同步保存的状态,URL中如果不包含,那就设置默认值为false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 获取文件路径
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
// 开始读入文件
File file = null;
// 不存在就抛出异常
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
// 把文件对象放到属性上
this.file = file;
// 加载文件中的参数放入Properties,Properties继承HashTable。
loadProperties();
// 通知监听器 URL变化 见下面notify的源码
notify(url.getBackupUrls());
}
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
// 把文件中的key-value读进来
in = new FileInputStream(file);
// Properties是一个继承HashTable的类.
// 这个地方就是按行读入,util里面的类,里面调用了一个load0 方法会把key和value做分割然后放入Properties中,。
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
// 关闭流
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
2.1.3.2 lookup
- 获得消费者url订阅的服务URL列表
@Override
public List<URL> lookup(URL url) {
// 查找的结果数据
List<URL> result = new ArrayList<URL>();
// 获取注册信息中的分类服务列表信息
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// 如果该消费者订阅了服务
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
// 把非空的加入结果集中
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
// 如果没有订阅服务
// 使用原子类以保证在获取注册在注册中心的服务url时能够保证是最新的url集合
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
// 通知监听器。当收到服务变更通知时触发
NotifyListener listener = new NotifyListener() {
@Override
public void notify(List<URL> urls) {
reference.set(urls);
}
};
// 添加这个服务的监听器
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
// 然后把非空结果放入结果集中
if (urls != null && !urls.isEmpty()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
2.1.3.3 register and unregister
- url注册和取消注册代码很简单,就是向registered中add或者remove url
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
2.1.3.4 notify
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍历已订阅的URL
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 通知URL对应的监听器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 通知监听器,看下方代码注释
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 将url进行分类
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 根据不同的category分别放到不同List中处理 以category的值做分类
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
// 没有分类结果就直接return
if (result.size() == 0) {
return;
}
// 获得消费者被通知的url的Map
Map<String, List<URL>> categoryNotified = notified.get(url);
// 如果没有 就创建一个
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
// 创建过后再获取
categoryNotified = notified.get(url);
}
// 发送URL变化给监听器
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分类标实和分类后的列表放入notified的value中 覆盖到 `notified`
// 当分类的数据为空,依然有urls 。不过其中的urls[0].protocol是empty,以此来处理所有服务提供者为空时的情况。
categoryNotified.put(category, categoryList);
// 保存一份到文件缓存中 中间做的 就是解析出参数然后同步或者异步保存到文件中
saveProperties(url);
// 通知监听器
listener.notify(categoryList);
}
}
2.1.3.5 subscribe and unsubscribe
- 注册中心服务实现的订阅和取消订阅
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 获得url已经订阅的服务的监听器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 然后把listener添加到上
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
// 获得url已经订阅的服务的监听器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
// 然后移除
listeners.remove(listener);
}
}
2.1.3.6 recover
- 注册中心的连接断开后恢复时调用的方法,里面其实就是注册和订阅
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
//调用的上面的注册方法
register(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 调用上面的订阅方法
subscribe(url, listener);
}
}
}
}
2.1.3.7 destory
- 这个方法是在进程关闭时,去取消注册和订阅,实际上就是调用unregister和unsubscribe
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 获取以注册的URL
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 取消注册
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 获取已订阅的URL以及监听器
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
// 去取消订阅
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
2.1.4 FailbackRegistry
- 这个类其实是为AbstractRegistry增加了失败重试的机制作为抽象能力,后面不同的注册中心具体实现继承了这个类就可以直接使用这个能力。
-
类图
- 常规套路 类的属性
// Scheduled executor service
// 经过固定时间后(默认是5s),调用FailbackRegistry#retry方法
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失败重试计时器,定期检查是否有失败请求,如果有,则无限制重试
private final ScheduledFuture<?> retryFuture;
// 注册失败的集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消注册失败的集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// 发起订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 取消订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 通知失败的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/**
* The time in milliseconds the retryExecutor will wait
* RetryExecutor将等待的时间(毫秒)
*/
private final int retryPeriod;
2.1.4.1 构造方法
public FailbackRegistry(URL url) {
super(url);
// 获取重试的时间 如果没有就设置成默认的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 设置重试任务 里面就是调用retry方法 见下方retry方法的解析
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
2.1.4.2 register and unregister 、 subscribe and unsubscribe
- 注册和取消注册
@Override
public void register(URL url) {
// 缓存等注册操作 见AbstractRegistry
super.register(url);
// 在失败集合中将这个移除
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 向服务器发送注册请求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 开启了启动时就检测,直接抛异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 记录失败的url
failedRegistered.add(url);
}
}
- 后面的unregister方法,subscribe unsubscribe都类似 可以看下源码, 中间的doXXXX这几个方法都是abstract方法等着后面不同的服务来实现。
2.1.4.3 notify
- notify则与上面的 四个方法不同,它是默认调用的父类AbstractRegistry的notify方法
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
// 将失败的注册请求记录到失败列表,定期重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 注意 这个是调用父类的
super.notify(url, listener, urls);
}
2.1.4.4 recover
- recover方法也区别于AbstractRegistry,他是直接添加到失败重试的集合中,让定时任务自己去重新注册和订阅
@Override
protected void recover() throws Exception {
// register
// 把已注册的添加到失败重试的列表中
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// 添加到失败重试注册列表
failedRegistered.add(url);
}
}
// subscribe
// 把已订阅的添加到失败重试的列表中
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 添加到失败重试订阅列表
addFailedSubscribed(url, listener);
}
}
}
}
2.1.4.5 retry
- 重试的方法,其实也比较简单,就是把集合中的数据拿出来,该做注册做注册,该订阅就订阅,成功了就从失败重试集合中移除,失败了就等下次再来。简单看下对注册列表的代码就明白了。其他代码都是类似的
// Retry the failed actions
protected void retry() {
if (!failedRegistered.isEmpty()) {
// 不为空就把他URL拿到
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
// 然后遍历它 做对应的操作
for (URL url : failed) {
try {
// 做注册操作
doRegister(url);
// 移除失败集合中URL
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
......
}
2.1.4.6 destroy
@Override
public void destroy() {
// 调用父类的方法
super.destroy();
try {
// 取消执行任务
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
}
2.1.4.7 待实现的方法
- 这些方法都是交给不同的服务提供组件去自己实现的,后面的Zookeeper就针对这些方法做了实现。
// ==== Template method ====
protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2 Registry的相关Factory的实现
- 注册中心的工厂类,顾名思义就是生产上面的Registry的实现。
2.2.1 RegistryFactory
@SPI("dubbo")
public interface RegistryFactory {
// 这个接口方法实际上就是获取对注册中心的连接,然后返回不同注册中心的不同Regsitry的实现对象,
// 注解就是根据设置不同的protocol(协议)来选择不同的实现,
// 比如Zookeeper,就会去使用Zookeeper的ZookeeperRegistryFactory,具体怎么选择,后续博客再写
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
2.2.2 AbstractRegistryFactory
- 类图
- 这个抽象类还是相对来说比较简答的。咱们看一下他的类属性
// 注册中心获取过程的锁
private static final ReentrantLock LOCK = new ReentrantLock();
// 注册中心Map<注册地址,registry> 一个类的缓存。
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
2.2.2.1 getRegistryies
- 获取所有的registry对象
/**
* Get all registries
* 获取所有的registry对象
* @return all registries
*/
public static Collection<Registry> getRegistries() {
//得到一个集合的镜像,它的返回结果不可直接被改变,否则会报错
return Collections.unmodifiableCollection(REGISTRIES.values());
}
2.2.2.2 destoryAll
- 关闭所有已创建的registry对象
/**
* Close all created registries
* 关闭所有已创建的registry对象
*/
// TODO: 2017/8/30 to move somewhere else better
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
//对注册中心关闭操作加锁
LOCK.lock();
try {
// 遍历所有的注册中心的操作类,然后调用destroy来销毁。
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 然后清除集合
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
2.2.2.3 getRegistry
- 获取对应注册中心的操作实现类
@Override
public Registry getRegistry(URL url) {
// 通过URL来获取到注册中心的类型
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
// 锁定注册中心访问进程以确保注册表的单个实例
LOCK.lock();
try {
// 通过key来拿到对应的注册中心的操作类
Registry registry = REGISTRIES.get(key);
// 有就直接返回
if (registry != null) {
return registry;
}
// 没有就创建对应的注册中心操作类
registry = createRegistry(url);
// 如果创建失败,报错
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 创建成功就放到结合中
REGISTRIES.put(key, registry);
// 然后再返回
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
2.2.2.4 createRegistry
- 抽象方法,没有实现,需要不同的服务提供工厂对象来自己实现对应的创建方法
protected abstract Registry createRegistry(URL url);
2.3 Consumer And Provider InvokerWrapper
- 实现Invoker接口,主要包装消费者和服务提供者的属性
- 主要为QOS提供服务 官方地址:http://dubbo.apache.org/zh-cn/docs/user/references/qos.html
- 什么是QOS? qos-server,是dubbo在线运维命令服务,默认端口号为:2222,用于接口命令,运维dubbo。
2.3.1 ConsumerInvokerWrapper
// invoker对象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注册中心的地址
private URL registryUrl;
// 消费者的地址
private URL consumerUrl;
// 注册中心的Directory
private RegistryDirectory registryDirectory;
2.3.2 ProviderInvokerWrapper
// invoker对象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注册中心的地址
private URL registryUrl;
// 提供者的地址
private URL providerUrl;
// 是否注册
private volatile boolean isReg;
2.4 ProviderConsumerRegTable
- 这个类是消费者和服务提供者的注册表操作,也是用在QOS中。
- 主要类属性
// 服务提供者的Invokers集合
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
// 服务消费者的Invokers集合
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
- 类图
- 里面就是一些对类属性集合的操作,主要是QOS会用。
2.5 RegistryStatusChecker
- 这个类就一个方法 check方法,主要是做状态校验。做注册中心相关的状态检查校验
- 类上面的@Activate 注解 使这个类自动被激活加载。
@Override
public Status check() {
// 获取所有的注册中心的对象
Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
if (registries.isEmpty()) {
return new Status(Status.Level.UNKNOWN);
}
Status.Level level = Status.Level.OK;
StringBuilder buf = new StringBuilder();
// 遍历
for (Registry registry : registries) {
if (buf.length() > 0) {
buf.append(",");
}
// 把地址拼接到一起
buf.append(registry.getUrl().getAddress());
// 如果注册中心的某个节点不可用就把状态设置成error
if (!registry.isAvailable()) {
level = Status.Level.ERROR;
buf.append("(disconnected)");
} else {
buf.append("(connected)");
}
}
// 然后返回价差的结果对象
return new Status(level, buf.toString());
}
2.5 RegistryDirectory and RegistryProtocol
- 这两个类后续再说。牵涉到其他地方的一些东西。
3. dubbo-registry-zookeeper
不知道大家看到这里有没有忘记这张图
-
模块关系图
所有的注册中心实现FailbackRegistry 和 AbstractRegistryFactory来实现对应的功能。
那么Zookeeper也是如此。Zookeeper主要就只有两个类
- 是ZookeeperRegistry
- 是ZookeeperRegistryFactory来实现对应的功能
3.1 Dubbo在Zookeeper中的数据结构
- dubbo在使用Zookeeper时只会创建永久节点和临时节点。
- 根节点是注册中心分组,下面是很多的服务接口,分组来自用户配置的<dubbo:registry>中的group属性,默认是/dubbo。
- 服务接口下是如图所示的四种服务目录,都是持久节点。
- 服务提供者路径/dubbo/service/providers (这里方便标识全部都用service替代接口com.demo.DemoService),下面包含接口的多个服务提供者者的URL元数据信息。
- 服务提供者路径/dubbo/service/consumers,下面包含接口有多个消费者的URL元数据信息
- 服务提供者路径/dubbo/service/routers,下面包含多个用于消费者路由策略URL元数据信息。
- 服务提供者路径/dubbo/service/configurators,下面包含多个用于服务提供者动态配置的URL元数据信息。
在Dubbo框架启动时会根据我们所写的服务相关的配置在注册中心创建4个目录,在providers和consumers目录中分别存储服务提供方、消费方元数据信息。包括:IP、端口、权重和应用名等数据。
- 目录包含信息
目录名称 | 存储值样例 |
---|---|
/dubbo/service/providers | dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&... |
/dubbo/service/consumers | dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&... |
/dubbo/service/routers | condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&... |
/dubbo/service/configurators | override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&... |
- 在Dubbo中启用注册中心:
<beans>
<!-- 适用于Zookeeper一个集群有多个节点,多个IP和端口用逗号分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
<!-- 适用于Zookeeper多个集群有多个节点,多个IP和端口用竖线分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>
3.2 ZookeeperRegistry
- 惯例给大家一张类图
- 然后看下属性
// Zookeeper的默认端口号
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
// Dubbo在Zookeeper中注册的默认根节点
private final static String DEFAULT_ROOT = "dubbo";
// 组的名称 或者说是 根节点的值
private final String root;
// 服务集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();
// zk节点的监听器
// Dubbo底层封装了2套Zookeeper API,所以通过ChildListener抽象了监听器,
// 但是在实际调用时会通过createTargetChildListener转为对应框架的监听器实现
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
// zk的客户端, 对节点进行一些删改等操作
private final ZookeeperClient zkClient;
- 关于Dubbo中的Zookeeper客户端,Dubbo实现了一个统一的Client API,但是用两种不同的Zookeeper开源库来实现,一个是Apache的Curator,另一个是zkClient 如果用户不设置,则默认使用Curator实现。
3.2.1 构造方法
- 构造方法比较简单,就是获取组名,连接Zookeeper
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 调用FailbackRegistry的构造方法
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名称 并复制给root
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 连接上Zookeeper
zkClient = zookeeperTransporter.connect(url);
// 添加连接状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
// 重连恢复
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
3.2.2 服务注册发布与服务下线取消注册
- 也比较简单就是创建节点和删除节点
// 发布
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 取消发布
@Override
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3.2.3 服务订阅和取消订阅
- 订阅有pull和push两种方式,一种是客户端定时轮询注册中心拉去配置,另一种是注册中心主动推送数据给客户端。Dubbo目前采用的是第一次启动拉取然后接受事件再重新拉取。
- 再暴露服务的时候,服务端会订阅configurators监听动态配置,消费端启动的时候回订阅providers、routers、configurators类接收这三者的变更通知。
- Dubbo在实现Zookeeper注册中心的时候是,客户端第一次连接获取全量数据,然后在订阅节点上注册一个watcher,客户端与注册中心之间保持TCP长连接,后续有节点发生变化则会触发watcher事件来把对应节点下的全量数据拉取过来。
3.2.3.1 doSubscribe
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 订阅所有数据
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
// 为空则把listeners放入到缓存的Map中
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 创建子节点监听器,对root下的子节点做监听,一旦有子节点发生改变,
// 那么就对这个节点进行订阅.
if (zkListener == null) {
// zkListener为空说明是第一次拉取,则新建一个listener
listeners.putIfAbsent(listener, new ChildListener() {
// 节点变更时,触发通知时执行
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
// 遍历所有节点
child = URL.decode(child);
// 如果有子节点还未被订阅贼说明是新节点,
if (!anyServices.contains(child)) {
// 加入到集合中
anyServices.add(child);
//就订阅之
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
// 创建持久节点root,接下来订阅持久节点的子节点
zkClient.create(root, false);
// 添加root节点的子节点监听器,并返回当前的services
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
// 遍历所有的子节点进行订阅
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 增加当前节点的订阅,并且会返回改节点下所有子节点的列表
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 订阅类别服务
} else {
List<URL> urls = new ArrayList<URL>();
// 将url转变成
// /dubbo/com.demo.DemoService/providers
// /dubbo/com.demo.DemoService/configurators
// /dubbo/com.demo.DemoService/routers
// 根据url类别获取一组要订阅的路径
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 如果缓存没有,则添加到缓存中
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 同样如果监听器缓存中没有 则放入缓存
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 通知节点变化
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 订阅并返回该节点下的子路径并缓存
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 有子节点组装,没有那么就将消费者的协议变成empty作为url。
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 回调NotifyListener,更新本地缓存信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3.2.3.2 doUnsubscribe
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
// 通过url把监听器全部拿到
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 直接删除group下所有的
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 移除监听器
zkClient.removeChildListener(root, zkListener);
} else {
// 移除类别服务下的监听器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
3.2.3.3 其他
- 其他代码相对来说不是很复杂可以自行看一下。
3.3 ZookeeperRegistryFactory
- 工厂类的代码极其短,随意看下。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
3.3.1 关于ZookeeperTransporter
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
-
上面我提到过,Dubbo用Zookeeper的时候用了两种方式实现,一个是Apache Curator,另一个是zkClient,这个类就是做看了一个转换。如下图
两个类都实现了该接口来向外提供统一的ZookeeperClient。
这个实现在remoting模块。暂时就不讲了。
4. 结语
- 整个模块,其他的Redis、Nacos等实现都是根据不同组件的特点来实现。功能都一样,只是实现不一样,大家可以自己去探索一下。
- 整个模块中我们单独看的话主要是就是一个实现,一个工厂,里面牵涉到了本地缓存、重试这些机制。代码量不是很大。认真看还是不难的。其中特别需要注意的就是注册中心的数据结构 和 发布订阅这些的实现了。
- 结语有点乱。就这样,不足之处希望留言指出,后续优化。!感谢!!!
关于我
- 坐标杭州,普通本科在读,计算机科学与技术专业,20年毕业,目前处于实习阶段。
- 主要做Java开发,会写点Golang、Shell。对微服务、大数据比较感兴趣,预备做这个方向。
- 目前处于菜鸟阶段,各位大佬轻喷,小弟正在疯狂学习。
- 欢迎大家和我交流鸭!!!
有疑问加站长微信联系(非本文作者)