博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo源码分析系列之服务的发布
阅读量:4177 次
发布时间:2019-05-26

本文共 44883 字,大约阅读时间需要 149 分钟。

按照前面对于dubbo的理解,如果要实现服务发布和注册,需要做哪些事情?

  1. 配置文件解析或者注解解析

  2. 服务注册

  3. 启动netty服务实现远程监听

Dubbo对于sping的扩展

最早我们使用Spring的配置,来实现dubbo服务的发布,方便大家的同时,也意味着Dubbo里面和Spring肯定有那种说不清的关系

Spring的标签扩展

在spring中定义了两个接口

NamespaceHandler: 注册一堆BeanDefinitionParser,利用他们来进行解析

BeanDefinitionParser:用于解析每个element的内容

Spring默认会加载jar包下的META-INF/spring.handlers文件寻找对应的NamespaceHandler。Dubbo-config模块下的dubbo-config-spring

slt4.png

Dubbo的接入实现

Dubbo中spring扩展就是使用spring的自定义类型,所以同样也有NamespaceHandler、BeanDefinitionParser。而NamespaceHandler是DubboNamespaceHandler

7
public class DubboNamespaceHandler extends NamespaceHandlerSupport {   static {      Version.checkDuplicate(DubboNamespaceHandler.class);   }   public void init() {        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));    }}

BeanDefinitionParser全部都使用了DubboBeanDefinitionParser,如果我们想看dubbo:service的配置,就直接看DubboBeanDefinitionParser(ServiceBean.class,true)

这个里面主要做了一件事,把不同的配置分别转化成spring容器中的bean对象

123456789
application对应ApplicationConfigregistry对应RegistryConfigmonitor对应MonitorConfigprovider对应ProviderConfigconsumer对应ConsumerConfig

我们仔细看,发现涉及到服务发布和服务调用的两个配置的解析,试用的是ServiceBean和referenceBean。并不是config结尾的,这两个类稍微特殊些,当然他同时也继承了ServiceConfig和ReferenceConfig

12
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

DubboBeanDefinitionParser

这里面是实现具体配置文件解析的入口,它重写了parse方法,对spring的配置进行解析。我们关注一下ServiceBean的解析. 实际就是解析dubbo:service这个标签中对应的属性

12345678910
else if (ServiceBean.class.equals(beanClass)) {           String className = element.getAttribute("class");           if (className != null && className.length() > 0) {               RootBeanDefinition classDefinition = new RootBeanDefinition();               classDefinition.setBeanClass(ReflectUtils.forName(className));               classDefinition.setLazyInit(false);               parseProperties(element.getChildNodes(), classDefinition);               beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));           }       }

ServiceBean的实现

ServiceBean这个类,分别实现了InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware, ApplicationEventPublisherAware

InitializingBean

接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。被重写的方法为afterPropertiesSet

DisposableBean

被重写的方法为destroy,bean被销毁的时候,spring容器会自动执行destory方法,比如释放资源

ApplicationContextAware

实现了这个接口的bean,当spring容器初始化的时候,会自动的将ApplicationContext注入进来

ApplicationListener

ApplicationEvent事件监听,spring容器启动后会发一个事件通知。被重写的方法为:onApplicationEvent,onApplicationEvent方法传入的对象是ContextRefreshedEvent。这个对象是当Spring的上下文被刷新或者加载完毕的时候触发的。因此服务就是在Spring的上下文刷新后进行导出操作的

BeanNameAware

获得自身初始化时,本身的bean的id属性,被重写的方法为setBeanName

ApplicationEventPublisherAware

这个是一个异步事件发送器。被重写的方法为setApplicationEventPublisher,简单来说,在spring里面提供了类似于消息队列的异步事件解耦功能。(典型的观察者模式的应用)

spring事件发送监听由3个部分组成

1.ApplicationEvent:表示事件本身,自定义事件需要继承该类

2.ApplicationEventPublisherAware:事件发送器,需要实现该接口

3.ApplicationListener:事件监听器接口

