更新時(shí)間:2022-09-19 來(lái)源:黑馬程序員 瀏覽量:
概述
dubbo是一個(gè)簡(jiǎn)單易用的RPC框架,通過(guò)簡(jiǎn)單的提供者,消費(fèi)者配置就能完成無(wú)感的網(wǎng)絡(luò)調(diào)用。那么在dubbo中是如何將提供者的服務(wù)暴露出去,消費(fèi)者又是如何獲取到提供者相關(guān)信息的呢?這就是本章我們要討論的內(nèi)容。
dubbo與spring的整合
在了解dubbo的服務(wù)注冊(cè)和服務(wù)發(fā)現(xiàn)之前,我們首先需要掌握一個(gè)知識(shí)點(diǎn):Spring中自定義Schema。
Spring自定義Schema
Dubbo 現(xiàn)在的設(shè)計(jì)是完全無(wú)侵入,也就是使用者只依賴于配置契約。在 Dubbo 中,可以使用 XML 配置相關(guān)信息,也可以用來(lái)引入服務(wù)或者導(dǎo)出服務(wù)。配置完成,啟動(dòng)工程,Spring 會(huì)讀取配置文件,生成注入相關(guān)Bean。那 Dubbo 如何實(shí)現(xiàn)自定義 XML 被 Spring 加載讀取呢?
從 Spring 2.0 開(kāi)始,Spring 開(kāi)始提供了一種基于 XML Schema 格式擴(kuò)展機(jī)制,用于定義和配置 bean。
入門案例
學(xué)習(xí)和使用Spring XML Schema 擴(kuò)展機(jī)制并不難,需要下面幾個(gè)步驟:
1. 創(chuàng)建配置屬性的JavaBean對(duì)象
2. 創(chuàng)建一個(gè) XML Schema 文件,描述自定義的合法構(gòu)建模塊,也就是xsd文件。
3. 自定義處理器類,并實(shí)現(xiàn)`NamespaceHandler`接口。
4. 自定義解析器,實(shí)現(xiàn)`BeanDefinitionParser`接口(最關(guān)鍵的部分)。
5. 編寫(xiě)Spring.handlers和Spring.schemas文件配置所有部件
定義JavaBean對(duì)象,在spring中此對(duì)象會(huì)根據(jù)配置自動(dòng)創(chuàng)建
public class User {
private String id;
private String name;
private Integer age;
//省略getter setter方法
}
在META-INF下定義`user.xsd`文件,使用xsd用于描述標(biāo)簽的規(guī)則
<?xml version="1.0" encoding="UTF-8"?>
<xsd:schema
xmlns="http://low-budgetmovie.com/schema/user"
xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans"
targetNamespace="http://low-budgetmovie.com/schema/user"
elementFormDefault="qualified"
attributeFormDefault="unqualified">
<xsd:import namespace="http://www.springframework.org/schema/beans" />
<xsd:element name="user">
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="beans:identifiedType">
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="age" type="xsd:int" />
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
</xsd:element>
</xsd:schema>
Spring讀取xml文件時(shí),會(huì)根據(jù)標(biāo)簽的命名空間找到其對(duì)應(yīng)的NamespaceHandler,我們?cè)贜amespaceHandler內(nèi)會(huì)注冊(cè)標(biāo)簽對(duì)應(yīng)的解析器BeanDefinitionParser。
package com.itheima.schema;
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;
public class UserNamespaceHandler extends NamespaceHandlerSupport {
public void init() {
registerBeanDefinitionParser("user", new UserBeanDefinitionParser());
}
}
BeanDefinitionParser是標(biāo)簽對(duì)應(yīng)的解析器,Spring讀取到對(duì)應(yīng)標(biāo)簽時(shí)會(huì)使用該類進(jìn)行解析;
public class UserBeanDefinitionParser extends
AbstractSingleBeanDefinitionParser {
protected Class getBeanClass(Element element) {
return User.class;
}
protected void doParse(Element element, BeanDefinitionBuilder bean) {
String name = element.getAttribute("name");
String age = element.getAttribute("age");
String id = element.getAttribute("id");
if (StringUtils.hasText(id)) {
bean.addPropertyValue("id", id);
}
if (StringUtils.hasText(name)) {
bean.addPropertyValue("name", name);
}
if (StringUtils.hasText(age)) {
bean.addPropertyValue("age", Integer.valueOf(age));
}
}
}
定義spring.handlers文件,內(nèi)部保存命名空間與NamespaceHandler類的對(duì)應(yīng)關(guān)系;必須放在classpath下的META-INF文件夾中。
```proprties
http\://low-budgetmovie.com/schema/user=com.itheima.schema.UserNamespaceHandler
```
定義spring.schemas文件,內(nèi)部保存命名空間對(duì)應(yīng)的xsd文件位置;必須放在classpath下的META-INF文件夾中。
http\://low-budgetmovie.com/schema/user.xsd=META-INF/user.xsd
代碼準(zhǔn)備好了之后,就可以在spring工程中進(jìn)行使用和測(cè)試,定義spring配置文件,導(dǎo)入對(duì)應(yīng)約束。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:itheima="http://low-budgetmovie.com/schema/user"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://low-budgetmovie.com/schema/user http://low-budgetmovie.com/schema/user.xsd">
<itheima:user id="user" name="zhangsan" age="12"></itheima:user>
</beans>
編寫(xiě)測(cè)試類,通過(guò)spring容器獲取對(duì)象user
public class SchemaDemo {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("/spring/applicationContext.xml");
User user = (User)ctx.getBean("user");
System.out.println(user);
}
}
dubbo中的相關(guān)對(duì)象
Dubbo是運(yùn)行在spring容器中,dubbo的配置文件也是通過(guò)spring的配置文件applicationContext.xml來(lái)加載,所以dubbo的自定義配置標(biāo)簽實(shí)現(xiàn),其實(shí)同樣依賴spring的xml schema機(jī)制
可以看出Dubbo所有的組件都是由`DubboBeanDefinitionParser`解析,并通過(guò)registerBeanDefinitionParser方法來(lái)注冊(cè)到spring中最后解析對(duì)應(yīng)的對(duì)象。這些對(duì)象中我們重點(diǎn)關(guān)注的有以下兩個(gè):
ServiceBean:服務(wù)提供者暴露服務(wù)的核心對(duì)象
ReferenceBean:服務(wù)消費(fèi)者發(fā)現(xiàn)服務(wù)的核心對(duì)象
RegistryConfig:定義注冊(cè)中心的核心配置對(duì)象
服務(wù)暴露
前面主要探討了 Dubbo 中 schema 、 XML 的相關(guān)原理 , 這些內(nèi)容對(duì)理解框架整體至關(guān)重要 , 在此基礎(chǔ)上我們繼續(xù)探討服務(wù)是如何依靠前面的配置進(jìn)行服務(wù)暴露。
名詞解釋
在 Dubbo 的核心領(lǐng)域模型中:
Invoker 是實(shí)體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個(gè)可執(zhí)行體,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn),也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn),也可能一個(gè)集群實(shí)現(xiàn)。在服務(wù)提供方,Invoker用于調(diào)用服務(wù)提供類。在服務(wù)消費(fèi)方,Invoker用于執(zhí)行遠(yuǎn)程調(diào)用。
- Protocol 是服務(wù)域,它是 Invoker 暴露和引用的主功能入口,它負(fù)責(zé) Invoker 的生命周期管理。
- export:暴露遠(yuǎn)程服務(wù)
- refer:引用遠(yuǎn)程服務(wù)
- proxyFactory:獲取一個(gè)接口的代理類
- getInvoker:針對(duì)server端,將服務(wù)對(duì)象,如DemoServiceImpl包裝成一個(gè)Invoker對(duì)象
- getProxy:針對(duì)client端,創(chuàng)建接口的代理對(duì)象,例如DemoService的接口。
- Invocation 是會(huì)話域,它持有調(diào)用過(guò)程中的變量,比如方法名,參數(shù)等
整體流程
在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)暴露原理
在整體上看,Dubbo 框架做服務(wù)暴露分為兩大部分 , 第一步將持有的服務(wù)實(shí)例通過(guò)代理轉(zhuǎn)換成 Invoker, 第二步會(huì)把 Invoker 通過(guò)具體的協(xié)議 ( 比如 Dubbo ) 轉(zhuǎn)換成 Exporter, 框架做了這層抽象也大大方便了功能擴(kuò)展 。
服務(wù)提供方暴露服務(wù)的藍(lán)色初始化鏈,時(shí)序圖如下:
源碼分析
(1) 導(dǎo)出入口
服務(wù)導(dǎo)出的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個(gè)事件響應(yīng)方法,該方法會(huì)在收到 Spring 上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。方法代碼如下:
public void onApplicationEvent(ContextRefreshedEvent event) {
// 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出
if (isDelay() && !isExported() && !isUnexported()) {
// 導(dǎo)出服務(wù)
export();
}
}
onApplicationEvent 方法在經(jīng)過(guò)一些判斷后,會(huì)決定是否調(diào)用 export 方法導(dǎo)出服務(wù)。在export 根據(jù)配置執(zhí)行相應(yīng)的動(dòng)作。最終進(jìn)入到doExportUrls導(dǎo)出服務(wù)方法。
private void doExportUrls() {
// 加載注冊(cè)中心鏈接
List<URL> registryURLs = loadRegistries(true);
// 遍歷 protocols,并在每個(gè)協(xié)議下導(dǎo)出服務(wù)
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
關(guān)于多協(xié)議多注冊(cè)中心導(dǎo)出服務(wù)首先是根據(jù)配置,以及其他一些信息組裝 URL。前面說(shuō)過(guò),URL 是 Dubbo 配置的載體,通過(guò) URL 可讓 Dubbo 的各種配置在各個(gè)模塊之間傳遞。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 如果協(xié)議名為空,或空串,則將協(xié)議名變量設(shè)置為 dubbo
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
//略
// 獲取上下文路徑
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 獲取 host 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 組裝 URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 省略無(wú)關(guān)代碼
}
上面的代碼首先是將一些信息,比如版本、時(shí)間戳、方法名以及各種配置對(duì)象的字段信息放入到 map 中,最后將 map 和主機(jī)名等數(shù)據(jù)傳給 URL 構(gòu)造方法創(chuàng)建 URL 對(duì)象。前置工作做完,接下來(lái)就可以進(jìn)行服務(wù)導(dǎo)出了。服務(wù)導(dǎo)出分為導(dǎo)出到本地 (JVM),和導(dǎo)出到遠(yuǎn)程。在深入分析服務(wù)導(dǎo)出的源碼前,我們先來(lái)從宏觀層面上看一下服務(wù)導(dǎo)出邏輯。如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略無(wú)關(guān)代碼
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,則什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// scope != remote,導(dǎo)出到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local,導(dǎo)出到遠(yuǎn)程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
//省略無(wú)關(guān)代碼
// 為服務(wù)提供類(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 導(dǎo)出服務(wù),并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注冊(cè)中心,僅導(dǎo)出服務(wù)
} else {
//略
}
}
}
this.urls.add(url);
}
上面代碼根據(jù) url 中的 scope 參數(shù)決定服務(wù)導(dǎo)出方式,分別如下:
- scope = none,不導(dǎo)出服務(wù)
- scope != remote,導(dǎo)出到本地
- scope != local,導(dǎo)出到遠(yuǎn)程
不管是導(dǎo)出到本地,還是遠(yuǎn)程。進(jìn)行服務(wù)導(dǎo)出之前,均需要先創(chuàng)建 Invoker,這是一個(gè)很重要的步驟。因此下面先來(lái)分析 Invoker 的創(chuàng)建過(guò)程。Invoker 是由 ProxyFactory 創(chuàng)建而來(lái),Dubbo 默認(rèn)的 ProxyFactory 實(shí)現(xiàn)類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創(chuàng)建過(guò)程。如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 為目標(biāo)類創(chuàng)建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 創(chuàng)建匿名 Invoker 類對(duì)象,并實(shí)現(xiàn) doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調(diào)用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會(huì)調(diào)用目標(biāo)方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
如上,JavassistProxyFactory 創(chuàng)建了一個(gè)繼承自 AbstractProxyInvoker 類的匿名對(duì)象,并覆寫(xiě)了抽象方法 doInvoke。
(2) 導(dǎo)出服務(wù)到本地
Invoke創(chuàng)建成功之后,接下來(lái)我們來(lái)看本地導(dǎo)出
private void exportLocal(URL url) {
// 如果 URL 的協(xié)議頭等于 injvm,說(shuō)明已經(jīng)導(dǎo)出到本地了,無(wú)需再次導(dǎo)出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 設(shè)置協(xié)議頭為 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 創(chuàng)建 Invoker,并導(dǎo)出服務(wù),這里的 protocol 會(huì)在運(yùn)行時(shí)調(diào)用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
exportLocal 方法比較簡(jiǎn)單,首先根據(jù) URL 協(xié)議頭決定是否導(dǎo)出服務(wù)。若需導(dǎo)出,則創(chuàng)建一個(gè)新的 URL 并將協(xié)議頭、主機(jī)名以及端口設(shè)置成新的值。然后創(chuàng)建 Invoker,并調(diào)用 InjvmProtocol 的 export 方法導(dǎo)出服務(wù)。下面我們來(lái)看一下 InjvmProtocol 的 export 方法都做了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 創(chuàng)建 InjvmExporter
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的 export 方法僅創(chuàng)建了一個(gè) InjvmExporter,無(wú)其他邏輯。到此導(dǎo)出服務(wù)到本地就分析完了。
(3) 導(dǎo)出服務(wù)到遠(yuǎn)程
接下來(lái),我們繼續(xù)分析導(dǎo)出服務(wù)到遠(yuǎn)程的過(guò)程。導(dǎo)出服務(wù)到遠(yuǎn)程包含了服務(wù)導(dǎo)出與服務(wù)注冊(cè)兩個(gè)過(guò)程。先來(lái)分析服務(wù)導(dǎo)出邏輯。我們把目光移動(dòng)到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 導(dǎo)出服務(wù)
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 獲取注冊(cè)中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
// 根據(jù) URL 加載 Registry 實(shí)現(xiàn)類,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 獲取已注冊(cè)的服務(wù)提供者 URL,比如:
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 獲取 register 參數(shù)
boolean register = registeredProviderUrl.getParameter("register", true);
// 向服務(wù)提供者與消費(fèi)者注冊(cè)表中注冊(cè)服務(wù)提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 根據(jù) register 的值決定是否注冊(cè)服務(wù)
if (register) {
// 向注冊(cè)中心注冊(cè)服務(wù)
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 獲取訂閱 URL,比如:
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 創(chuàng)建監(jiān)聽(tīng)器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注冊(cè)中心進(jìn)行訂閱 override 數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 創(chuàng)建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
上面代碼看起來(lái)比較復(fù)雜,主要做如下一些操作:
1. 調(diào)用 doLocalExport 導(dǎo)出服務(wù)
2. 向注冊(cè)中心注冊(cè)服務(wù)
3. 向注冊(cè)中心進(jìn)行訂閱 override 數(shù)據(jù)
4. 創(chuàng)建并返回 DestroyableExporter
下面先來(lái)分析 doLocalExport 方法的邏輯,如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
// 訪問(wèn)緩存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 創(chuàng)建 Invoker 為委托類對(duì)象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 調(diào)用 protocol 的 export 方法導(dǎo)出服務(wù)
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
// 寫(xiě)緩存
bounds.put(key, exporter);
}
}
}
return exporter;
}
接下來(lái),我們把重點(diǎn)放在 Protocol 的 export 方法上。假設(shè)運(yùn)行時(shí)協(xié)議為 dubbo,此處的 protocol 變量會(huì)在運(yùn)行時(shí)加載 DubboProtocol,并調(diào)用 DubboProtocol 的 export 方法。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 獲取服務(wù)標(biāo)識(shí),理解成服務(wù)坐標(biāo)也行。由服務(wù)組名,服務(wù)名,服務(wù)版本號(hào)以及端口組成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 創(chuàng)建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對(duì)放入緩存中
exporterMap.put(key, exporter);
//省略無(wú)關(guān)代碼
// 啟動(dòng)服務(wù)器
openServer(url);
// 優(yōu)化序列化
optimizeSerialization(url);
return exporter;
}
(4) 開(kāi)啟Netty服務(wù)
如上,我們重點(diǎn)關(guān)注 DubboExporter 的創(chuàng)建以及 openServer 方法,其他邏輯看不懂也沒(méi)關(guān)系,不影響理解服務(wù)導(dǎo)出過(guò)程。下面分析 openServer 方法。
private void openServer(URL url) {
// 獲取 host:port,并將其作為服務(wù)器實(shí)例的 key,用于標(biāo)識(shí)當(dāng)前的服務(wù)器實(shí)例
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 訪問(wèn)緩存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 創(chuàng)建服務(wù)器實(shí)例
serverMap.put(key, createServer(url));
} else {
// 服務(wù)器已創(chuàng)建,則根據(jù) url 中的配置重置服務(wù)器
server.reset(url);
}
}
}
接下來(lái)分析服務(wù)器實(shí)例的創(chuàng)建過(guò)程。如下:
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
// 添加心跳檢測(cè)配置到 url 中
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 獲取 server 參數(shù),默認(rèn)為 netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通過(guò) SPI 檢測(cè)是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加編碼解碼器參數(shù)
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 創(chuàng)建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server...");
}
// 獲取 client 參數(shù),可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 獲取所有的 Transporter 實(shí)現(xiàn)類名稱集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測(cè)當(dāng)前 Dubbo 所支持的 Transporter 實(shí)現(xiàn)類名稱列表中,
// 是否包含 client 所表示的 Transporter,若不包含,則拋出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type...");
}
}
return server;
}
如上,createServer 包含三個(gè)核心的邏輯。第一是檢測(cè)是否存在 server 參數(shù)所代表的 Transporter 拓展,不存在則拋出異常。第二是創(chuàng)建服務(wù)器實(shí)例。第三是檢測(cè)是否支持 client 參數(shù)所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測(cè)操作所對(duì)應(yīng)的代碼比較直白了,無(wú)需多說(shuō)。但創(chuàng)建服務(wù)器的操作目前還不是很清晰,我們繼續(xù)往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger,默認(rèn)為 HeaderExchanger。
// 緊接著調(diào)用 HeaderExchanger 的 bind 方法創(chuàng)建 ExchangeServer 實(shí)例
return getExchanger(url).bind(url, handler);
}
上面代碼比較簡(jiǎn)單,就不多說(shuō)了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 創(chuàng)建 HeaderExchangeServer 實(shí)例,該方法包含了多個(gè)邏輯,分別如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關(guān)心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素?cái)?shù)量大于1,則創(chuàng)建 ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應(yīng) Transporter 實(shí)例,并調(diào)用實(shí)例方法
return getTransporter().bind(url, handler);
}
如上,getTransporter() 方法獲取的 Transporter 是在運(yùn)行時(shí)動(dòng)態(tài)創(chuàng)建的,類名為 TransporterAdaptive,也就是自適應(yīng)拓展類。TransporterAdaptive 會(huì)在運(yùn)行時(shí)根據(jù)傳入的 URL 參數(shù)決定加載什么類型的 Transporter,默認(rèn)為 NettyTransporter。調(diào)用`NettyTransporter.bind(URL, ChannelHandler)`方法。創(chuàng)建一個(gè)`NettyServer`實(shí)例。調(diào)用`NettyServer.doOPen()`方法,服務(wù)器被開(kāi)啟,服務(wù)也被暴露出來(lái)了。
(5) 服務(wù)注冊(cè)
本節(jié)內(nèi)容以 Zookeeper 注冊(cè)中心作為分析目標(biāo),其他類型注冊(cè)中心大家可自行分析。下面從服務(wù)注冊(cè)的入口方法開(kāi)始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ${導(dǎo)出服務(wù)}
// 省略其他代碼
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 注冊(cè)服務(wù)
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 訂閱 override 數(shù)據(jù)
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代碼
}
RegistryProtocol 的 export 方法包含了服務(wù)導(dǎo)出,注冊(cè),以及數(shù)據(jù)訂閱等邏輯。其中服務(wù)導(dǎo)出邏輯上一節(jié)已經(jīng)分析過(guò)了,本節(jié)將分析服務(wù)注冊(cè)邏輯,相關(guān)代碼如下:
public void register(URL registryUrl, URL registedProviderUrl) {
// 獲取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注冊(cè)服務(wù)
registry.register(registedProviderUrl);
}
register 方法包含兩步操作,第一步是獲取注冊(cè)中心實(shí)例,第二步是向注冊(cè)中心注冊(cè)服務(wù)。接下來(lái)分兩節(jié)內(nèi)容對(duì)這兩步操作進(jìn)行分析。
這里以 Zookeeper 注冊(cè)中心為例進(jìn)行分析。下面先來(lái)看一下 getRegistry 方法的源碼,這個(gè)方法由 AbstractRegistryFactory 實(shí)現(xiàn)。如下:
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 訪問(wèn)緩存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 緩存未命中,創(chuàng)建 Registry 實(shí)例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry...");
}
// 寫(xiě)入緩存
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先訪問(wèn)緩存,緩存未命中則調(diào)用 createRegistry 創(chuàng)建 Registry。在此方法中就是通過(guò)`new ZookeeperRegistry(url, zookeeperTransporter)`實(shí)例化一個(gè)注冊(cè)中心
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲取組名,默認(rèn)為 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 創(chuàng)建 Zookeeper 客戶端,默認(rèn)為 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加狀態(tài)監(jiān)聽(tīng)器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
在上面的代碼代碼中,我們重點(diǎn)關(guān)注 ZookeeperTransporter 的 connect 方法調(diào)用,這個(gè)方法用于創(chuàng)建 Zookeeper 客戶端。創(chuàng)建好 Zookeeper 客戶端,意味著注冊(cè)中心的創(chuàng)建過(guò)程就結(jié)束了。接下來(lái),再來(lái)分析一下 Zookeeper 客戶端的創(chuàng)建過(guò)程。
public ZookeeperClient connect(URL url) {
// 創(chuàng)建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
繼續(xù)向下看。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 創(chuàng)建 CuratorFramework 構(gòu)造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 構(gòu)建 CuratorFramework 實(shí)例
client = builder.build();
//省略無(wú)關(guān)代碼
// 啟動(dòng)客戶端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
CuratorZookeeperClient 構(gòu)造方法主要用于創(chuàng)建和啟動(dòng) CuratorFramework 實(shí)例。至此Zookeeper客戶端就已經(jīng)啟動(dòng)了
下面我們將 Dubbo 的 demo 跑起來(lái),然后通過(guò) Zookeeper 可視化客戶端 [ZooInspector](https://github.com/apache/zookeeper/tree/b79af153d0f98a4f3f3516910ed47234d7b3d74e/src/contrib/zooinspector) 查看節(jié)點(diǎn)數(shù)據(jù)。如下:

從上圖中可以看到DemoService 這個(gè)服務(wù)對(duì)應(yīng)的配置信息最終被注冊(cè)到了zookeeper節(jié)點(diǎn)下。搞懂了服務(wù)注冊(cè)的本質(zhì),那么接下來(lái)我們就可以去閱讀服務(wù)注冊(cè)的代碼了。
protected void doRegister(URL url) {
try {
// 通過(guò) Zookeeper 客戶端創(chuàng)建節(jié)點(diǎn),節(jié)點(diǎn)路徑由 toUrlPath 方法生成,路徑格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
如上,ZookeeperRegistry 在 doRegister 中調(diào)用了 Zookeeper 客戶端創(chuàng)建服務(wù)節(jié)點(diǎn)。節(jié)點(diǎn)路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來(lái)分析 create 方法,如下:
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要?jiǎng)?chuàng)建的節(jié)點(diǎn)類型非臨時(shí)節(jié)點(diǎn),那么這里要檢測(cè)節(jié)點(diǎn)是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 遞歸創(chuàng)建上一級(jí)路徑
create(path.substring(0, i), false);
}
// 根據(jù) ephemeral 的值創(chuàng)建臨時(shí)或持久節(jié)點(diǎn)
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
好了,到此關(guān)于服務(wù)注冊(cè)的過(guò)程就分析完了。整個(gè)過(guò)程可簡(jiǎn)單總結(jié)為:先創(chuàng)建注冊(cè)中心實(shí)例,之后再通過(guò)注冊(cè)中心實(shí)例注冊(cè)服務(wù)。
總結(jié)
1. 在有注冊(cè)中心,需要注冊(cè)提供者地址的情況下,ServiceConfig 解析出的 URL 格式為:`registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/{服務(wù)名}/{版本號(hào)}")`
2. 基于 Dubbo SPI 的自適應(yīng)機(jī)制,通過(guò) URL `registry://` 協(xié)議頭識(shí)別,就調(diào)用 RegistryProtocol#export() 方法
1. 將具體的服務(wù)類名,比如 `DubboServiceRegistryImpl`,通過(guò) ProxyFactory 包裝成 Invoker 實(shí)例
2. 調(diào)用 doLocalExport 方法,使用 DubboProtocol 將 Invoker 轉(zhuǎn)化為 Exporter 實(shí)例,并打開(kāi) Netty 服務(wù)端監(jiān)聽(tīng)客戶請(qǐng)求
3. 創(chuàng)建 Registry 實(shí)例,連接 Zookeeper,并在服務(wù)節(jié)點(diǎn)下寫(xiě)入提供者的 URL 地址,注冊(cè)服務(wù)
4. 向注冊(cè)中心訂閱 override 數(shù)據(jù),并返回一個(gè) Exporter 實(shí)例
3. 根據(jù) URL 格式中的 `"dubbo://service-host/{服務(wù)名}/{版本號(hào)}"`中協(xié)議頭 `dubbo://` 識(shí)別,調(diào)用 DubboProtocol#export() 方法,開(kāi)發(fā)服務(wù)端口
4. RegistryProtocol#export() 返回的 Exporter 實(shí)例存放到 ServiceConfig 的 `List exporters` 中
服務(wù)發(fā)現(xiàn)
在學(xué)習(xí)了服務(wù)暴露原理之后 , 接下來(lái)重點(diǎn)探討服務(wù)是如何消費(fèi)的 。 這里主要講解如何通過(guò)注冊(cè)中心進(jìn)行服務(wù)發(fā)現(xiàn)進(jìn)行遠(yuǎn)程服務(wù)調(diào)用等細(xì)節(jié) 。
服務(wù)發(fā)現(xiàn)流程
在詳細(xì)探討服務(wù)暴露細(xì)節(jié)之前 , 我們先看一下整體duubo的服務(wù)消費(fèi)原理
在整體上看 , Dubbo 框架做服務(wù)消費(fèi)也分為兩大部分 , 第一步通過(guò)持有遠(yuǎn)程服務(wù)實(shí)例生成Invoker, 這個(gè) Invoker 在客戶端是核心的遠(yuǎn)程代理對(duì)象 。 第二步會(huì)把 Invoker 通過(guò)動(dòng)態(tài)代理轉(zhuǎn)換成實(shí)現(xiàn)用戶接口的動(dòng)態(tài)代理引用 。
服務(wù)消費(fèi)方引用服務(wù)的藍(lán)色初始化鏈,時(shí)序圖如下:
源碼分析
(1) 引用入口
服務(wù)引用的入口方法為 ReferenceBean 的 getObject 方法,該方法定義在 Spring 的 FactoryBean 接口中,ReferenceBean 實(shí)現(xiàn)了這個(gè)方法。
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
// 檢測(cè) ref 是否為空,為空則通過(guò) init 方法創(chuàng)建
if (ref == null) {
// init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
init();
}
return ref;
}
Dubbo 提供了豐富的配置,用于調(diào)整和優(yōu)化框架行為,性能等。Dubbo 在引用或?qū)С龇?wù)時(shí),首先會(huì)對(duì)這些配置進(jìn)行檢查和處理,以保證配置的正確性。
private void init() {
// 創(chuàng)建代理類
ref = createProxy(map);
此方法代碼很長(zhǎng),主要完成的配置加載,檢查,以及創(chuàng)建引用的代理對(duì)象。這里要從 createProxy 開(kāi)始看起。從字面意思上來(lái)看,createProxy 似乎只是用于創(chuàng)建代理對(duì)象的。但實(shí)際上并非如此,該方法還會(huì)調(diào)用其他方法構(gòu)建以及合并 Invoker 實(shí)例。具體細(xì)節(jié)如下。
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
...........
isDvmRefer = InjvmProtocol . getlnjvmProtocol( ) . islnjvmRefer(tmpUrl)
// 本地引用略
if (isJvmRefer) {
} else {
// 點(diǎn)對(duì)點(diǎn)調(diào)用略
if (url != null && url.length() > 0) {
} else {
// 加載注冊(cè)中心 url
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 參數(shù)到 url 中,并將 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
// 單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連,下同)
if (urls.size() == 1) {
// 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者,或者兩者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 獲取所有的 Invoker
for (URL url : urls) {
// 通過(guò) refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會(huì)在運(yùn)行時(shí)
// 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例,并調(diào)用實(shí)例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 如果注冊(cè)中心鏈接不為空,則將使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
//省略無(wú)關(guān)代碼
// 生成代理類
return (T) proxyFactory.getProxy(invoker);
}
上面代碼很多,不過(guò)邏輯比較清晰。
1、如果是本地調(diào)用,直接jvm 協(xié)議從內(nèi)存中獲取實(shí)例
2、如果只有一個(gè)注冊(cè)中心,直接通過(guò) Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口
3、如果有多個(gè)注冊(cè)中心,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過(guò) Cluster 合并多個(gè) Invoker,最后調(diào)用 ProxyFactory 生成代理類。
(2) 創(chuàng)建客戶端
在服務(wù)消費(fèi)方,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。Invoker 是由 Protocol 實(shí)現(xiàn)類構(gòu)建而來(lái)。Protocol 實(shí)現(xiàn)類有很多,這里分析DubboProtocol。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// 創(chuàng)建 DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
上面方法看起來(lái)比較簡(jiǎn)單,創(chuàng)建一個(gè)DubboInvoker。通過(guò)構(gòu)造方法傳入遠(yuǎn)程調(diào)用的client對(duì)象。默認(rèn)情況下,Dubbo 使用 NettyClient 進(jìn)行通信。接下來(lái),我們簡(jiǎn)單看一下 getClients 方法的邏輯。
private ExchangeClient[] getClients(URL url) {
// 是否共享連接
boolean service_share_connect = false;
// 獲取連接數(shù),默認(rèn)為0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,則共享連接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 獲取共享客戶端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客戶端
clients[i] = initClient(url);
}
}
return clients;
}
這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例,getSharedClient 方法中也會(huì)調(diào)用 initClient 方法,因此下面我們一起看一下這個(gè)方法。
private ExchangeClient initClient(URL url) {
// 獲取客戶端類型,默認(rèn)為 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//省略無(wú)關(guān)代碼
ExchangeClient client;
try {
// 獲取 lazy 配置,并根據(jù)配置值決定創(chuàng)建的客戶端類型
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 創(chuàng)建懶加載 ExchangeClient 實(shí)例
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 創(chuàng)建普通 ExchangeClient 實(shí)例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service...");
}
return client;
}
initClient 方法首先獲取用戶配置的客戶端類型,默認(rèn)為 netty。下面我們分析一下 Exchangers 的 connect 方法。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 獲取 Exchanger 實(shí)例,默認(rèn)為 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
}
如上,getExchanger 會(huì)通過(guò) SPI 加載 HeaderExchangeClient 實(shí)例,這個(gè)方法比較簡(jiǎn)單,大家自己看一下吧。接下來(lái)分析 HeaderExchangeClient 的實(shí)現(xiàn)。
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 這里包含了多個(gè)調(diào)用,分別如下:
// 1. 創(chuàng)建 HeaderExchangeHandler 對(duì)象
// 2. 創(chuàng)建 DecodeHandler 對(duì)象
// 3. 通過(guò) Transporters 構(gòu)建 Client 實(shí)例
// 4. 創(chuàng)建 HeaderExchangeClient 對(duì)象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
這里的調(diào)用比較多,我們這里重點(diǎn)看一下 Transporters 的 connect 方法。如下:
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 數(shù)量大于1,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取 Transporter 自適應(yīng)拓展類,并調(diào)用 connect 方法生成 Client 實(shí)例
return getTransporter().connect(url, handler);
}
如上,getTransporter 方法返回的是自適應(yīng)拓展類,該類會(huì)在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter,并調(diào)用該類的 connect 方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 創(chuàng)建 NettyClient 對(duì)象
return new NettyClient(url, listener);
}
(3) 注冊(cè)
這里就已經(jīng)創(chuàng)建好了NettyClient對(duì)象。關(guān)于 DubboProtocol 的 refer 方法就分析完了。接下來(lái),繼續(xù)分析 RegistryProtocol 的 refer 方法邏輯。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 取 registry 參數(shù)值,并將其設(shè)置為協(xié)議頭
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 獲取注冊(cè)中心實(shí)例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 將 url 查詢字符串轉(zhuǎn)為 Map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
// 獲取 group 配置
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通過(guò) SPI 加載 MergeableCluster 實(shí)例,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
return doRefer(cluster, registry, type, url);
}
上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊(cè)中心實(shí)例。然后獲取 group 配置,根據(jù) group 配置決定 doRefer 第一個(gè)參數(shù)的類型。這里的重點(diǎn)是 doRefer 方法,如下:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 創(chuàng)建 RegistryDirectory 實(shí)例
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 設(shè)置注冊(cè)中心和協(xié)議
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服務(wù)消費(fèi)者鏈接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 注冊(cè)服務(wù)消費(fèi)者,在 consumers 目錄下新節(jié)點(diǎn)
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 訂閱 providers、configurators、routers 等節(jié)點(diǎn)數(shù)據(jù)
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 一個(gè)注冊(cè)中心可能有多個(gè)服務(wù)提供者,因此這里需要將多個(gè)服務(wù)提供者合并為一個(gè)
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
如上,doRefer 方法創(chuàng)建一個(gè) RegistryDirectory 實(shí)例,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊(cè)中心進(jìn)行注冊(cè)。注冊(cè)完畢后,緊接著訂閱 providers、configurators、routers 等節(jié)點(diǎn)下的數(shù)據(jù)。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息。由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè),并生成一個(gè) Invoker。
(4)創(chuàng)建代理對(duì)象
Invoker 創(chuàng)建完畢后,接下來(lái)要做的事情是為服務(wù)接口生成代理對(duì)象。有了代理對(duì)象,即可進(jìn)行遠(yuǎn)程調(diào)用。代理對(duì)象生成的入口方法為 ProxyFactory 的 getProxy,接下來(lái)進(jìn)行分析。
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 調(diào)用重載方法
return getProxy(invoker, false);
}
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
// 獲取接口列表
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
// 切分接口列表
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
// 設(shè)置服務(wù)接口類和 EchoService.class 到 interfaces 中
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// 加載接口類
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
// 為 http 和 hessian 協(xié)議提供泛化調(diào)用支持,參考 pull request #1827
if (!invoker.getInterface().equals(GenericService.class) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
// 創(chuàng)建新的 interfaces 數(shù)組
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
// 設(shè)置 GenericService.class 到數(shù)組中
interfaces[len] = GenericService.class;
}
// 調(diào)用重載方法
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
如上,上面大段代碼都是用來(lái)獲取 interfaces 數(shù)組的,我們繼續(xù)往下看。getProxy(Invoker, Class[]) 這個(gè)方法是一個(gè)抽象方法,下面我們到 JavassistProxyFactory 類中看一下該方法的實(shí)現(xiàn)代碼。
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
上面代碼并不多,首先是通過(guò) Proxy 的 getProxy 方法獲取 Proxy 子類,然后創(chuàng)建 InvokerInvocationHandler 對(duì)象,并將該對(duì)象傳給 newInstance 生成 Proxy 實(shí)例。InvokerInvocationHandler 實(shí)現(xiàn) JDK 的 InvocationHandler 接口,具體的用途是攔截接口類調(diào)用。下面以 org.apache.dubbo.demo.DemoService 這個(gè)接口為例,來(lái)看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)。
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
好了,到這里代理類生成邏輯就分析完了。整個(gè)過(guò)程比較復(fù)雜,大家需要耐心看一下。
總結(jié)
1. 從注冊(cè)中心發(fā)現(xiàn)引用服務(wù):在有注冊(cè)中心,通過(guò)注冊(cè)中心發(fā)現(xiàn)提供者地址的情況下,ReferenceConfig 解析出的 URL 格式為:`registry://registry-host:/org.apache.registry.RegistryService?refer=URL.encode("conumer-host/com.foo.FooService?version=1.0.0")`。
2. 通過(guò) URL 的registry://協(xié)議頭識(shí)別,就會(huì)調(diào)用RegistryProtocol#refer()方法
(1). 查詢提供者 URL,如 `dubbo://service-host/com.foo.FooService?version=1.0.0` ,來(lái)獲取注冊(cè)中心
(2). 創(chuàng)建一個(gè) RegistryDirectory 實(shí)例并設(shè)置注冊(cè)中心和協(xié)議
(3). 生成 conusmer 連接,在 consumer 目錄下創(chuàng)建節(jié)點(diǎn),向注冊(cè)中心注冊(cè)
(4). 注冊(cè)完畢后,訂閱 providers,configurators,routers 等節(jié)點(diǎn)的數(shù)據(jù)
(5). 通過(guò) URL 的 `dubbo://` 協(xié)議頭識(shí)別,調(diào)用 `DubboProtocol#refer()` 方法,創(chuàng)建一個(gè) ExchangeClient 客戶端并返回 DubboInvoker 實(shí)例
3. 由于一個(gè)服務(wù)可能會(huì)部署在多臺(tái)服務(wù)器上,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這樣也就會(huì)得到多個(gè) DubboInvoker 實(shí)例,就需要 RegistryProtocol 調(diào)用 Cluster 將多個(gè)服務(wù)提供者節(jié)點(diǎn)偽裝成一個(gè)節(jié)點(diǎn),并返回一個(gè) Invoker
4. Invoker 創(chuàng)建完畢后,調(diào)用 ProxyFactory 為服務(wù)接口生成代理對(duì)象,返回提供者引用