Dubbo-服务注册中心之AbstractRegistry

网友投稿 301 2022-06-21


在dubbo中,关于注册中心Registry的有关实现封装在了dubbo-registry模块中。提供者(Provider)个消费者(Consumer)都是通过注册中心进行资源的调度。当服务启动时,provider会调用注册中心的register方法将自己的服务通过url的方式发布到注册中心,而consumer订阅其他服务时,会将订阅的服务通过url发送给注册中心(URL中通常会包含各种配置)。当某个服务被关闭时,它则会从注册中心中移除,当某个服务被修改时,则会调用notify方法触发所有的监听器。

首先简单介绍一下在dubbo的基本统一数据模型URL

1|0统一数据模型URL

在dubbo中定义的url与传统的url有所不同,用于在扩展点之间传输数据,可以从url参数中获取配置信息等数据,这一点很重要。

描述一个dubbo协议的服务

dubbo://192.168.1.6:20880/moe.cnkirito.sample.HelloService?timeout=3000

描述一个消费者

consumer://30.5.120.217/org.apache.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=1209&qos.port=33333&side=consumer×tamp=1545721827784

接下来将着重介绍几个重要的类。

2|0AbstractRegistry

在该类中有介个关于url的变量。

private final Set registered = new ConcurrentHashSet();

-> 记录已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的。

private final ConcurrentMap> subscribed = new ConcurrentHashMap>();

-> 消费者url订阅的监听器集合

private final ConcurrentMap>> notified = new ConcurrentHashMap>>();

-> 某个消费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合。

private URL registryUrl;

-> 注册中心URL

private File file;

-> 本地磁盘缓存文件,缓存注册中心的数据

2|1初始化

public AbstractRegistry(URL url) { //1. 设置配置中心的地址 setUrl(url); //2. 配置中心的URL中是否配置了同步保存文件属性,否则默认为false syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //3. 配置信息本地缓存的文件名 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache"); //逐层创建文件目录 File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { if (!file.getParentFile().mkdirs()) { throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; //如果现有配置缓存,则从缓存文件中加载属性 loadProperties(); notify(url.getBackupUrls()); }

加载本地磁盘缓存文件到内存缓存中,也就是把文件中的数据写入到properties中

private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); // 把数据写入到内存缓存中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry store file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry store file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }

3|0注册与取消注册

对registered变量执行add和remove操作

@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); } @Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }

4|0订阅与取消订阅

通过消费者url从subscribed变量中获取该消费者的所有监听器集合,然后将该监听器放入到集合中,取消同理。

@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } // 获得该消费者url 已经订阅的服务 的监听器集合 Set listeners = subscribed.get(url); if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet()); listeners = subscribed.get(url); } // 添加某个服务的监听器 listeners.add(listener); } @Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } Set listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }

5|0服务的恢复

注册的恢复包括注册服务的恢复和订阅服务的恢复,因为在内存中表留了注册的服务和订阅的服务,因此在恢复的时候会重新拉取这些数据,分别调用发布和订阅的方法来重新将其录入到注册中心中。

protected void recover() throws Exception { // register //把内存缓存中的registered取出来遍历进行注册 Set recoverRegistered = new HashSet(getRegistered()); if (!recoverRegistered.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover register url " + recoverRegistered); } for (URL url : recoverRegistered) { register(url); } } // subscribe //把内存缓存中的subscribed取出来遍历进行订阅 Map> recoverSubscribed = new HashMap>(getSubscribed()); if (!recoverSubscribed.isEmpty()) { if (logger.isInfoEnabled()) { logger.info("Recover subscribe url " + recoverSubscribed.keySet()); } for (Map.Entry> entry : recoverSubscribed.entrySet()) { URL url = entry.getKey(); for (NotifyListener listener : entry.getValue()) { subscribe(url, listener); } } } }

6|0通知

protected void notify(List urls) { if (urls == null || urls.isEmpty()) return; // 遍历订阅URL的监听器集合,通知他们 for (Map.Entry> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); // 匹配 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } // 遍历监听器集合,通知他们 Set listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } /** * 通知监听器,URL 变化结果 * @param url * @param listener * @param urls */ protected void notify(URL url, NotifyListener listener, List urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map> result = new HashMap>(); // 将urls进行分类 for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList(); // 分类结果放入result result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } // 获得某一个消费者被通知的url集合(通知的 URL 变化结果) Map> categoryNotified = notified.get(url); if (categoryNotified == null) { // 添加该消费者对应的url notified.putIfAbsent(url, new ConcurrentHashMap>()); categoryNotified = notified.get(url); } // 处理通知监听器URL 变化结果 for (Map.Entry> entry : result.entrySet()) { String category = entry.getKey(); List categoryList = entry.getValue(); // 把分类标实和分类后的列表放入notified的value中 // 覆盖到 `notified` // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。 categoryNotified.put(category, categoryList); // 保存到文件 saveProperties(url); //通知监听器 listener.notify(categoryList); } }

在构造函数的最后一句,调用notify(url.getBackupUrls()); 来将注册中心url返回的urls来进行通知。从下面代码可以开出返回的urls是通过url的参数获得的。

public List getBackupUrls() { List urls = new ArrayList(); urls.add(this); String[] backups = getParameter(Constants.BACKUP_KEY, new String[0]); if (backups != null && backups.length > 0) { for (String backup : backups) { urls.add(this.setAddress(backup)); } } return urls; }

然后获取遍历所有订阅URL,类型Map> ,判断遍历中的当前url与传入的backupURL是否匹配,匹配了继续向下执行,否则则跳过这个url,再处理下一个url。当向下执行时,获取遍历当前url的监听器。对每个监听器执行


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring MVC的工作流程(spring cloud)
下一篇:LeetCode刷题 -- 20200607 前缀和篇(leetcode刷题班)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~