ServiceBean中服务暴露过程

在ServiceBean中,我们暂且只需要关注两个方法,分别是:

在初始化bean的时候会执行该方法afterPropertiesSet,

spring容器启动后会发一个事件通知onApplicationEvent

afterPropertiesSet

我们发现这个方法里面,就是把dubbo中配置的applicationregistryserviceprotocol等信息,加载到对应的config实体中,便于后续的使用

onApplicationEvent

spring容器启动之后,会收到一个这样的事件通知,这里面做了两个事情

  • 判断服务是否已经发布过

  • 如果没有发布,则调用调用export进行服务发布的流程(这里就是入口)

12345678
public void onApplicationEvent(ContextRefreshedEvent event) {        if (!isExported() && !isUnexported()) {            if (logger.isInfoEnabled()) {                logger.info("The service ready on spring started. service: " + getInterface());            }            export();        }    }

export

serviceBean中,重写了export方法,实现了 一个事件的发布。并且调用了super.export() ,也就是会调用父类的export方法

123456
@Override    public void export() {        super.export();        // Publish ServiceBeanExportedEvent        publishExportEvent();    }

ServiceConfig配置类

先整体来看一下这个类的作用,从名字来看,它应该和其他所有config类一样去实现对配置文件中service的配置信息的存储。实际上这个类并不单纯,所有的配置它都放在了一个AbstractServiceConfig的抽象类,自己实现了很多对于服务发布之前要做的操作逻辑

export

public synchronized void export() {        checkAndUpdateSubConfigs(); //检查并且更新配置信息        if (!shouldExport()) {//当前的服务是否需要发布, 通过配置实现:@Service(export = false)            return;        }        if (shouldDelay()) {//检查是否需要延时发布,通过配置@Service(delay = 1000)实现,单位毫秒            //这里的延时是通过定时器来实现            delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);        } else {            doExport(); //如果没有配置delay,则直接调用export进行发布        }    }

doExport

这里仍然还是在实现发布前的各种判断,比如判断

protected synchronized void doExport() {        if (unexported) {            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");        }        if (exported) { //服务是否已经发布过了            return;        }        exported = true; //设置发布状态        if (StringUtils.isEmpty(path)) {//path表示服务路径,默认使用interfaceName            path = interfaceName;        }        doExportUrls();    }

doExportUrls

  1. 记载所有配置的注册中心地址

  2. 遍历所有配置的协议,protocols

  3. 针对每种协议发布一个对应协议的服务

private void doExportUrls() {        //加载所有配置的注册中心的地址,组装成一个URL    //(registry://ip:port/org.apache.dubbo.registry.RegistryService的东西)        List
registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { //group跟version组成一个pathKey(serviceName) String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); //applicationModel用来存储ProviderModel,发布的服务的元数据,后续会用到 ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass); ApplicationModel.initProviderModel(pathKey, providerModel); doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }

doExportUrlsFor1Protocol

发布指定协议的服务,我们以Dubbo服务为例,由于代码太多,就不全部贴出来

  1. 前面的一大串if else代码,是为了把当前服务下所配置的<dubbo:method>参数进行解析,保存到map集合中

  2. 获得当前服务需要暴露的ip和端口

  3. 把解析到的所有数据,组装成一个URL,大概应该是:dubbo://192.168.13.1:20881/com.gupaoedu.dubbo.practice.ISayHelloService

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
registryURLs) { //省略一大串ifelse代码,用于解析
配置 //省略解析
中配置参数的代码,比如token、比如service中的method名称等存储在map中 //获得当前服务要发布的目标ip和port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); //组装URL URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);//这里是通过ConfiguratorFactory去实现动态改变配置的功能,这里暂时不涉及后续再分析 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }

如果scope!=”none”则发布服务,默认scope为null。如果scope不为none,判断是否为local或remote,从而发布Local服务或Remote服务,默认两个都会发布

12345678
String scope = url.getParameter(SCOPE_KEY);if (!SCOPE_NONE.equalsIgnoreCase(scope)) {            //injvm发布到本地            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {                exportLocal(url);            }            //发布远程服务            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {

Local

服务只是injvm的服务,提供一种消费者和提供者都在一个jvm内的调用方式。使用了Injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM内直接关联,(通过集合的方式保存了发布的服务信息),但执行Dubbo的Filter链。简单来说,就是你本地的dubbo服务调用,都依托于dubbo的标准来进行。这样可以享受到dubbo的一些配置服务

remote

表示根据根据配置的注册中心进行远程发布。遍历多个注册中心,进行协议的发布

  1. Invoker是一个代理类,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。(后续单独分析)

  2. DelegateProviderMetaDataInvoker,因为2.7引入了元数据,所以这里对invoker做了委托,把invoker交给DelegateProviderMetaDataInvoker来处理

  3. 调用protocol.export(invoker)来发布这个代理

  4. 添加到exporters集合

12345678
for (URL registryURL : registryURLs) {   //省略部分代码...  Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter
exporter = protocol.export(wrapperInvoker); exporters.add(exporter);}

protocol.export

protocol.export,这个protocol是什么呢?找到定义处发现它是一个自适应扩展点,打开Protocol这个扩展点,又可以看到它是一个在方法层面上的自适应扩展,意味着它实现了对于export这个方法的适配。也就意味着这个Protocol是一个动态代理类,Protocol$Adaptive

1
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

这个动态代理类,会根据url中配置的protocol name来实现对应协议的适配

Protocol$Adaptive

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {    public void destroy()  {        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");    }    public int getDefaultPort()  {        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");    }    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");        org.apache.dubbo.common.URL url = arg0.getUrl();        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.export(arg0);    }    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {        if (arg1 == null) throw new IllegalArgumentException("url == null");        org.apache.dubbo.common.URL url = arg1;        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);        return extension.refer(arg0, arg1);    }}

那么在当前的场景中,protocol会是调用谁呢?目前发布的invoker(URL),实际上是一个registry://协议,所以Protocol$Adaptive,会通过getExtension(extName)得到一个RegistryProtocol

RegistryProtocol.export

很明显,这个RegistryProtocol是用来实现服务注册的

这里面会有很多处理逻辑

  • 实现对应协议的服务发布

  • 实现服务注册

  • 订阅服务重写

public 
Exporter
export(final Invoker
originInvoker) throws RpcException { //这里获得的是zookeeper注册中心的url: zookeeper://ip:port URL registryUrl = getRegistryUrl(originInvoker); //这里是获得服务提供者的url, dubbo://ip:port... URL providerUrl = getProviderUrl(originInvoker); /***************************************************************************/ //订阅override数据。在admin控制台可以针对服务进行治理,比如修改权重,修改路由机制等,当注册中心有此服务的覆盖配置注册进来时,推送消息给提供者,重新暴露服务 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); /*****************************************************************************/ //这里就交给了具体的协议去暴露服务(很重要) final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker, providerUrl); // 根据invoker中的url获取Registry实例: zookeeperRegistry final Registry registry = getRegistry(originInvoker); //获取要注册到注册中心的URL: dubbo://ip:port final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper
providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) {//是否配置了注册中心,如果是, 则需要注册 //注册到注册中心的URL register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } //设置注册中心的订阅 // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //保证每次export都返回一个新的exporter实例 return new DestroyableExporter<>(exporter); }

doLocalExport

先通过doLocalExport来暴露一个服务,本质上应该是启动一个通信服务,主要的步骤是将本地ip和20880端口打开,进行监听

originInvoker: 应该是registry://ip:port/com.alibaba.dubbo.registry.RegistryService

key: 从originInvoker中获得发布协议的url: dubbo://ip:port/…

bounds: 一个prviderUrl服务export之后,缓存到 bounds中,所以一个providerUrl只会对应一个exporter

computeIfAbsent就相当于, java8的语法

123
if(bounds.get(key)==null){    bounds.put(key,s->{})}
1234567891011
private 
ExporterChangeableWrapper
doLocalExport(final Invoker
originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper
) bounds.computeIfAbsent(key, s -> { //对原有的invoker,委托给了InvokerDelegate Invoker
invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); //将invoker转换为exporter并启动netty服务 return new ExporterChangeableWrapper<>((Exporter
) protocol.export(invokerDelegate), originInvoker); }); }

InvokerDelegete: 是RegistryProtocol的一个静态内部类,该类是一个originInvoker的委托类,该类存储了originInvoker,其父类InvokerWrapper还会存储providerUrl,InvokerWrapper会调用originInvoker的invoke方法,也会销毁invoker。可以管理invoker的生命周期

DubboProtocol.export

基于动态代理的适配,很自然的就过渡到了DubboProtocol这个协议类中,但是实际上是DubboProtocol吗?

这里并不是获得一个单纯的DubboProtocol扩展点,而是会通过Wrapper对Protocol进行装饰,装饰器分别为: QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol

为什么是这样?我们再来看看spi的代码

Wrapper包装

在ExtensionLoader.loadClass这个方法中,有一段这样的判断,如果当前这个类是一个wrapper包装类,也就是这个wrapper中有构造方法,参数是当前被加载的扩展点的类型,则把这个wrapper类加入到cacheWrapperClass缓存中。

123456789101112
else if (isWrapperClass(clazz)) {            cacheWrapperClass(clazz);        }private boolean isWrapperClass(Class
clazz) { try { clazz.getConstructor(type); return true; } catch (NoSuchMethodException e) { return false; } }

我们可以在dubbo的配置文件中找到三个Wrapper

QosprotocolWrapper, 如果当前配置了注册中心,则会启动一个Qos server.qos是dubbo的在线运维命令,dubbo2.5.8新版本重构了telnet模块,提供了新的telnet命令支持,新版本的telnet端口与dubbo协议的端口是不同的端口,默认为22222

ProtocolFilterWrapper,对invoker进行filter的包装,实现请求的过滤

ProtocolListenerWrapper, 用于服务export时候插入监听机制,暂未实现

123
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapperfilter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapperlistener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper

接着,在getExtension->createExtension方法中,会对cacheWrapperClass集合进行判断,如果集合不为空,则进行包装

123456
Set
> wrapperClasses = cachedWrapperClasses; if (CollectionUtils.isNotEmpty(wrapperClasses)) { for (Class
wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } }

ProtocolFilterWrapper

这个是一个过滤器的包装,使用责任链模式,对invoker进行了包装

public 
Exporter
export(Invoker
invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }//构建责任链,基于激活扩展点private static
Invoker
buildInvokerChain(final Invoker
invoker, String key, String group) { Invoker
last = invoker; List
filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

我们看如下文件:

/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter

默认提供了非常多的过滤器。然后基于条件激活扩展点,来对invoker进行包装,从而在实现远程调用的时候,会经过这些filter进行过滤。

export

public 
Exporter
export(Invoker
invoker) throws RpcException { URL url = invoker.getUrl(); //获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如 //${group}/copm.gupaoedu.practice.dubbo.ISayHelloService:${version}:20880 String key = serviceKey(url); //创建DubboExporter DubboExporter
exporter = new DubboExporter
(invoker, key, exporterMap); // 将
键值对放入缓存中 exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //启动服务 openServer(url); //优化序列化 optimizeSerialization(url); return exporter; }

openServer

去开启一个服务,并且放入到缓存中->在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例

2
private void openServer(URL url) {        // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例        String key = url.getAddress();        client 也可以暴露一个只有server可以调用的服务        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);        if (isServer) {            //是否在serverMap中缓存了            ExchangeServer server = serverMap.get(key);            if (server == null) {                synchronized (this) {                    server = serverMap.get(key);                    if (server == null) {                        // 创建服务器实例                        serverMap.put(key, createServer(url));                    }                }            } else {                // 服务器已创建,则根据 url 中的配置重置服务器                server.reset(url);            }        }    }

createServer

创建服务,开启心跳检测,默认使用netty。组装url

private ExchangeServer createServer(URL url) {    //组装url,在url中添加心跳时间、编解码参数        url = URLBuilder.from(url)                // 当服务关闭以后,发送一个只读的事件,默认是开启状态                .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())                // 启动心跳配置                .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))                .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)                .build();        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {            throw new RpcException("Unsupported server type: " + str + ", url: " + url);        }		//创建ExchangeServer.        ExchangeServer server;        try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set
supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }

Exchangers.bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {        if (url == null) {            throw new IllegalArgumentException("url == null");        }        if (handler == null) {            throw new IllegalArgumentException("handler == null");        }    //获取 Exchanger,默认为 HeaderExchanger。    //调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");        return getExchanger(url).bind(url, handler);    }

headerExchanger.bind

这里面包含多个逻辑

  • new DecodeHandler(new HeaderExchangeHandler(handler))

  • Transporters.bind

  • new HeaderExchangeServer

目前我们只需要关心transporters.bind方法即可

123
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));    }

Transporters.bind

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {        if (url == null) {            throw new IllegalArgumentException("url == null");        }        if (handlers == null || handlers.length == 0) {            throw new IllegalArgumentException("handlers == null");        }        ChannelHandler handler;        if (handlers.length == 1) {            handler = handlers[0];        } else {            // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器            handler = new ChannelHandlerDispatcher(handlers);        }     // 获取自适应 Transporter 实例,并调用实例方法        return getTransporter().bind(url, handler);    }

getTransporter

getTransporter是一个自适应扩展点,它针对bind方法添加了自适应注解,意味着,bing方法的具体实现,会基于Transporter$Adaptive方法进行适配,那么在这里面默认的通信协议是netty,所以它会采用netty4的实现,也就是org.apache.dubbo.remoting.transport.netty4.NettyTransporter

1
public static Transporter getTransporter() {    return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();}

NettyTransporter.bind

创建一个nettyserver

123
public Server bind(URL url, ChannelHandler listener) throws RemotingException {        return new NettyServer(url, listener);    }

NettyServer

初始化一个nettyserver,并且从url中获得相应的ip/ port。然后调用doOpen();

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));    }public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {        super(url, handler);        localAddress = getUrl().toInetSocketAddress();        // 获取 ip 和端口        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {            bindIp = Constants.ANYHOST_VALUE;        }        bindAddress = new InetSocketAddress(bindIp, bindPort);        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);        try {            doOpen();// 调用模板方法 doOpen 启动服务器            if (logger.isInfoEnabled()) {                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());            }        } catch (Throwable t) {            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);        }        //fixme replace this with better method        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));    }

doOpen

开启netty服务,这个又是大家熟悉的内容了

12345678910111213141516171819202122232425262728293031323334
protected void doOpen() throws Throwable {        bootstrap = new ServerBootstrap();        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),                new DefaultThreadFactory("NettyServerWorker", true));        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);        channels = nettyServerHandler.getChannels();        bootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                .childHandler(new ChannelInitializer
() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }

然后大家要注意的是,它这里用到了一个handler来处理客户端传递过来的请求:

nettyServerHandler

1
NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

这个handler是一个链路,它的正确组成应该是

MultiMessageHandler(heartbeatHandler(AllChannelHandler(DecodeHandler(HeaderExchangeHeadler(dubboProtocol

后续接收到的请求,会一层一层的处理。比较繁琐

Invoker是什么

从前面的分析来看,服务的发布分三个阶段

第一个阶段会创造一个invoker

第二个阶段会把经历过一系列处理的invoker(各种包装),在DubboProtocol中保存到exporterMap中

第三个阶段把dubbo协议的url地址注册到注册中心上

前面没有分析Invoker,我们来简单看看Invoker到底是一个啥东西。

Invoker是Dubbo领域模型中非常重要的一个概念, 和ExtensionLoader的重要性是一样的,如果Invoker没有搞懂,那么不算是看懂了Dubbo的源码。我们继续回到ServiceConfig中export的代码,这段代码是还没有分析过的。以这个作为入口来分析我们前面export出去的invoker到底是啥东西

1
Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

ProxyFacotory.getInvoker

这个是一个代理工程,用来生成invoker,从它的定义来看,它是一个自适应扩展点,看到这样的扩展点,我们几乎可以不假思索的想到它会存在一个动态适配器类

1
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

ProxyFactory

这个方法的简单解读为:它是一个spi扩展点,并且默认的扩展实现是javassit, 这个接口中有三个方法,并且都是加了@Adaptive的自适应扩展点。所以如果调用getInvoker方法,应该会返回一个ProxyFactory$Adaptive

12345678910111213
@SPI("javassist")public interface ProxyFactory {    @Adaptive({Constants.PROXY_KEY})    
T getProxy(Invoker
invoker) throws RpcException; @Adaptive({Constants.PROXY_KEY})
T getProxy(Invoker
invoker, boolean generic) throws RpcException; @Adaptive({Constants.PROXY_KEY})
Invoker
getInvoker(T proxy, Class
type, URL url) throws RpcException;

ProxyFactory$Adaptive

这个自适应扩展点,做了两件事情

  • 通过ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)获取了一个指定名称的扩展点,

  • 在dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中,定义了javassis=JavassisProxyFactory

  • 调用JavassisProxyFactory的getInvoker方法

12345678910111213141516171819202122232425262728
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");        org.apache.dubbo.common.URL url = arg0.getUrl();        String extName = url.getParameter("proxy", "javassist");        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);        return extension.getProxy(arg0);    }    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");        if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");        org.apache.dubbo.common.URL url = arg0.getUrl();        String extName = url.getParameter("proxy", "javassist");        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);        return extension.getProxy(arg0, arg1);    }    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {        if (arg2 == null) throw new IllegalArgumentException("url == null");        org.apache.dubbo.common.URL url = arg2;        String extName = url.getParameter("proxy", "javassist");        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);        return extension.getInvoker(arg0, arg1, arg2);    }}

JavassistProxyFactory.getInvoker

javassist是一个动态类库,用来实现动态代理的。

proxy:接口的实现: com.gupaoedu.practice.dubbo.SayHelloServiceImpl

type:接口全称 com.gupaoedu.dubbo.ISayHelloService

url:协议地址:registry://…

12345678910111213
@Override    public 
Invoker
getInvoker(T proxy, Class
type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker
(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class
[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }

javassist生成的动态代理代码

通过断点的方式(Wrapper258行),在Wrapper.getWrapper中的makeWrapper,会创建一个动态代理,核心的方法invokeMethod代码如下

12345678910111213141516
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {        com.gupaoedu.dubbo.practice.ISayHelloService w;        try {            w = ((com.gupaoedu.dubbo.practice.ISayHelloService) $1);        } catch (Throwable e) {            throw new IllegalArgumentException(e);        }        try {            if ("sayHello".equals($2) && $3.length == 1) {                return ($w) w.sayHello((java.lang.String) $4[0]);            }        } catch (Throwable e) {            throw new java.lang.reflect.InvocationTargetException(e);        }        throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.gupaoedu.dubbo.practice.ISayHelloService.");    }

构建好了代理类之后,返回一个AbstractproxyInvoker,并且它实现了doInvoke方法,这个地方似乎看到了dubbo消费者调用过来的时候触发的影子,因为wrapper.invokeMethod本质上就是触发上面动态代理类的方法invokeMethod.

12345678
return new AbstractProxyInvoker
(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class
[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } };

所以,简单总结一下Invoke本质上应该是一个代理,经过层层包装最终进行了发布。当消费者发起请求的时候,会获得这个invoker进行调用。

最终发布出去的invoker, 也不是单纯的一个代理,也是经过多层包装

InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))

服务注册流程

关于服务发布这一条线分析完成之后,再来了解一下服务注册的过程,希望大家还记得我们之所以走到这一步,是因为我们在RegistryProtocol这个类中,看到了服务发布的流程。

12345678910111213141516171819202122232425262728293031323334353637
public 
Exporter
export(final Invoker
originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker, providerUrl); // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper
providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<>(exporter); }

服务注册核心代码

从export方法中抽离出来的部分代码,就是服务注册的流程

1234567891011
// url to registry        final Registry registry = getRegistry(originInvoker);        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);        ProviderInvokerWrapper
providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); }

getRegistry

  1. 把url转化为对应配置的注册中心的具体协议

  2. 根据具体协议,从registryFactory中获得指定的注册中心实现

那么这个registryFactory具体是怎么赋值的呢?

12345
private Registry getRegistry(final Invoker
originInvoker) { //把url转化为配置的具体协议,比如zookeeper://ip:port. 这样后续获得的注册中心就会是基于zk的实现 URL registryUrl = getRegistryUrl(originInvoker); return registryFactory.getRegistry(registryUrl);}

在RegistryProtocol中存在一段这样的代码,很明显这是通过依赖注入来实现的扩展点。

1234
private RegistryFactory registryFactory;public void setRegistryFactory(RegistryFactory registryFactory) {        this.registryFactory = registryFactory;}

按照扩展点的加载规则,我们可以先看看/META-INF/dubbo/internal路径下找到RegistryFactory的配置文件.这个factory有多个扩展点的实现。

1234567
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactorymulticast=org.apache.dubbo.registry.multicast.MulticastRegistryFactoryzookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactoryredis=org.apache.dubbo.registry.redis.RedisRegistryFactoryconsul=org.apache.dubbo.registry.consul.ConsulRegistryFactoryetcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory

接着,找到RegistryFactory的实现, 发现它里面有一个自适应的方法,根据url中protocol传入的值进行适配

12345
@SPI("dubbo")public interface RegistryFactory {    @Adaptive({"protocol"})    Registry getRegistry(URL url);

RegistryFactory$Adaptive

由于在前面的代码中,url中的protocol已经改成了zookeeper,那么这个时候根据zookeeper获得的spi扩展点应该是ZookeeperRegistryFactory

1234567891011
import org.apache.dubbo.common.extension.ExtensionLoader;public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {    public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0)  {        if (arg0 == null) throw new IllegalArgumentException("url == null");        org.apache.dubbo.common.URL url = arg0;        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );        if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.registry.RegistryFactory) name from url (" + url.toString() + ") use keys([protocol])");        org.apache.dubbo.registry.RegistryFactory extension = (org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);        return extension.getRegistry(arg0);    }}

ZookeeperRegistryFactory

这个方法中并没有getRegistry方法,而是在父类AbstractRegistryFactory

  • 从缓存REGISTRIES中,根据key获得对应的Registry

  • 如果不存在,则创建Registry

1234567891011121314151617181920212223242526
public Registry getRegistry(URL url) {        url = URLBuilder.from(url)                .setPath(RegistryService.class.getName())                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)                .build();        String key = url.toServiceStringWithoutResolving();        // Lock the registry access process to ensure a single instance of the registry        LOCK.lock();        try {            Registry registry = REGISTRIES.get(key);            if (registry != null) {                return registry;            }            //创建注册中心            registry = createRegistry(url);            if (registry == null) {                throw new IllegalStateException("Can not create registry " + url);            }            REGISTRIES.put(key, registry);            return registry;        } finally {            // Release the lock            LOCK.unlock();        }    }

createRegistry

创建一个zookeeperRegistry,把url和zookeepertransporter作为参数传入。

zookeeperTransporter 这个属性也是基于依赖注入来赋值的,具体的流程就不再分析了,这个的值应该是

CuratorZookeeperTransporter 表示具体使用什么框架来和zk产生连接

123
public Registry createRegistry(URL url) {        return new ZookeeperRegistry(url, zookeeperTransporter);}

ZookeeperRegistry

这个方法中使用了CuratorZookeeperTransport来实现zk的连接

123456789101112131415161718192021222324
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {        super(url);        if (url.isAnyHost()) {            throw new IllegalStateException("registry address == null");        }    //获得group名称        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);        if (!group.startsWith(Constants.PATH_SEPARATOR)) {            group = Constants.PATH_SEPARATOR + group;        }        this.root = group;    //产生一个zookeeper连接        zkClient = zookeeperTransporter.connect(url);    //添加zookeeper状态变化事件        zkClient.addStateListener(state -> {            if (state == StateListener.RECONNECTED) {                try {                    recover();                } catch (Exception e) {                    logger.error(e.getMessage(), e);                }            }        });    }

registry.register(registedProviderUrl);

继续往下分析,会调用registry.register去讲dubbo://的协议地址注册到zookeeper上

这个方法会调用FailbackRegistry类中的register. 为什么呢?因为ZookeeperRegistry这个类中并没有register这个方法,但是他的父类FailbackRegistry中存在register方法,而这个类又重写了AbstractRegistry类中的register方法。所以我们可以直接定位大FailbackRegistry这个类中的register方法中

123456
register(registryUrl, registeredProviderUrl);public void register(URL registryUrl, URL registeredProviderUrl) {        Registry registry = registryFactory.getRegistry(registryUrl);        registry.register(registeredProviderUrl);}

FailbackRegistry.register

  • FailbackRegistry,从名字上来看,是一个失败重试机制

  • 调用父类的register方法,讲当前url添加到缓存集合中

调用doRegister方法,这个方法很明显,是一个抽象方法,会由ZookeeperRegistry子类实现。

10111213141516171819202122232425262728
public void register(URL url) {        super.register(url);        removeFailedRegistered(url);        removeFailedUnregistered(url);        try {            // 调用子类实现真正的服务注册,把url注册到zk上            doRegister(url);        } catch (Exception e) {            Throwable t = e;            // 如果开启了启动时检测,则直接抛出异常            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)                    && url.getParameter(Constants.CHECK_KEY, true)                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());            boolean skipFailback = t instanceof SkipFailbackWrapperException;            if (check || skipFailback) {                if (skipFailback) {                    t = t.getCause();                }                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);            } else {                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);            }            // 将失败的注册请求记录到失败列表,定时重试            addFailedRegistered(url);        }    }

ZookeeperRegistry.doRegister

最终调用curator的客户端把服务地址注册到zk

1234567
public void doRegister(URL url) {        try {            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));        } catch (Throwable e) {            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }

如果你觉得文章不错,文末的赞 ???? 又回来啦,记得给我「点赞」和「在看」哦~

转载地址:http://fkaai.baihongyu.com/

你可能感兴趣的文章
Xerces C++实现xml文件解析
查看>>
用户强制一台设备登录,其他设备登出
查看>>
spring实现单例及创建线程安全单例
查看>>
设计模式-- 模板方法模式
查看>>
SQL关键字执行顺序
查看>>
设计模式--适配器模式
查看>>
SpringMvc注解之@ControllerAdvice
查看>>
SQL--查询两个字段相同的记录
查看>>
多研究些架构,少谈些框架(1) -- 论微服务架构的核心概念
查看>>
多研究些架构,少谈些框架(2)-- 微服务和充血模型
查看>>
多研究些架构,少谈些框架(3)-- 微服务和事件驱动
查看>>
SQL性能优化梳理
查看>>
微服务架构技术栈
查看>>
想面试进BAT,不得不看的分布式锁,面试题都在这里了!!
查看>>
Redis最常被问到知识点总结
查看>>
这才是微服务拆分的正确姿势,值得学习!
查看>>
MySQL中一条SQL是如何执行的?
查看>>
MySQL的索引是什么?怎么优化?
查看>>
2万字长文包教包会 JVM 内存结构
查看>>
不懂 spring 就彻底放弃 Java 吧!
查看>>