目錄
- 查找待發布的服務--掃描xml或注解
- 服務的發布入口
- 本地暴露服務
- 生成Invoker物件
- 遠程暴露服務
- 得到Protocol物件
- 啟動Netty監聽服務
- 注冊服務

Invoker 表示遠程通信的物件
Directory 表示服務地址串列
服務發布程序
- 掃描xml配置或者注解
- url的組裝 (dubbo是基于URL驅動的)
- 注冊到注冊中心
- 啟動、發布服務
Dubbo原始碼使用樣例(不使用Spring-Boot的Starter組件):
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "com.anto.dubbo.dubboprovider")
@PropertySource("classpath:/spring/dubbo-provider.properties")
static class ProviderConfiguration {
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://172.30.2.7:2181");
return registryConfig;
}
}
}
而在dubbo-spring-boot-starter組件中,則可以直接不帶@EnableDubbo直接在properties檔案配置掃描路徑即可,
dubbo.registry.address=zookeeper://172.30.2.7:2181
dubbo.scan.base-packages=com.anto.dubbo.dubboprovider
是因為自動裝配類中,DubboRelaxedBinding2AutoConfiguration會將上述配置系結至指定的Bean中,
@Bean(name = BASE_PACKAGES_PROPERTY_RESOLVER_BEAN_NAME)//dubboScanBasePackagesPropertyResolver
public PropertyResolver dubboScanBasePackagesPropertyResolver(ConfigurableEnvironment environment) {
ConfigurableEnvironment propertyResolver = new AbstractEnvironment() {
@Override
protected void customizePropertySources(MutablePropertySources propertySources) {
//查找properties檔案中的 dubbo.scan. 配置
Map<String, Object> dubboScanProperties = getSubProperties(environment.getPropertySources(), DUBBO_SCAN_PREFIX);
propertySources.addLast(new MapPropertySource("dubboScanProperties", dubboScanProperties));
}
};
ConfigurationPropertySources.attach(propertyResolver);
return new DelegatingPropertyResolver(propertyResolver);
}
查找待發布的服務--掃描xml或注解
dubbo服務發布的形式
- xml形式
- 注解形式
@EnableDubbo包含了兩個注解@EnableDubboConfig、@DubboComponentScan
疑問:dubbo啟動時 @EnableDubbo是否是必須的注解?
非必須的注解,當用Spring-Boot方式集成Starter組件時,掃描路徑是直接讀取application.properties檔案的;
至于ServiceAnnotationBeanPostProcessor則在DubboAutoConfiguration宣告了該Bean,
以下都是基于注解的方式來進行初始化的,
- ServiceAnnotationBeanPostProcessor

