
Dubbo (6): Using the ZooKeeper Registry

Tech Sharing, updated 4 years ago (2020)

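  One of Dubbo's most fundamental and important features is automatic service registration and discovery, which is implemented through a registry. Dubbo supports many external registry components: zk, redis, etcd, consul, eureka…

  Their implementations differ, but from the outside they all look the same: each one implements the Registry interface!

  Today we look at how the most commonly used registry, ZooKeeper, is integrated.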

 

1. How Dubbo lays out service directories in ZooKeeper

  A registry mainly handles service publication, service subscription, change push, and service lookup. In zk, the directories are divided with the service as the primary unit.

  Overview of the ZooKeeper paths:

    /dubbo: root directory, persistent node (the root can be customized with group=xxx)
    /dubbo/xxx.xx.XxxService: one path per service, persistent node
    /dubbo/xxx.xx.XxxService/configurators: where configuration is stored; empty by default
    /dubbo/xxx.xx.XxxService/providers: provider directory, persistent node. All providers are stored as its children, and each provider is an ephemeral node. Sample child path: dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D112756%26release%3D%26side%3Dprovider%26timestamp%3D1588548412331
    /dubbo/xxx.xx.XxxService/consumers: consumer directory, persistent node. All consumers are stored as its children. Sample child path: consumer%3A%2F%2F192.168.1.4%2Forg.apache.dubbo.rpc.service.GenericService%3Fapplication%3Ddubbo-demo-api-consumer%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26generic%3Dtrue%26interface%3Dorg.apache.dubbo.demo.DemoService%26pid%3D139620%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1588596195728
    /dubbo/xxx.xx.XxxService/routers: routing configuration; empty by default
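
  To make the layout above concrete, here is a minimal sketch of how a provider URL could be turned into its znode path. It is not Dubbo's own code: the class ZkPathSketch and its methods are hypothetical names, and the JDK's URLEncoder stands in for Dubbo's URL.encode.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, for illustration only: builds /<root>/<interface>/providers/<encoded url>.
final class ZkPathSketch {

    // URL-encode with UTF-8 so that ':', '/', '?', '&' and '=' become safe inside one path segment.
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    // root is the group name without the leading slash, e.g. "dubbo".
    static String providerPath(String root, String interfaceName, String providerUrl) {
        return "/" + root + "/" + encode(interfaceName) + "/providers/" + encode(providerUrl);
    }

    public static void main(String[] args) {
        System.out.println(providerPath(
                "dubbo",
                "org.apache.dubbo.demo.DemoService",
                "dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?side=provider"));
    }
}
```

  Because the whole provider URL is encoded into a single path segment, listing the children of the providers node is enough to recover every provider address.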

 

2. Integrating the ZooKeeper component

  The ZooKeeper registry is obtained through RegistryFactoryWrapper after the local service has been exposed on its socket port.

  The registry URL at registration time looks like this: registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-api-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613&pid=135556&registry=zookeeper&timestamp=1588518545808

  zk is invoked in two situations. First, on the provider side, it is brought up in ServiceConfig#doExportUrlsFor1Protocol when the service is exported remotely. Second, on the consumer side, it is pulled in by ReferenceConfig#createProxy so that the provider list can be fetched for remote calls. Taking ServiceConfig as the example, here is how it ends up calling zk:

    // ServiceConfig#doExportUrlsFor1Protocol
    // PROTOCOL here is an SPI class; its default instance is DubboProtocol, but it picks the implementation that matches the protocol type being handled
    // The protocol here is registry, so RegistryProtocol handles export()
    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

    // RegistryProtocol has several important properties, which Dubbo fills in through dependency injection
    // org.apache.dubbo.registry.integration.RegistryProtocol    
    public void setCluster(Cluster cluster) {
        this.cluster = cluster;
    }
    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }
    public void setRegistryFactory(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }


    // org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
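        // Check whether this is a service registration, i.e. whether the protocol field is registry (URL prefix registry://)
        // If so, return as soon as registration is done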
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            // ProtocolFilterWrapper.export()
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
    }
    // org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // Same check: is this a registration request?
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            // RegistryProtocol.export()
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
    // org.apache.dubbo.registry.integration.RegistryProtocol#export
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // Get the registry instance, e.g. the zk client
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // Register the service address with the registry
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        // Subscribe to the directory
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    /**
     * Get an instance of registry based on the address of invoker
     *
     * @param originInvoker
     * @return
     */
    protected Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = getRegistryUrl(originInvoker);
        // RegistryFactory is itself generated through the SPI mechanism
        // It dispatches to the concrete implementation for the registry type; here that is zookeeper, so ZookeeperRegistryFactory.getRegistry() is called
        return registryFactory.getRegistry(registryUrl);
    }
    // Every RegistryFactory is wrapped in a RegistryFactoryWrapper so that it can be decorated
    // org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
    @Override
    public Registry getRegistry(URL url) {
        // For zk, this calls ZookeeperRegistryFactory
        return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
                        .getActivateExtension(url, "registry.listeners")));
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            // Call the subclass method to create the registry instance; here that is ZookeeperRegistryFactory.createRegistry
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry
    @Override
    public Registry createRegistry(URL url) {
        // The zk component is now wired into the application; the features zk provides can be used from here on
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }
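
  The getRegistry() logic above is essentially "look up by key under a lock, create on a miss". The following is a minimal sketch of that pattern only; CachingFactorySketch and its members are hypothetical names, and a plain string key stands in for the value returned by createRegistryCacheKey(url).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical sketch of a factory that keeps exactly one instance per cache key.
final class CachingFactorySketch<T> {
    private final Map<String, T> instances = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<String, T> creator;

    CachingFactorySketch(Function<String, T> creator) {
        this.creator = creator;
    }

    T get(String key) {
        // Lock the whole lookup-or-create step so two threads cannot create two instances for one key.
        lock.lock();
        try {
            T existing = instances.get(key);
            if (existing != null) {
                return existing;
            }
            T created = creator.apply(key);
            if (created == null) {
                throw new IllegalStateException("Can not create instance for " + key);
            }
            instances.put(key, created);
            return created;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        CachingFactorySketch<Object> factory = new CachingFactorySketch<>(key -> new Object());
        Object first = factory.get("zookeeper://127.0.0.1:2181");
        Object second = factory.get("zookeeper://127.0.0.1:2181");
        System.out.println("same instance: " + (first == second));
    }
}
```

  Holding the lock across creation keeps the code simple at the cost of serializing first-time lookups, which is acceptable when only a handful of registries are created at startup.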

  At this point ZooKeeper has been integrated. Let's first look at the constructor of the ZooKeeper registry:

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry
    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // Handled by the abstract parent classes
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // group=dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#FailbackRegistry
    public FailbackRegistry(URL url) {
        super(url);
        // retry.period=5000
        this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

        // since the retry task will not be very much. 128 ticks is enough.
        // When the connection fails, retry periodically in the background
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#AbstractRegistry
    public AbstractRegistry(URL url) {
        setUrl(url);
        // The file.cache option is enabled by default, so the cache file is normally expected to exist
        if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
            // Start file save timer
            syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
            // Default path: $USER_HOME/.dubbo/dubbo-registry-dubbo-demo-api-provider-127.0.0.1-2181.cache
            String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
            String filename = url.getParameter(FILE_KEY, defaultFilename);
            File file = null;
            if (ConfigUtils.isNotEmpty(filename)) {
                file = new File(filename);
                if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                    if (!file.getParentFile().mkdirs()) {
                        throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                    }
                }
            }
            this.file = file;
            // When starting the subscription center,
            // we need to read the local cache file for future Registry fault tolerance processing.
            // If the file exists, load the previously saved configuration from it first
            loadProperties();
            notify(url.getBackupUrls());
        }
    }
    // If a registry cache file exists, load the property values from it
    // Entries have the form xxx.xxxService=xxx
    private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                in = new FileInputStream(file);
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry cache file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry cache file " + file, e);
            } finally {
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#notify(java.util.List<org.apache.dubbo.common.URL>)
    protected void notify(List<URL> urls) {
        if (CollectionUtils.isEmpty(urls)) {
            return;
        }

        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }

            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

  The constructor mainly creates the connection to the zk server, adds a state listener, and handles the local cache file.
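
  Two small string computations in the constructors above are easy to miss: normalizing the group into a root path, and deriving the default cache file name. The sketch below reproduces just those two steps under the assumption that the inputs are plain strings; RegistryDefaultsSketch and its method names are hypothetical.

```java
// Hypothetical sketch of two defaults computed in the registry constructors.
final class RegistryDefaultsSketch {

    // The root path is the group with a leading "/" added when it is missing.
    static String rootPath(String group) {
        return group.startsWith("/") ? group : "/" + group;
    }

    // Default cache file: <user.home>/.dubbo/dubbo-registry-<application>-<host>-<port>.cache
    static String defaultCacheFile(String userHome, String application, String address) {
        return userHome + "/.dubbo/dubbo-registry-" + application + "-"
                + address.replaceAll(":", "-") + ".cache";
    }

    public static void main(String[] args) {
        System.out.println(rootPath("dubbo"));
        System.out.println(defaultCacheFile(
                System.getProperty("user.home"), "dubbo-demo-api-provider", "127.0.0.1:2181"));
    }
}
```

  Replacing ":" with "-" keeps the address usable as part of a file name on platforms where a colon is not allowed there.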

 

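3. Registering a service provider in ZooKeeper

  In the previous section we saw that ZooKeeper is initialized through getRegistry() during export(). Also within export(), once the registry instance has been obtained, the service address still has to be registered before the job is done. The sequence diagram for service registration is as follows: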

[Figure: service registration sequence diagram]

    // org.apache.dubbo.registry.integration.RegistryProtocol#export
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 1. Get the registry instance
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        // 2. Register registeredProviderUrl
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

        // register stated url on provider model
        registerStatedUrl(registryUrl, registeredProviderUrl, register);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        notifyExport(exporter);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }
    private void register(URL registryUrl, URL registeredProviderUrl) {
        // The registry obtained here is actually a wrapper around the registry; registryUrl starts with zookeeper://
        Registry registry = registryFactory.getRegistry(registryUrl);
        // registeredProviderUrl here looks like: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=135556&release=&side=provider&timestamp=1588518573613
        registry.register(registeredProviderUrl);
    }
    // org.apache.dubbo.registry.ListenerRegistryWrapper#register
    @Override
    public void register(URL url) {
        try {
            // Call the actual registry
            registry.register(url);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.onRegister(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#register
    @Override
    public void register(URL url) {
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // Sending a registration request to the server side
            // Call zk to register the url
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
    @Override
    public void doRegister(URL url) {
        try {
            // Create the zk node; with dynamic=true (the default) the node is ephemeral
            // Sample url: dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
            // toUrlPath converts this url into a zookeeper path, e.g. under the providers directory, the consumers directory...
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    // Convert the provider information into a zk path
    private String toUrlPath(URL url) {
        // Locate the category directory; the leaf is the encoded full url of the current provider
        // i.e. /dubbo/interfaceName/providers/xxxx
        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
    }
    private String toCategoryPath(URL url) {
        // category is empty here, so it defaults to providers
        return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
    }
    private String toServicePath(URL url) {
        String name = url.getServiceInterface();
        if (ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        // /dubbo/interfaceName
        return toRootDir() + URL.encode(name);
    }

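  Service registration boils down to two steps: 1. get the registry instance (via the SPI mechanism); 2. convert the service url into the corresponding path and create it on the zk server as an ephemeral node. With the service name as the parent path and the service's own url as the leaf, all providers or consumers are easy to find under the corresponding path.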
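
  The failure handling in FailbackRegistry#register shown earlier can be summarized as "try once; if check is on, fail fast; otherwise remember the url and retry later". Here is a minimal sketch of that idea, assuming a synchronous retry pass instead of Dubbo's HashedWheelTimer; FailbackSketch and its members are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of fail-fast versus fail-back registration.
final class FailbackSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // stands in for the real call to the registry server
    private final boolean check;               // true: throw on failure, false: record and retry

    FailbackSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url); // a fresh attempt supersedes any earlier failure record
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url); // keep it for the retry pass
        }
    }

    // One retry pass; in this sketch the caller decides when to run it.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // still failing, leave it for the next pass
            }
        }
    }

    Set<String> failed() {
        return failedRegistered;
    }

    public static void main(String[] args) {
        FailbackSketch registry = new FailbackSketch(url -> {
            throw new IllegalStateException("registry unreachable");
        }, false);
        registry.register("dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService");
        System.out.println("pending retries: " + registry.failed().size());
    }
}
```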

 

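4. Taking a service offline in ZooKeeper

  When the application shuts down, or when registration fails, the service has to be taken offline. If the application does not do this in time, zk's own ephemeral-node expiry will also take the service offline, so that consumers and the admin console do not see stale services.

  Active offlining by the application is implemented through ShutdownHookCallbacks; its sequence diagram is as follows: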

[Figure: service offline sequence diagram]

    // org.apache.dubbo.config.bootstrap.DubboBootstrap#DubboBootstrap
    private DubboBootstrap() {
        configManager = ApplicationModel.getConfigManager();
        environment = ApplicationModel.getEnvironment();
        // Several shutdown hooks are created when DubboBootstrap is created
        DubboShutdownHook.getDubboShutdownHook().register();
        // Add DubboBootstrap's destroy action to the DubboShutdownHook callback queue so it is invoked together with the others on shutdown
        ShutdownHookCallbacks.INSTANCE.addCallback(new ShutdownHookCallback() {
            @Override
            public void callback() throws Throwable {
                DubboBootstrap.this.destroy();
            }
        });
    }
    /**
     * Register the ShutdownHook
     */
    public void register() {
        if (registered.compareAndSet(false, true)) {
            DubboShutdownHook dubboShutdownHook = getDubboShutdownHook();
            Runtime.getRuntime().addShutdownHook(dubboShutdownHook);
            dispatch(new DubboShutdownHookRegisteredEvent(dubboShutdownHook));
        }
    }
    // org.apache.dubbo.config.bootstrap.DubboBootstrap#destroy
    public void destroy() {
        if (destroyLock.tryLock()) {
            try {
                // destroy method implemented by DubboShutdownHook
                DubboShutdownHook.destroyAll();

                if (started.compareAndSet(true, false)
                        && destroyed.compareAndSet(false, true)) {

                    unregisterServiceInstance();
                    unexportMetadataService();
                    unexportServices();
                    unreferServices();

                    destroyRegistries();
                    DubboShutdownHook.destroyProtocols();
                    destroyServiceDiscoveries();

                    clear();
                    shutdown();
                    release();
                }
            } finally {
                destroyLock.unlock();
            }
        }
    }
    // org.apache.dubbo.config.DubboShutdownHook#destroyAll
    public static void destroyAll() {
        if (destroyed.compareAndSet(false, true)) {
            AbstractRegistryFactory.destroyAll();
            destroyProtocols();
        }
    }
    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
    /**
     * Close all created registries
     */
    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            // Get all created registries and call destroy() on each one
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#destroy
    @Override
    public void destroy() {
        super.destroy();
        try {
            // The (unbind) destroy action only needs to run once; a single call handles every address that has to be unbound
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // org.apache.dubbo.registry.support.FailbackRegistry#destroy
    @Override
    public void destroy() {
        super.destroy();
        retryTimer.stop();
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        // This unregisters every address registered in this registry, so destroy() only needs to be called once per instance
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnregister
    @Override
    public void doUnregister(URL url) {
        try {
            // Registered path, e.g.: /dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F192.168.56.1%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-api-provider%26default%3Dtrue%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%2CsayHelloAsync%26pid%3D135556%26release%3D%26side%3Dprovider%26timestamp%3D1588518573613
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

  This is essentially a shutdown hook that actively deletes the paths.
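
  The shutdown flow above relies on two compareAndSet guards: one so the JVM hook is registered only once, and one so the cleanup itself runs only once even if both the hook and an explicit destroy() trigger it. A minimal sketch of that idea follows; ShutdownSketch and its members are hypothetical names, and Runnable stands in for ShutdownHookCallback.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of an idempotent shutdown hook with callbacks.
final class ShutdownSketch {
    private final List<Runnable> callbacks = new CopyOnWriteArrayList<>();
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    void addCallback(Runnable callback) {
        callbacks.add(callback);
    }

    // Registers the JVM shutdown hook at most once; returns true only for the call that registered it.
    boolean register() {
        if (registered.compareAndSet(false, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(this::destroyAll, "sketch-shutdown-hook"));
            return true;
        }
        return false;
    }

    // Runs every callback exactly once, no matter how many times it is called.
    void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                // One failing callback must not prevent the others from running.
                System.err.println("shutdown callback failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        ShutdownSketch hook = new ShutdownSketch();
        hook.addCallback(() -> System.out.println("unregister urls, close zk client"));
        hook.register();
        // The callback runs when the JVM exits.
    }
}
```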

 

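5. Consumer subscription in ZooKeeper

  The two operations above, registering and unregistering a service, apply to providers as well as consumers. Subscription, however, is mostly a consumer concern, because a consumer has to keep track of provider changes and receive pushes from the registry. The entry point is ReferenceConfig.get(). So we walk through the zk subscription process from the consumer's point of view.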
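
  In the source that follows, toSubscribeUrl sets category to providers, configurators and routers so that several directories are watched at once. As a rough sketch of what that implies for the zk side, the snippet below expands such a category list into the paths to watch. SubscribePathsSketch and categoryPaths are hypothetical names, and the real conversion lives inside ZookeeperRegistry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one watched path per subscribed category.
final class SubscribePathsSketch {

    // categories is a comma-separated list such as "providers,configurators,routers".
    static List<String> categoryPaths(String root, String interfaceName, String categories) {
        List<String> paths = new ArrayList<>();
        for (String category : categories.split(",")) {
            paths.add(root + "/" + interfaceName + "/" + category.trim());
        }
        return paths;
    }

    public static void main(String[] args) {
        for (String path : categoryPaths("/dubbo", "org.apache.dubbo.demo.DemoService",
                "providers,configurators,routers")) {
            System.out.println(path);
        }
    }
}
```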

[Figure: consumer subscription sequence diagram]

    // org.apache.dubbo.registry.integration.RegistryProtocol#refer
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    // org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // Register the consumer's listener with zk
        // Add the providers, configurators and routers categories so that several directories are watched at once
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }
    private static URL toSubscribeUrl(URL url) {
        return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
    }

    // org.apache.dubbo.registry.ListenerRegistryWrapper#subscribe
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        try {
            registry.subscribe(url, listener);
        } finally {
            if (CollectionUtils.isNotEmpty(listeners)) {
                RuntimeException exception = null;
                for (RegistryServiceListener registryListener : listeners) {
                    if (registryListener != null) {
                        try {
                            registryListener.onSubscribe(url);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#subscribe
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                // configurators
                List<URL> urls = new ArrayList<>();
                // /dubbo/org.apache.dubbo.demo.DemoService/configurators
                // Sample url: provider://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-api-provider&bind.ip=192.168.56.1&bind.port=20880&category=configurators&check=false&default=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=34676&release=&side=provider&timestamp=1588639020690
                // toCategoriesPath() turns it into one path per category, and each path is subscribed in turn
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
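                    // zkListeners is a ConcurrentHashMap nested inside a ConcurrentHashMap (url -> listener -> zkListener), so the lookup takes two steps
                    // When the children of the directory change, notify() is invoked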
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
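                    // Unlike the provider's service registration (an ephemeral node), the category path subscribed here is created as a persistent node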
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Once the subscription is in place, trigger notify() once proactively to load the current data
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    private String[] toCategoriesPath(URL url) {
        String[] categories;
        if (ANY_VALUE.equals(url.getParameter(CATEGORY_KEY))) {
            categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
        } else {
            categories = url.getParameter(CATEGORY_KEY, new String[]{DEFAULT_CATEGORY});
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            // servicePath: /dubbo/interfaceName
            paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
        }
        return paths;
    }

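  In fact, the steps mirror those of the provider: 1. obtain the registry instance; 2. register the consumer's own URL under the consumers directory; 3. subscribe to changes in the providers, configurators and routers subdirectories.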
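
  To make step 3 concrete, here is a minimal sketch of how a comma-separated category parameter expands into one zk path per category. It is a simplified stand-in for toCategoriesPath(), not Dubbo code: the class name CategoryPathSketch and the string-based signature are assumptions made for illustration, and the real method works on a URL object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ZookeeperRegistry#toCategoriesPath.
class CategoryPathSketch {
    static final String PATH_SEPARATOR = "/";
    static final String ANY_VALUE = "*";
    static final String DEFAULT_CATEGORY = "providers";

    // Builds one path per category: /<root>/<interface>/<category>
    static List<String> toCategoriesPath(String root, String serviceInterface, String categoryParam) {
        String[] categories;
        if (ANY_VALUE.equals(categoryParam)) {
            // "*" means every category directory of the service
            categories = new String[]{"providers", "consumers", "routers", "configurators"};
        } else if (categoryParam == null || categoryParam.isEmpty()) {
            // no category given: fall back to the default one
            categories = new String[]{DEFAULT_CATEGORY};
        } else {
            categories = categoryParam.split(",");
        }
        List<String> paths = new ArrayList<>();
        for (String category : categories) {
            paths.add(PATH_SEPARATOR + root + PATH_SEPARATOR + serviceInterface
                    + PATH_SEPARATOR + category.trim());
        }
        return paths;
    }

    public static void main(String[] args) {
        // The consumer subscribes with category=providers,configurators,routers
        for (String path : toCategoriesPath("dubbo", "org.apache.dubbo.demo.DemoService",
                "providers,configurators,routers")) {
            System.out.println(path);
        }
    }
}
```

  Each resulting path gets its own child listener, which is why a single subscribe call ends up watching several directories.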

 

6. zookeeper service change notification (notify)

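  When the registry detects that the providers of a service have changed, it pushes the change to the clients subscribed to the corresponding path. On the client side, the event is first handled by the zkListener that was set up earlier.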

    // ZookeeperRegistry.doSubscribe
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        ...
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // The callback is handled by ZookeeperRegistry.this.notify()
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
        ...
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#notify            
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record a failed registration request to a failed list, retry regularly
            addFailedNotified(url, listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#doNotify
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }
    // org.apache.dubbo.registry.support.AbstractRegistry#notify(org.apache.dubbo.common.URL, org.apache.dubbo.registry.NotifyListener, java.util.List<org.apache.dubbo.common.URL>)
    /**
     * Notify changes from the Provider side.
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((CollectionUtils.isEmpty(urls))
                && !ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        // keep every provider's category.
        Map<String, List<URL>> result = new HashMap<>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            // Hand the list over to the listener that was originally passed in, i.e. RegistryDirectory.notify()
            listener.notify(categoryList);
            // We will update our cache file after each notification.
            // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
            saveProperties(url);
        }
    }
    // org.apache.dubbo.registry.integration.RegistryDirectory#notify
    @Override
    public synchronized void notify(List<URL> urls) {
        Map<String, List<URL>> categoryUrls = urls.stream()
                .filter(Objects::nonNull)
                .filter(this::isValidCategory)
                .filter(this::isNotCompatibleFor26x)
                .collect(Collectors.groupingBy(this::judgeCategory));

        List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
        this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

        List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
        // Convert the router URLs into Router instances and add them to the router chain, so that later invocations are routed by the latest rules
        toRouters(routerURLs).ifPresent(this::addRouters);

        // providers
        List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
        /**
         * 3.x added for extend URL address
         */
        ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
        List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
        if (supportedListeners != null && !supportedListeners.isEmpty()) {
            for (AddressListener addressListener : supportedListeners) {
                providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
            }
        }
        refreshOverrideAndInvoker(providerURLs);
    }
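
  The grouping step in AbstractRegistry#notify can be sketched in isolation as follows. This is only an illustration under simplifying assumptions: URLs are plain strings, the helper names categoryOf and groupByCategory are invented here, and the UrlUtils.isMatch filtering of the real code is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: group notified URLs by their "category" parameter.
class NotifyGroupingSketch {
    static final String DEFAULT_CATEGORY = "providers";

    // Reads the category query parameter, falling back to the default category.
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return DEFAULT_CATEGORY;
    }

    // One list per category, so the listener can be notified category by category.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), k -> new ArrayList<>()).add(url);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "dubbo://192.168.56.1:20880/org.apache.dubbo.demo.DemoService?side=provider",
                "dubbo://192.168.56.2:20880/org.apache.dubbo.demo.DemoService?side=provider",
                "empty://192.168.1.4/org.apache.dubbo.demo.DemoService?category=routers");
        groupByCategory(urls).forEach((category, list) ->
                System.out.println(category + " -> " + list.size() + " url(s)"));
    }
}
```

  Because each category is delivered separately, a change under routers does not overwrite the cached provider list.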

  

7. zookeeper service unsubscription

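  Besides taking the service offline, the subscription must also be removed, so that zookeeper no longer has to maintain the watches and push notifications for it.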

    // org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doUnsubscribe
    @Override
    public void doUnsubscribe(URL url, NotifyListener listener) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                if (ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    zkClient.removeChildListener(root, zkListener);
                } else {
                    // Enumerate the category paths and remove the listener from each one
                    for (String path : toCategoriesPath(url)) {
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }
    // Unsubscription is likewise triggered by the shutdown hook
    // org.apache.dubbo.registry.support.AbstractRegistry#destroy
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        Set<URL> destroyRegistered = new HashSet<>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<>(getRegistered())) {
                if (url.getParameter(DYNAMIC_KEY, true)) {
                    try {
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        // Remove the listener
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        AbstractRegistryFactory.removeDestroyedRegistry(this);
    }
    // org.apache.dubbo.registry.support.FailbackRegistry#unsubscribe
    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        super.unsubscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a canceling subscription request to the server side
            // Calls ZookeeperRegistry.doUnsubscribe() to remove the path listener
            doUnsubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedUnsubscribed(url, listener);
        }
    }

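  This serves the same purpose as the offline (unregister) operation described earlier; the difference is that one deletes the path, while the other removes the change listener.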
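
  The difference can be illustrated with a small in-memory model. It is a sketch only: ChildListenerSketch and its methods are hypothetical stand-ins that loosely mimic the addChildListener/removeChildListener calls seen above, and no real zookeeper client is involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of child listeners on a path; not a zk client.
class ChildListenerSketch {
    interface ChildListener {
        void childChanged(String parentPath, List<String> children);
    }

    private final Map<String, List<String>> children = new ConcurrentHashMap<>();
    private final Map<String, List<ChildListener>> listeners = new ConcurrentHashMap<>();

    // Subscribe: remember the listener and return the current children.
    List<String> addChildListener(String path, ChildListener listener) {
        listeners.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(listener);
        return new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
    }

    // Unsubscribe: the path and its children stay, only the listener goes away.
    void removeChildListener(String path, ChildListener listener) {
        List<ChildListener> list = listeners.get(path);
        if (list != null) {
            list.remove(listener);
        }
    }

    // Register: add a child node and notify the listeners on the parent path.
    void createChild(String path, String child) {
        children.computeIfAbsent(path, k -> new CopyOnWriteArrayList<>()).add(child);
        fire(path);
    }

    // Unregister: delete the child node and notify the listeners on the parent path.
    void deleteChild(String path, String child) {
        List<String> list = children.get(path);
        if (list != null && list.remove(child)) {
            fire(path);
        }
    }

    private void fire(String path) {
        List<String> snapshot = new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
        for (ChildListener listener : listeners.getOrDefault(path, Collections.emptyList())) {
            listener.childChanged(path, snapshot);
        }
    }

    public static void main(String[] args) {
        ChildListenerSketch zk = new ChildListenerSketch();
        String path = "/dubbo/org.apache.dubbo.demo.DemoService/providers";
        ChildListener listener = (parent, kids) -> System.out.println(parent + " now has " + kids.size() + " child(ren)");
        zk.addChildListener(path, listener);
        zk.createChild(path, "dubbo://192.168.56.1:20880");   // listener is called
        zk.deleteChild(path, "dubbo://192.168.56.1:20880");   // listener is called
        zk.removeChildListener(path, listener);
        zk.createChild(path, "dubbo://192.168.56.2:20880");   // nobody is notified
    }
}
```

  In this model deleting a child still fires the remaining listeners, whereas removing a listener leaves the node tree untouched and simply stops the callbacks.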

 

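  Dubbo manages registration and notification per service, which avoids the situation where an application that provides many services forces its consumers to process a large amount of data. Each consumer watches only the service paths relevant to itself, which keeps the impact to a minimum. Ephemeral nodes together with the session heartbeat keep the registry's view of live services accurate, which is something zk is good at. The feature is small, but it matters a great deal.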
