济南Java培训
达内济南山大路中心

17156168575

热门课程

dubbo源码

  • 时间:2018-03-19
  • 发布:济南Java培训机构
  • 来源:互联网

    dubbo是通过在spring中自定义标签,将这些标签解析到对应组件中,然后将这些组件加载到spring容器进行管理.这些组件中就包括发布服务组件dubbo:service,在对service组件解析加载的过程其实也包含着service服务暴露发布的过程.
    济南Java培训机构的老师说:ServiceBean就是service标签对应加载到spring容器中bean,我们来看该bean的源码,发现ServiceBean实现了InitializingBean接口,则在ServiceBean加载到容器中时会调用初始化方法afterPropertiesSet.
    public void afterPropertiesSet() throws Exception {// 设置生产者配置信息ProviderConfigif (getProvider() == null) {Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false,false);if (providerConfigMap != null && providerConfigMap.size() > 0) {Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class,false, false);if ((protocolConfigMap == null || protocolConfigMap.size() == 0)&& providerConfigMap.size() > 1) { // 兼容旧版本List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();for (ProviderConfig config : providerConfigMap.values()) {if (config.isDefault() != null&& config.isDefault().booleanValue()) {providerConfigs.add(config);}}if (providerConfigs.size() > 0) {setProviders(providerConfigs);}} else {ProviderConfig providerConfig = null;for (ProviderConfig config : providerConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {if (providerConfig != null) {throw new IllegalStateException("Duplicate provider configs: "+ providerConfig + " and "+ config);}providerConfig = config;}}if (providerConfig != null) {setProvider(providerConfig);}}}}// 设置应用配置信息ApplicationConfigif (getApplication() == null&& (getProvider() == null || getProvider().getApplication() == null)) {Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false,false);if (applicationConfigMap != null && applicationConfigMap.size() > 0) {ApplicationConfig applicationConfig = null;for (ApplicationConfig config : applicationConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {if (applicationConfig != null) {throw new IllegalStateException("Duplicate application configs: "+ applicationConfig + " and "+ config);}applicationConfig = config;}}if (applicationConfig != null) {setApplication(applicationConfig);}}}// 设置模块配置信息ModuleConfigif (getModule() == null&& (getProvider() == null || getProvider().getModule() == null)) {Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false,false);if (moduleConfigMap != null && moduleConfigMap.size() > 0) {ModuleConfig moduleConfig = null;for (ModuleConfig config : moduleConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {if (moduleConfig != null) {throw new IllegalStateException("Duplicate module configs: " + moduleConfig+ " and " + config);}moduleConfig = config;}}if (moduleConfig != null) {setModule(moduleConfig);}}}// 设置注册服务器配置信息RegistryConfigif ((getRegistries() == null || getRegistries().size() == 0)&& (getProvider() == null|| getProvider().getRegistries() == null || getProvider().getRegistries().size() == 0)&& (getApplication() == null|| getApplication().getRegistries() == null || getApplication().getRegistries().size() == 0)) {Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false,false);if (registryConfigMap != null && registryConfigMap.size() > 0) {List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();for (RegistryConfig config : registryConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {registryConfigs.add(config);}}if (registryConfigs != null && registryConfigs.size() > 0) {super.setRegistries(registryConfigs);}}}// 设置监控配置信息MonitorConfigif (getMonitor() == null&& (getProvider() == null || getProvider().getMonitor() == null)&& (getApplication() == null || getApplication().getMonitor() == null)) {Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false,false);if (monitorConfigMap != null && monitorConfigMap.size() > 0) {MonitorConfig monitorConfig = null;for (MonitorConfig config : monitorConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {if (monitorConfig != null) {throw new IllegalStateException("Duplicate monitor configs: "+ monitorConfig + " and " + config);}monitorConfig = config;}}if (monitorConfig != null) {setMonitor(monitorConfig);}}}// 设置服务协议配置信息if ((getProtocols() == null || getProtocols().size() == 0)&& (getProvider() == null|| getProvider().getProtocols() == null || getProvider().getProtocols().size() == 0)) {Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null: BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false,false);if (protocolConfigMap != null && protocolConfigMap.size() > 0) {List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();for (ProtocolConfig config : protocolConfigMap.values()) {if (config.isDefault() == null|| config.isDefault().booleanValue()) {protocolConfigs.add(config);}}if (protocolConfigs != null && protocolConfigs.size() > 0) {super.setProtocols(protocolConfigs);}}}// 设置服务名称pathif (getPath() == null || getPath().length() == 0) {if (beanName != null && beanName.length() > 0&& getInterface() != null && getInterface().length() > 0&& beanName.startsWith(getInterface())) {setPath(beanName);}}// 是否延迟发布if (!isDelay()) {export();// 发布该service}}
    在afterPropertiesSet方法中先进行了应用、注册、监控等信息设置,最后在方法export方法中进行发布.我们继续看export,export是在其父类ServiceConfig中进行实现.
    public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) {//是否暴露 return; } if (delay != null && delay > 0) {//是否延迟暴露 Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport();//如果延迟暴露,在多线程中等待特定时间然后暴露 } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport();//直接暴露 } }
    export方法是一个同步方法,在方法中我们没有看到具体的发布过程,该方法主要处理延迟发布.我们接着继续看doExport
    protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("Already unexported!");}if (exported) {// 已经发布return;}exported = true;if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!");}checkDefault();if (provider != null) {if (application == null) {application = provider.getApplication();}if (module == null) {module = provider.getModule();}if (registries == null) {registries = provider.getRegistries();}if (monitor == null) {monitor = provider.getMonitor();}if (protocols == null) {protocols = provider.getProtocols();}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {monitor = module.getMonitor();}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {monitor = application.getMonitor();}}if (ref instanceof GenericService) {// 判断是否为泛接口实现方式interfaceClass = GenericService.class;if (StringUtils.isEmpty(generic)) {generic = Boolean.TRUE.toString();}} else {try {// 加载service对已接口字节码interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 合法性加成,interfaceClass不能为空、interfaceClass必须为接口类型、methdos必须为接口中的方法checkInterfaceAndMethods(interfaceClass, methods);checkRef();// 检查配置的引用不能为空且引用必须实现接口(ref="demoService"该引用是必须在容器中存在,由spring负责加载)generic = Boolean.FALSE.toString();}if (local != null) {if ("true".equals(local)) {local = interfaceName + "Local";}Class<?> localClass;try {localClass = ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implemention class "+ localClass.getName() + " not implement interface "+ interfaceName);}}if (stub != null) {// 本地存根if ("true".equals(stub)) {stub = interfaceName + "Stub";}Class<?> stubClass;try {stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}if (!interfaceClass.isAssignableFrom(stubClass)) {throw new IllegalStateException("The stub implemention class "+ stubClass.getName() + " not implement interface "+ interfaceName);}}checkApplication();// 检查应用信息checkRegistry();// 检查注册中心checkProtocol();// 检查协议appendProperties(this);checkStubAndMock(interfaceClass);// 检查是否为stub或者mock方式if (path == null || path.length() == 0) {path = interfaceName;}doExportUrls();// 在该方法中进行发布}
    我们再doExport方法中只看到了一些检查判断,还是没有看到具体的暴露发布,我们接着继续看doExportUrls中的代码

    private void doExportUrls() {// 根据注册中心构造url路径// registry://10.118.242.90:2181/com.alibaba.dubbo.registry.RegistryService?application=jiafeng-provider//&client=curator&dubbo=2.0.0&group=china&pid=6220&registry=zookeeper&timestamp=1488351817625List<URL> registryURLs = loadRegistries(true);//因为可以支持多个协议发布服务for (ProtocolConfig protocolConfig : protocols) {//真正的服务暴露发布过程doExportUrlsFor1Protocol(protocolConfig, registryURLs);}}

济南Java培训机构

    在该方法中我们看到了整理注册中心信息为URL路径方式,然后分布对每种协议进行发布(可以配置多个注册中心,多个协议类型).我们继续往下看doExportUrlsFor1Protocol方法
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig,List<URL> registryURLs) {String name = protocolConfig.getName();// 服务协议,不配置时默认dubboif (name == null || name.length() == 0) {name = "dubbo";}String host = protocolConfig.getHost();// 服务ip地址if (provider != null && (host == null || host.length() == 0)) {host = provider.getHost();}boolean anyhost = false;if (NetUtils.isInvalidLocalHost(host)) {// 是否合法IP地址anyhost = true;try {host = InetAddress.getLocalHost().getHostAddress();} catch (UnknownHostException e) {logger.warn(e.getMessage(), e);}if (NetUtils.isInvalidLocalHost(host)) {if (registryURLs != null && registryURLs.size() > 0) {for (URL registryURL : registryURLs) {try {Socket socket = new Socket();// 通过套接字进行数据通信try {SocketAddress addr = new InetSocketAddress(registryURL.getHost(),registryURL.getPort());socket.connect(addr, 1000);// 设置超时时间1秒host = socket.getLocalAddress().getHostAddress();break;} finally {try {socket.close();} catch (Throwable e) {}}} catch (Exception e) {logger.warn(e.getMessage(), e);}}}if (NetUtils.isInvalidLocalHost(host)) {host = NetUtils.getLocalHost();}}}// 服务端口设置Integer port = protocolConfig.getPort();if (provider != null && (port == null || port == 0)) {port = provider.getPort();}final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();if (port == null || port == 0) {port = defaultPort;}if (port == null || port <= 0) {port = getRandomPort(name);if (port == null || port < 0) {port = NetUtils.getAvailablePort(defaultPort);putRandomPort(name, port);}logger.warn("Use random available port(" + port + ") for protocol "+ name);}Map<String, String> map = new HashMap<String, String>();if (anyhost) {map.put(Constants.ANYHOST_KEY, "true");}map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());map.put(Constants.TIMESTAMP_KEY,String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}appendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);// 下面为设置每个方法的暴露属性if (methods != null && methods.size() > 0) {for (MethodConfig method : methods) {appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}// 方法中参数设置List<ArgumentConfig> arguments = method.getArguments();if (arguments != null && arguments.size() > 0) {for (ArgumentConfig argument : arguments) {// 类型自动转换.if (argument.getType() != null&& argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();// 遍历所有方法if (methods != null && methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// 匹配方法名称,获取方法签名.if (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();// 一个方法中单个callbackif (argument.getIndex() != -1) {if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {appendParameters(map,argument,method.getName()+ "."+ argument.getIndex());} else {throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+ argument.getIndex()+ ", type:"+ argument.getType());}} else {// 一个方法中多个callbackfor (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {appendParameters(map,argument,method.getName()+ "." + j);if (argument.getIndex() != -1&& argument.getIndex() != j) {throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+ argument.getIndex()+ ", type:"+ argument.getType());}}}}}}}} else if (argument.getIndex() != -1) {appendParameters(map, argument, method.getName()+ "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");}}}} // end of methods for}if (ProtocolUtils.isGeneric(generic)) {map.put("generic", generic);map.put("methods", Constants.ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();if (methods.length == 0) {logger.warn("NO method found in service interface "+ interfaceClass.getName());map.put("methods", Constants.ANY_VALUE);} else {map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}// 是否使用口令if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put("token", UUID.randomUUID().toString());} else {map.put("token", token);}}// 如果为本地调用就不往注册中心注册if ("injvm".equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put("notify", "false");}// 导出服务String contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0)&& provider != null) {contextPath = provider.getContextpath();}// 拼装url,协议名称、ip地址、端口、上下文路径、服务名称、属性URL url = new URL(name, host, port, (contextPath == null|| contextPath.length() == 0 ? "" : contextPath + "/")+ path, map);if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope = url.getParameter(Constants.SCOPE_KEY);// 配置为none不暴露if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);// 暴露本地服务}// 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {logger.info("Export dubbo service "+ interfaceClass.getName() + " to url " + url);}if (registryURLs != null && registryURLs.size() > 0&& url.getParameter("register", true)) {// 多个协议循环进行暴露发布for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent("dynamic",registryURL.getParameter("dynamic"));URL monitorUrl = loadMonitor(registryURL);// 加载监控服务路径if (monitorUrl != null) {url = url.addParameterAndEncoded(Constants.MONITOR_KEY,monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service "+ interfaceClass.getName() + " url " + url+ " to registry " + registryURL);}// 代理工厂创建invoker,服务代理,通过该代理进行远程调用Invoker<?> invoker = proxyFactory.getInvoker(ref,(Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY,url.toFullString()));// 暴露远程服务Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);}} else {Invoker<?> invoker = proxyFactory.getInvoker(ref,(Class) interfaceClass, url);Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);}}}this.urls.add(url);}
    在该方法中还是大量代码进行服务配置信息设置,以及service中每个暴露方法的和方法参数的信息设置.其结果都是将各种信息最终拼加到URL中.在这当中我们比较关心的代码有两部分:1.暴露本地服务,2.暴露远程服务.
    本地服务暴露
    private void exportLocal(URL url) {if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {// 本地暴露设置协议为injvm、ip为127.0.0.1、端口为0URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(NetUtils.LOCALHOST).setPort(0);Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref,(Class) interfaceClass, local));exporters.add(exporter);// 加入全局列表logger.info("Export dubbo service " + interfaceClass.getName()+ " to local registry");}}
    远程服务暴露,在上面doExportUrlsFor1Protocol方法中有这部分代码,其实为远程暴露部分处理入口
    // 代理工厂创建invoker,服务代理,通过该代理进行远程调用Invoker<?> invoker = proxyFactory.getInvoker(ref,(Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY,url.toFullString()));// 暴露远程服务Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);
    至此我们都还没有看到dubbo发布服务的核心代码,前面我们看到的基本都是对各种属性的校验以及整理,最终会把包括配置在内的所有相关属性都封装到URL中,所以URL在dubbo中还是比较关键的一个元素.(通篇都在贴代码看起来的话应该挺累的,但是话说回来还有什么能比代码更能说明问题的呢).
    本篇文章是济南Java培训机构为您呈现,希望给您带来更多更好的文章,喜欢的朋友们可以添加微信公众号.

    更多济南Java培训机构相关咨询,请扫描下方二维码

济南Java培训机构

上一篇:IT 创业之路 - 面试最重要的是作品
下一篇:Java 初学者,如何学习 Java?

Java线程知识拾遗

Java开发岗位面试题归类

java开发工程师有前途嘛?

半路转行学JAVA开发真的靠谱么?怎么学?

选择城市和中心
贵州省

广西省

海南省

达内教育

有位老师想和您聊一聊