Dubbo服务暴露与注册逻辑解析
前面的文章中,我们讲解了Dubbo是如何进行配置的属性的初始化的,并且讲到,Dubbo最终会将所有的属性参数都封装为一个URL对象,从而以这个URL对象为基准传递参数。本文则主要讲解Dubbo是如何基于URL对象进行服务的暴露与注册的。
首先需要说明的一点是,服务的暴露与注册是两个不同的概念。在Dubbo中,微服务之间的交互默认是通过Netty进行的,而服务之间的通信是基于TCP以全双工的方式进行的。那么也就是说,每个服务都会存在一个ip和port。所谓的服务暴露就是指根据配置将当前服务使用Netty绑定一个本地的端口号(对于消费者而言,则是尝试连接目标服务的ip和端口)。至于注册,由于微服务架构中对于新添加的服务,需要一定的机制来通知消费者,有新的服务可用,或者对于某些下线的服务,也需要通知消费者,将这个已经下线的服务给移除。Dubbo中服务的注册与发现默认是委托给zookeeper来进行的。本文主要讲解服务的暴露与注册的整体实现结构,至于服务暴露和注册时所需要注意的详细细节,则在后面的文章中进行讲解。
1. 服务的暴露 服务的暴露的入口位置主要在RegistryProtocol.export()
方法中,该方法首先会进行服务的暴露,然后会进行服务的注册。如下是该方法的源码:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 获取服务注册相关的配置数据
URL registryUrl = getRegistryUrl(originInvoker);
// 获取provider相关的配置数据
URL providerUrl = getProviderUrl(originInvoker);
// 对provider的部分配置信息进行覆盖,重写的工作主要是委托给Configurator进行,
// 这里OverrideListener的作用主要是在当前服务的配置信息发生更改时,对原有的配置进行重写,
// 并且会判断是否需要对当前的服务进行重新暴露
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener =
new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 进行服务的本地暴露,本质上就是根据配置使用Netty绑定本地的某个端口,从而完成服务暴露工作
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根据配置获取对应的Registry对象,常见的有ZookeeperRegistry和RedisRegistry,默认使用的是
// ZookeeperRegistry,本文则以Zookeeper为例进行讲解
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 将当前的Invoker对象注册到一个全局的providerInvokers中进行缓存,
// 该Map对象保存了所有的已经暴露了的服务
ProviderInvokerWrapper<T> providerInvokerWrapper =
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl,
registeredProviderUrl);
// 除非主动配置不进行注册,那么这里将会返回true
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 进行服务注册的代码,主要是通过Zookeeper的客户端CuratorFramework进行服务的注册
register(registryUrl, registeredProviderUrl);
// 将当前Invoker标识为已经注册完成
providerInvokerWrapper.setReg(true);
}
// 注册配置被更改的监听事件,将配置被更改时将会触发相应的listener
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 设置相关的URL对象,并且使用DestroyableExporter对exporter进行封装返回
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new DestroyableExporter<>(exporter);
}
上面的代码中,主要完成了三部分的工作:
- 将服务与本地的某个端口号进行绑定,从而实现服务暴露的功能;
- 根据配置得到一个服务注册对象
Registry
,然后对其进行注册; - 创建一个配置被重写的监听器,并且注册该监听器,从而实现配置被重写时能够动态的使用新的配置进行服务的配置。
对于服务的暴露,主要是在doLocalExport()
方法中,我们继续阅读其源码:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker,
URL providerUrl) {
// 获取当前Invoker对应的key,默认为group/interface/version的格式
String key = getCacheKey(originInvoker);
// 这一段代码看起来比较复杂,其实本质上还是protocol.export()方法的调用,该方法就是进行服务暴露的代码,
// 而ExporterChangeableWrapper的主要作用则是进行unexport()时的一些清理工作
return (ExporterChangeableWrapper<T>) bounds0.4907861203713513.com0.5042556148172979puteIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate),
originInvoker);
});
}
doLocalExport()
方法的实现比较简单,主要的导出工作还是委托给了protocol.export()
方法进行,这里的protocol
的类型为DubboProtocol
,这里我们直接看其export()
方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 这里主要是构建Stub的事件分发器,该分发器用于在消费者端进行Stub事件的分发
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,
Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer ["
+ url.getParameter(Constants.INTERFACE_KEY)
+ "], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 开启服务
openServer(url);
// 该方法的主要作用是对序列化进行优化,其会获取配置的实现了SerializationOptimizer接口的配置类,
// 然后通过其getSerializableClasses()方法获取序列化类,通过这些类来进行序列化的优化
optimizeSerialization(url);
return exporter;
}
export()
方法主要做了三件事:a. 注册stub事件分发器;b. 开启服务;c. 注册序列化优化器类。这里openServer()
方法是用于开启服务的,我们继续阅读其源码:
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
// 这里采用双检查法来判断对应于当前服务的server是否已经创建,如果没有创建,
// 则创建一个新的,并且缓存起来
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 创建并缓存新服务
serverMap.put(key, createServer(url));
}
}
} else {
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(Constants.HEARTBEAT_KEY,
String.valueOf(Constants.DEFAULT_HEARTBEAT))
.addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
.build();
// 获取所使用的server类型,默认为netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0
&& !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 通过Exchangers.bind()方法进行服务的绑定
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 获取client参数所指定的值,该值指定了当前client所使用的传输层服务,比如netty或mina。
// 然后判断当前SPI所提供的传输层服务是否包含所指定的服务类型,如果不包含,则抛出异常
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class)
.getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
上面的代码主要是创建ExchangeServer
的,使用双检查来检测是否已经存在了对应的服务,如果不存在,则通过Exchangers.bind()
方法进行创建。这里最终会将bind()
方法的调用委托给HeaderExchanger.bind()
方法进行。需要注意的是,上面的代码中传入了一个requestHandler
的参数,这是一个ExchangeHandler
类型的对象,其主要作用是获取并且调用Invoker
,以得到最终的调用结果,这些Handler
的作用,我们将在后面的文章中进行讲解,本文主要讲解服务的暴露与注册的过程。下面我们继续阅读HeaderExchanger.bind()
方法的源码:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(
new HeaderExchangeHandler(handler))));
}
这里的bind()
方法主要是创建了三个Handler
,并且最后一个Handler
将传入的ExchangeHandler
包裹起来了。相信读者朋友应该很快就能认识到,这里使用的是责任链模式,这几个handler通过统一的构造函数将下一个handler的实例注入到当前handler中。其实我们也就能够理解,最终通过netty进行的调用过程就是基于这些责任链的。这里我们主要看Transporters.bind()
方法的实现原理:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 判断传入的Handler是否只有一个,如果只有一个,则直接使用该handler,如果存在多个,
// 则使用ChannelHandlerDispatcher将这些handler包裹起来进行分发
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 通过配置指定的Transporter进行服务的绑定,这里默认使用的是NettyTransporter
return getTransporter().bind(url, handler);
}
// NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 在NettyTransporter中进行服务绑定时,其只是创建了一个NettyServer以返回,但实际上在创建该对象的
// 过程中,就完成了Netty服务的绑定。需要注意的是,这里的NettyServer并不是Netty所提供的类,而是
// Dubbo自己封装的一个服务类,其对Netty的服务进行了封装
return new NettyServer(url, listener);
}
Transporters.bind()
方法主要是将服务的绑定过程交由NettyTransporter
进行,而其则是创建了一个NettyServer
对象,真正的绑定过程就在创建该对象的过程中。下面我们来看其创建的源码:
// AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取绑定的ip和端口号等信息
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp))
{
bindIp = Constants.ANYHOST_VALUE;
}
// 在本地绑定指定的ip和端口
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY,
Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 通过创建的InetSocketAddress对象,将真正的绑定过程交由子类进行
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
+ ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind "
+ getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: "
+ t.getMessage(), t);
}
// 这里的DataStore只是一个本地缓存的数据仓库,主要是对一些大对象进行缓存
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
Integer.toString(url.getPort()));
}
// NettyServer
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// 这里就进入了创建netty服务的过程,bossGroup指定的线程数为1,因为只有一个channel用于接收客户端请求,
// 而workerGroup线程数则指定为配置文件所设置的线程数,这些线程主要用于进行请求的处理
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(
Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// 创建NettyServerHandler,这个handler就是用于处理请求用的handler,但是前面我们也讲到了,
// Dubbo使用了一个handler的责任链来进行消息的处理,第二个参数this就是这个链的链头。需要注意的是,
// Netty本身提供的责任链与Dubbo这里使用的责任链是不同的,Dubbo只是使用了Netty的链的一个节点来
// 处理Dubbo所创建的链,这样Dubbo的链其实是可以在多种服务复用的,比如Mina
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
// 这里是标准的创建Netty的BootstrapServer的过程
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),
NettyServer.this);
ch.pipeline()
// 添加用于解码的handler
.addLast("decoder", adapter.getDecoder())
// 添加用于编码的handler
.addLast("encoder", adapter.getEncoder())
// 添加用于进行心跳监测的handler
.addLast("server-idle-handler", new IdleStateHandler(0, 0,
idleTimeout, MILLISECONDS))
// 将处理请求的handler添加到pipeline中
.addLast("handler", nettyServerHandler);
}
});
// 进行服务的绑定
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上面的代码就是一个标准的使用Netty进行服务绑定的代码,关于Netty的使用,读者朋友可以阅读Netty Reactor模式实现原理详解。
2. 服务的注册 对于服务的注册,前面我们已经讲到,入口主要在RegistryProtocol.export()
方法中,而调用入口则是通过其register()
方法进行的,这里我们来看一下该方法的调用过程:
public void register(URL registryUrl, URL registeredProviderUrl) {
// 通过RegistryFactory获取一个Registry对象,该对象的主要作用是进行服务的注册,
// 这里默认返回的是ZookeeperRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
这里主要是根据配置获取一个Registry
对象,我们继续阅读其register()
方法的源码:
// FailbackRegistry
@Override
public void register(URL url) {
// 将当前URL对象保存到已注册的URL对象列表中
super.register(url);
// 移除之前注册失败的记录
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 将真正的注册过程委托给ZookeeperRegistry进行
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 下面的过程主要是在注册失败的情况下,将当前URL添加到注册失败的URL列表中
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry "
+ getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: "
+ t.getMessage(), t);
}
// 将当前URL添加到注册失败的URL列表中
addFailedRegistered(url);
}
}
// ZookeeperRegistry
@Override
public void doRegister(URL url) {
try {
// 这里是真正的注册过程。需要注意的是这里的zkClient类型为ZookeeperClient,其是Dubbo对
// 真正使用的CuratorFramework的一个封装
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl()
+ ", cause: " + e.getMessage(), e);
}
}
上面的代码中首先会对一些缓存数据进行清理,并且将当前URL添加到注册的URL列表中,然后将注册过程委托给ZookeeperClient进行。下面我们来看其是如何进行注册的:
@Override
public void create(String path, boolean ephemeral) {
// 判断创建的是否为临时节点,如果不是临时节点,则判断是否已经存在该节点,如果存在,则直接返回
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
// 对path进行截取,因为最后一个"/"后面是被编码的URL对象,前面则是serviceKey + category
// 这里的category指定的是provider还是consumer
int i = path.lastIndexOf('/');
if (i > 0) {
// 创建节点,需要注意的是,这里的create()方法进行的是递归调用,这是因为zookeeper创建节点时
// 只能一级一级的创建,因而其每次都是取"/"前面的一部分来创建,只有当前节点已经存在的情况下,
// 上面的checkExists()才会为true,而且这里,由于zookeeper规定,除了叶节点以外,其余所有的
// 节点都必须为非临时节点,因而这里第二个参数传入的是false,这也是前面的if判断能通过的原因
create(path.substring(0, i), false);
}
if (ephemeral) {
// 创建临时节点,具体的创建工作交由子类进行,也就是下面的代码
createEphemeral(path);
} else {
// 创建持久节点,具体的创建工作交由子类进行,也就是下面的代码
createPersistent(path);
}
}
@Override
public void createEphemeral(String path) {
try {
// 将临时节点的创建工作交由CuratorFramework进行
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void createPersistent(String path) {
try {
// 将持久节点的创建工作交由CuratorFramework进行
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
3. 小结
本文主要讲解了Dubbo在导出服务时是如何进行服务暴露与注册的,并且具体讲解了如何基于netty进行服务的暴露,和如何基于zookeeper进行服务的注册。
- 上一篇
Pixel-Anchor文本检测逻辑学习
文本检测是深度学习中一项非常重要的应用,在前面的文章中已经介绍过了很多文本检测的方法,包括CTPN(详见文章:大话文本检测经典模型CTPN)、SegLink(详见文章:大话文本检测经典模型SegLink)、EAST(详见文章...
- 下一篇
tio-websocket-spring-boot-starter如何使用?!
引言 T-io网络通讯框架开源之后受到许多同学的喜欢,但是对于使用Spring系列技术的同学用起来稍许不适。于是乎抽时间写了个 starter,很荣幸代码被作者采纳,正式入驻T-io家族包。 tio-websocket-server tio-webso..