在@DubboComponentScan注解中會import類DubboComponentScanRegistrar,然后預先往IOC容器中注冊幾個BeanDefinition,
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {//該注解Import了DubboComponentScanRegistrar類
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
//注冊ServiceAnnotationBeanPostProcessor的BeanDefinition
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}
注冊完ServiceAnnotationBeanPostProcessor的BeanDefinition后,就應該是將該Bean進行實體化,
//AbstractApplicationContext 呼叫refresh()方法時觸發
invokeBeanFactoryPostProcessors(beanFactory);
//PostProcessorRegistrationDelegate 觸發getBean的程序
currentRegistryProcessors.add(beanFactory.getBean(ppName, BeanDefinitionRegistryPostProcessor.class));
初始化完成后,就應該是真正開始其作用了,
而它實作了BeanDefinitionRegistryPostProcessor,那么就應該是呼叫其postProcessBeanDefinitionRegistry()方法,
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// 再次保證注冊了DubboBootstrapApplicationListener 其實在`@DubboComponentScan注解中,
//匯入`DubboComponentScanRegistrar`類時已經注冊了
registerBeans(registry, DubboBootstrapApplicationListener.class);
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
//開始注冊帶有dubbo注解的Bean
registerServiceBeans(resolvedPackagesToScan, registry);
}
//...略
}
跟Mybatis中類似,Dubbo也定義了專門用來掃描指定路徑的類DubboClassPathBeanDefinitionScanner,
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
// serviceAnnotationTypes是一個list 包含DubboService.class 和Service.class
//使得scanner只掃描帶這倆路徑的注解
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});
for (String packageToScan : packagesToScan) {
// Registers @Service Bean first
scanner.scan(packageToScan);
// Finds all BeanDefinitionHolders of @Service whether @ComponentScan scans or not.
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan, registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) {
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
//...略
}
}
服務的發布入口
當dubbo的服務掃描完成后,需要發布服務,發布服務我們需要考慮以下的要點:
服務以什么協議發布
服務發布的埠
- 服務發布的入口
DubboBootstrapApplicationListener監聽了ContextRefreshedEvent事件,當Spring完成Bean的裝載后,會觸發事件的介面,為真正發布服務入口
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}
那么該類是在哪里進行注冊的呢?
ServiceClassPostProcessor觸發方法postProcessBeanDefinitionRegistry()時會顯式的注冊DubboBootstrapApplicationListener的Bean,
// 此處是為了保證有DubboBootstrapApplicationListener的Bean
//其實在`@DubboComponentScan注解中,匯入`DubboComponentScanRegistrar`類時已經注冊了
registerBeans(registry, DubboBootstrapApplicationListener.class);
- 服務發布的流程
真正開始進行dubbo服務的發布是通過DubboBootstrap類的start()來完成的,
public DubboBootstrap start() {
if (started.compareAndSet(false, true)) {
//...略
//初始化必要的組件 配置中心、注冊中心、校驗必要的配置等
initialize();
// 1. 暴露dubbo服務
exportServices();
// Not only provider register
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
//3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
//...略
return this;
}
接下里呼叫 DubboBootstrap類的exportServices()方法
private void exportServices() {
//逐個服務暴露
configManager.getServices().forEach(sc -> {
// TODO, compatible with ServiceConfig.export()
ServiceConfig serviceConfig = (ServiceConfig) sc;
serviceConfig.setBootstrap(this);
if (exportAsync) {
ExecutorService executor = executorRepository.getServiceExporterExecutor();
Future<?> future = executor.submit(() -> {
sc.export();
exportedServices.add(sc);
});
asyncExportingFutures.add(future);
} else {
sc.export();
exportedServices.add(sc);
}
});
}
- 關鍵物件
ServiceConfig

服務發布程序大致可以分為三個程序:
1、生成具體服務的Invoker物件
2、發布協議服務(默認以Dubbo發布),Invoker轉換生成成Exporter
3、將服務地址資訊注冊到注冊中心
//ServiceConfig.doExportUrls()
private void doExportUrls() {
//得到服務倉庫ServiceRepository 將服務注冊進去全域唯一的服務倉庫物件中
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
//注冊中心地址集合
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//默認只有dubbo協議 默認的ProtocolConfig物件 <dubbo:protocol name="dubbo" port="20880" />
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
根據 RegistryConfig 的配置,組裝 registryURL,形成的 URL 格式如下:
registry://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=65324®istry=zookeeper&release=2.7.7×tamp=1604025007827
這個 URL 表示它是一個 registry 協議(RegistryProtocol),地址是注冊中心的ip:port,服務介面是 RegistryService,registry 的型別為 zookeeper,在有多個注冊中心時,會生成多個registryURL,
接下來開始根據具體的協議(默認的dubbo協議)暴露服務,同時將服務注冊到一(多)個注冊中心,
doExportUrlsFor1Protocol()
本地暴露服務
在ServiceConfig中呼叫doExportUrlsFor1Protocol()方法進行服務暴露時,會有如下判斷:
//當沒有顯式的指定scope的值為remote時,會進行本地暴露
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
injvm://127.0.0.1/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=65324&release=2.7.7&side=provider×tamp=1604025491044
通過指定scope的值,顯式指定以何種方式暴露服務,
<Dubbo:service interface="org.apache.Dubbo.samples.local.api.DemoService" ref="target" scope="remote"/>
#指定消費者提供端的暴露服務方式 不指定將以dubbo、injvm同時暴露
dubbo.provider.scope=remote
使用 Dubbo 本地呼叫不需做特殊配置,按正常 Dubbo 服務暴露服務即可,
任一服務在暴露遠程服務的同時,也會同時以 injvm 的協議暴露本地服務,injvm 是一個偽協議,不會像其他協議那樣對外開啟埠,只用于本地呼叫的目的,
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
因為發布injvm協議時,其協議頭是injvm,所以PROTOCOL根據自適應擴展點得到的是InjvmProtocol,所以此處生成的的Exporter物件是InjvmExporter型別,
那么通過injvm方式來暴露服務有什么好好處呢?
與本地物件上方法呼叫不同的是,Dubbo 本地呼叫會經過 Filter 鏈,其中包括了 Consumer 端的 Filter 鏈以及 Provider 端的 Filter 鏈,通過這樣的機制,本地消費者和其他消費者都是統一對待,統一監控,服務統一進行治理,
本地呼叫何時是無用的?
第一,泛化呼叫的時候無法使用本地呼叫,
第二,消費者明確指定 URL 發起直連呼叫,
生成Invoker物件
呼叫器,是Dubbo領域比較重要的一個物件,在服務的發布和呼叫程序中,服務本身會以Invoker物件存在,不管是發布dubbo服務還是發布本地的injvm服務,都需要生成一個Invoker物件,
//發布dubbo服務
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
Dubbo中都是通過生成一個Invoker物件,然后PROTOCOL.export(wrapperInvoker);來完成服務的發布,
//發布本地的injvm服務
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
PROXY_FACTORY是一個自適應擴展點得到的一個物件,
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
看到這,那么肯定在/dubbo/META-INF/internal路徑下會有一個名稱為org.apache.dubbo.rpc.ProxyFactory檔案的ProxyFactory介面的配置,
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
- 首先根據URL中的引數來判斷,proxy="jdk",若配置了則直接根據URL中的配置查找,此處邏輯是在生成的
ProxyFactory$Adaptive類中 - 未配置,則按照默認的
@SPI("javassist")則為JavassistProxyFactory型別,
根據之前的分析知道,只要該介面被@SPI修飾,且方法上有@Adaptive修飾時,會生成一個$Adaptive結尾的代理類,所以這里會生成一個由Dubbo框架生成的一個類ProxyFactory$Adaptive ,
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg2;
//獲取url中的proxy引數 無則是javassist
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
//再次呼叫JavassistProxyFactory.getInvoker()方法
return extension.getInvoker(arg0, arg1, arg2);
}
}
所以ProxyFactory$Adaptive 的作用主要是根據url中的proxy引數,決定需要用ProxyFactory介面的哪個實作,當沒有配置時,則用@SPI("javassist")配置的值,
此時引數為

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url)
proxy: com.anto.dubbo.dubboprovider.HelloServiceImpl
type: com.anto.dubbo.HelloService
url: injvm://127.0.0.1/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=16136&release=2.7.7&side=provider×tamp=1606266760859
然后就是呼叫JavassistProxyFactory的getInvoker方法了,
//JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 創建一個動態代理
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
//呼叫構建的動態代理類的invokeMethod()
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
通過Wrapper類來創建一個動態代理,(Wrapper類的258行)其核心方法如下:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.anto.dubbo.dubboprovider.HelloServiceImpl w;
try {
w = ((com.anto.dubbo.dubboprovider.HelloServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
//判斷引數是否String
if ("sayHello".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
if ("sayHello".equals($2) && $3.length == 1 && $3[0].getName().equals("com.anto.dubbo.User")) {
return ($w) w.sayHello((com.anto.dubbo.User) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.anto.dubbo.dubboprovider.HelloServiceImpl.");
}
看到這個動態代理類是否有一絲絲的親切感?
這不就是根據不同的方法名和引數型別來決定呼叫介面的哪個方法嘛!
構建好由Wrapper類生成的動態代理后,回傳一個匿名的AbstractProxyInvoker型別的Invoker物件,那么它有什么特點呢?
可以看到它重寫了doInvoke()方法,最終是呼叫動態代理類的invokeMethod(),那本質上也就是呼叫dubbo介面的方法,
回顧下生成Invoker物件的程序:
1.Dubbo框架生成一個ProxyFactory$Adaptive代理類---決定用哪個ProxyFactory
2.Wrapper類為具體要發布的服務創建一個動態代理類
3.生成一個重寫了doInvoke()方法的AbstractProxyInvoker型別的匿名類--用以轉發消費發起的請求到Wrapper生成的代理類的invokeMethod()
所以,簡單總結一下Invoke本質上應該是一個代理,經過層層包裝最終進行了發布,當消費者發起請求的時候,會獲得這個Invoker進行呼叫,
最終發布出去的Invoker, 也不是單純的一個代理,也是經過多層包裝
InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))
遠程暴露服務
在ServiceConfig類中doExportUrlsFor1Protocol()方法中,首先是本地服務的暴露,然后是遠程服務的暴露,
遠程服務暴露的程序其實也就是伴隨著生成Exporter物件程序,
//通過某種協議來暴露服務
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
遠程服務暴露時,首先需要得到一個PROTOCOL物件,它是一個自適應擴展點介面的得到Protocol的物件,
得到Protocol物件
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
由于PROTOCOL介面的方法標注了@Adaptive,所以會為其生成代理類物件,
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
動態生成的類如下Protocol$Adaptive
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
//...略
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
//根據url中的協議頭引數 決定加載哪個協議的實作,最開始協議頭為registry
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
//...略
}
思考下:為什么需要這么設計呢?每個標注了@Adaptive擴展類都聽過Compile來生成代碼,而不是單獨設計一個Protocol$Adaptive類呢?
Dubbo針對@SPI擴展介面中,方法標注了@Adaptive注解的類都會生成一個代理類,名稱為介面名$Adaptive,
這樣設計主要是擴展性和靈活性,
通過注解就能夠去宣告一個動態的適配類,同時用戶在使用的時候,可以根據配置中宣告的屬性來決定適配到的目標類,
可擴展性體現在spi的機制上,當我們自己開發擴展的實作時,同樣可以利用這個動態適配的功能來實作目標類的路由,
在上面一步生成的Invoker物件中,它的URL為:
registry://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=12248&release=2.7.7&side=provider×tamp=1606719802290&pid=12248®istry=zookeeper&release=2.7.7×tamp=1606719800257
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
所以在上面的getExtension("registry")會去查找RegistryProtocol,但是在ExtensionLoader中有如下陳述句:
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
此時的wrapperClasses則是這三個

當在決議META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol時,會把Protocol介面的包裝類放在快取屬性cachedWrapperClasses中,
思考下:怎么判斷這個實作SPI介面是一個包裝型別呢?
private boolean isWrapperClass(Class<?> clazz) {
try {
//當實作類有將介面自身傳進來的建構式時,認為其是一個包裝型別
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
Dubbo運用裝飾器模式對協議的spi介面起到一個裝飾增強作用,
所以暴露服務的代碼Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);此時PROTOCOL為ProtocolListenerWrapper物件,然后依次呼叫QosProtocolWrapper、ProtocolFilterWrapper的export()方法,
ProtocolListenerWrapper :用于服務export時候插入監聽機制 ,
QosprotocolWrapper :如果當前配置了注冊中心,則會啟動一個Qos server.qos是dubbo的在線運維命令,dubbo2.5.8新版本重構了telnet模塊,提供了新的telnet命令支持,新版本的telnet埠與dubbo協議的埠是不同的埠,默認為22222 ,
ProtocolFilterWrapper :對invoker進行filter的包裝,實作請求的過濾 ,
呼叫鏈路如下:
ProtocolListenerWrapper.export()--->QosProtocolWrapper.export()---->ProtocolFilterWrapper.export()--->RegistryProtocol.export()
但是在注冊的場景程序中,這幾個擴展點都不會生效,執行的邏輯會先判斷是否為注冊協議,如果是則直接基于協議發布服務,
一句話,最后我們得到的是RegistryProtocol物件,
啟動Netty監聽服務
接下來將呼叫RegistryProtocol.export()方法,
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//這里獲得的是zookeeper注冊中心的url: zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
// 這里是獲得服務提供者的url, dubbo://ip:port...
URL providerUrl = getProviderUrl(originInvoker);
//訂閱override資料,在admin控制臺可以針對服務進行治理,比如修改權重,修改路由機制等,當注冊中心有此服務的覆寫配置注冊進來時,推送訊息給提供者,重新暴露服務
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//這里就交給了具體的協議去暴露服務(如dubbo
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
//獲取要注冊到注冊中心的URL: dubbo://ip:port
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// 若配置了注冊中心,向注冊中心如zookeeper中注冊服務
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
notifyExport(exporter);
//保證每次export都回傳一個新的exporter實體
return new DestroyableExporter<>(exporter);
}
在RegistryProtocol.expor()t中,有兩個核心流程:
- 呼叫
doLocalExport啟動本地服務,也就是netty server - 呼叫
register方法進行服務地址的注冊
接下來看下doLocalExport(originInvoker, providerUrl);方法,
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
//key的值為發布該dubbo服務的一個協議串 dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?...
String key = getCacheKey(originInvoker);
//當bouds這個Map中不存在該服務的key時,會生成一個該服務的Exporter
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
先將要發布的服務生成一個唯一的帶有dubbo協議串的key值,
dubbo://172.30.60.208:20880/com.anto.dubbo.HelloService?anyhost=true&application=dubbo-demo-annotation-provider&bind.ip=172.30.60.208&bind.port=20880&cluster=failsafe&deprecated=false&dubbo=2.0.2&generic=false&interface=com.anto.dubbo.HelloService&methods=sayHello&pid=3876&release=2.7.7&side=provider×tamp=1606963046596
此時再次呼叫protocol.export(invokerDelegate),會再次進入到protocol$Adaptive.export()方法中,
不過此次回傳的是DubboProtocol,呼叫鏈路如下:
ProtocolListenerWrapper.export()--->QosProtocolWrapper.export()---->ProtocolFilterWrapper.export()--->DubboProtocol.export()
所以自來就來到了DubboProtocol.export()方法:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 將服務名稱埠作為key Invoker作為DubboExporter的引數存盤 一起放進一個exportMap中
//key com.anto.dubbo.HelloService:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//是否配置了引數回呼機制
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//開啟一個Netty服務
openServer(url);
optimizeSerialization(url);
return exporter;
}
- openServer()
private void openServer(URL url) {
// 獲取 host:port,并將其作為服務器實體的 key,用于標識當前的服務器實體
String key = url.getAddress();
//client 也可以暴露一個只有server可以呼叫的服務
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//創建服務器實體
serverMap.put(key, createServer(url));
}
}
} else {
// 服務器已創建,則根據 url 中的配置重置服務器
server.reset(url);
}
}
}
往下則是基于org.apache.dubbo.remoting.transport.netty4.NettyTransporter 開啟一個Netty服務的程序了,
在基于DubboProtocol協議發布服務的程序中,有幾個重要的步驟
- 構建一個exporterMap,以服務路徑名稱作為key,把invoker包裝成了DubboExporter作為value存盤 ;
- 針對同一臺機器上的多個服務,只啟動一個服務實體 ;
- 采用Netty4來發布服務 ,
注冊服務
當配置了諸如Zookeeper的注冊中心時,會將服務的節點資訊在相應的地方寫入,
// 向zookeeper中注冊服務
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl);
}
根據URL的key 來動態的找到需要注冊的服務中心,registryFactory是個動態擴展點,先經過包裝的擴展點,然后當為zookeeper時,則為ZookeeperRegistryFactory,
此時的registryURL已經是決議成Zookeeper開頭的url了,
zookeeper://172.30.2.7:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.30.60.208%3A20880%2Fcom.anto.dubbo.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-demo-annotation-provider%26bind.ip%3D172.30.60.208%26bind.port%3D20880%26cluster%3Dfailsafe%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.anto.dubbo.HelloService%26methods%3DsayHello%26pid%3D16172%26release%3D2.7.7%26side%3Dprovider%26timestamp%3D1606978398061&pid=16172&release=2.7.7×tamp=1606978397349
registeredProviderUrl則是dubbo開頭的服務提供地址,
private void register(URL registryUrl, URL registeredProviderUrl) {
//registry為ListenerRegistryWrapper registryFactory同樣也是由動態擴展點生成的物件
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
ListenerRegistryWrapper.register()
public void register(URL url) {
try {
//呼叫ZookeeperRegistry.register()
registry.register(url);
} finally {
if (CollectionUtils.isNotEmpty(listeners)) {
RuntimeException exception = null;
for (RegistryServiceListener listener : listeners) {
if (listener != null) {
try {
listener.onRegister(url);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
ListenerRegistryWrapper 是對ZookeeperRegistry做了一層包裝,增加監聽器相應的功能,
FailbackRegistry.register()
FailbackRegistry是一個提供的重試機制的父類,是ZookeeperRegistry、NacosRegistry、SofaRegistry等具體注冊中心的父類,
ZookeeperRegistry 類中并沒有register(),所以將進入父類FailbackRegistry的方法中,
public void register(URL url) {
//...略
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 真正呼叫ZookeeperRegistry.doRegister()
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 若配置的屬性check為true 則直接拋出例外 不再進行重試
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// 如果注冊失敗拋例外了,會將注冊失敗的url放入注冊失敗的容器中
addFailedRegistered(url);
}
}
ZookeeperRegistry.doRegister()
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
所以最侄訓呼叫相應dubbo集成的zookeeper的客戶端(curator 2.7以后)來寫入暴露的服務的節點資訊,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/229656.html
標籤:Java
上一篇:JavaAPI-工具類
