首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Kubernetes 调度器实现初探

  • 24-03-05 05:00
  • 2997
  • 8482
blog.csdn.net

戳蓝字“CSDN云计算”关注我们哦!


640?wx_fmt=jpeg


640?wx_fmt=jpeg


作者:阿里云智能事业群高级开发工程师 萧元

转自:阿里系统软件技术


Kubernetes作为一个分布式容器编排调度引擎,资源调度是它的最重要的功能。在 Kubernetes集群中,调度器作为一个独立模块运行。本文将介绍 Kubernetes 调度器的实现原理,工作流程, 以及未来发展。



Kubernetes 调度工作方式


Kubernetes 中的调度器,是作为单独组件运行,一般运行在 Master 中,和 Master 数量保持一致。通过 Raft 协议选出一个实例作为 Leader 工作,其他实例 Backup。 当 Master 故障,其他实例之间继续通过 Raft 协议选出新的 Master 工作。

其工作模式如下:

  • 调度器内部维护一个调度的 pods 队列 podQueue, 并监听 APIServer;

  • 当我们创建 Pod 时,首先通过 APIServer 往 ETCD 写入 Pod 元数据;

  • 调度器通过 Informer 监听 Pods 状态,当有新增 Pod 时,将 Pod 加入到 podQueue 中;

  • 调度器中的主进程,会不断的从 podQueue 取出的 Pod,并将 Pod 进入调度分配节点环节;

  • 调度环节分为两个步奏, Filter 过滤满足条件的节点 、 Prioritize 根据 Pod 配置,例如资源使用率,亲和性等指标,给这些节点打分,最终选出分数最高的节点;

  • 分配节点成功, 调用 apiServer 的 binding pod 接口, 将pod.Spec.NodeName设置为所分配的那个节点;

  • 节点上的 kubelet 同样监听 ApiServer,如果发现有新的 pod 被调度到所在节点,在节点上拉起对应的容器

  • 假如调度器尝试调度 Pod 不成功,如果开启了优先级和抢占功能,会尝试做一次抢占,将节点中优先级较低的 pod 删掉,并将待调度的 pod 调度到节点上。 如果未开启,或者抢占失败,会记录日志,并将 pod 加入 podQueue 队尾。




640?wx_fmt=png




实现细节



kube-scheduling 是一个独立运行的组件,主要工作内容在 Run 函数 。 这里面主要做几件事情:

  • 初始化一个 Scheduler 实例 sched,传入各种 Informer,为关心的资源变化建立监听并注册 handler,例如维护 podQuene;

  • 注册 events 组件,设置日志;

  • 注册 http/https 监听,提供健康检查和 metrics 请求;

  • 运行主要的调度内容入口 sched.run() 。 如果设置 --leader-elect=true ,代表启动多个实例,通过Raft选主,实例只有当被选为master后运行主要工作函数sched.run。

调度核心内容在 sched.run() 函数,它会启动一个 go routine 不断运行sched.scheduleOne, 每次运行代表一个调度周期。

 
 

func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

我们看下 sched.scheduleOne 主要做什么:

 
 

func (sched *Scheduler) scheduleOne() {
  pod := sched.config.NextPod()
  .... // do some pre check
  scheduleResult, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                ..... // do some log
            } else {
                sched.preempt(pod, fitError)
            }
        }
    }
    ... 
    // Assume volumes first before assuming the pod.
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    ...     
    fo func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }
      err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
    }
}

在sched.scheduleOne 中,主要会做几件事情:

  • 通过sched.config.NextPod(), 从 podQuene 中取出 pod;

  • 运行sched.schedule,尝试进行一次调度;

  • 假如调度失败,如果开启了抢占功能,会调用sched.preempt 尝试进行抢占,驱逐一些 pod,为被调度的 pod 预留空间,在下一次调度中生效;

  • 如果调度成功,执行 bind 接口。在执行 bind 之前会为 pod volume 中声明的的 PVC 做 provision。

sched.schedule 是主要的 pod 调度逻辑:

 
 

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    // Get node list
    nodes, err := nodeLister.List()
    // Filter
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }
    // Priority
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }

    // SelectHost
    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}


调度主要分为三个步奏:

  • Filters: 过滤条件不满足的节点;

  • PrioritizeNodes: 在条件满足的节点中做 Scoring,获取一个最终打分列表 priorityList;

  • selectHost: 在 priorityList 中选取分数最高的一组节点,从中根据 round-robin 方式选取一个节点。


640?wx_fmt=png


接下来我们继续拆解, 分别看下这三个步奏会怎么做


Filters

Filters 相对比较容易,调度器默认注册了一系列的 predicates 方法, 调度过程为并发调用每个节点的 predicates 方法。最终得到一个 node list,包含符合条件的节点对象。


 
 

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)
        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
      // 此处会调用这个节点的所有predicates 方法
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
            // 如果当前符合条件的节点数已经足够,会停止计算。
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            }
        }
    // 并发调用checkNode 方法
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
        filtered = filtered[:filteredLen]
    }
    return filtered, failedPredicateMap, nil
}


值得注意的是, 1.13 中引入了 FeasibleNodes 机制,为了提高大规模集群的调度性能。允许我们通过 bad-percentage-of-nodes-to-score 参数, 设置 filter 的计算比例(默认 50%), 当节点数大于 100 个, 在 filters的过程,只要满足条件的节点数超过这个比例,就会停止 filter 过程,而不是计算全部节点。


举个例子,当节点数为 1000, 我们设置的计算比例为 30%,那么调度器认为 filter 过程只需要找到满足条件的 300 个节点,filter 过程中当满足条件的节点数达到 300 个,filter 过程结束。 这样 filter 不用计算全部的节点,同样也会降低 Prioritize 的计算数量。 但是带来的影响是 pod 有可能没有被调度到最合适的节点。


Prioritize

Prioritize 的目的是帮助 pod,为每个符合条件的节点打分,帮助 pod 找到最合适的节点。同样调度器默认注册了一系列 Prioritize 方法。这是 Prioritize 对象的数据结构:


 
 

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
    Name   string
    Map    PriorityMapFunction
    Reduce PriorityReduceFunction
    // TODO: Remove it after migrating all functions to
    // Map-Reduce pattern.
    Function PriorityFunction
    Weight   int
}


每个 PriorityConfig 代表一个评分的指标,会考虑服务的均衡性,节点的资源分配等因素。 一个 PriorityConfig 的主要 Scoring 过程分为 Map 和 Reduce:

  • Map 过程计算每个节点的分数值

  • Reduce 过程会将当前 PriorityConfig 的所有节点的打分结果再做一次处理。


所有 PriorityConfig 计算完毕后,将每个 PriorityConfig 的数值乘以对应的权重,并按照节点再做一次聚合。


 
 

workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        for i := range priorityConfigs {
            var err error
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
        }
    })
    for i := range priorityConfigs {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]);
        }(i)
    }
    wg.Wait()
    // Summarize all scores.
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))
    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }


此外 Filter 和 Prioritize 都支持 extener scheduler 的调用,本文不做过多阐述。



现状



目前 Kubernetes 调度器的调度方式是 Pod-by-Pod,也是当前调度器不足的地方。主要瓶颈如下:

  • Kubernetes 目前调度的方式,每个 pod 会对所有节点都计算一遍,当集群规模非常大,节点数很多时,pod 的调度时间会非常慢。 这也是 percentage-of-nodes-to-score 尝试要解决的问题;

  • pod-by-pod 的调度方式不适合一些机器学习场景。 Kubernetes 早期设计主要为在线任务服务,在一些离线任务场景,比如分布式机器学习中,我们需要一种新的算法 gang scheduler,pod 也许对调度的即时性要求没有那么高,但是提交任务后,只有当一个批量计算任务的所有 workers 都运行起来时,才会开始计算任务。 pod-by-pod 方式在这个场景下,当资源不足时非常容易引起资源死锁;

  • 当前调度器的扩展性不是十分好,特定场景的调度流程都需要通过硬编码实现在主流程中,比如我们看到的 bindVolume 部分, 同样也导致 Gang Scheduler 无法在当前调度器框架下通过原生方式实现。



Kubernetes 调度期的发展



社区调度器的发展,也是为了解决这些问题:

  • 调度器 V2 框架,增强了扩展性,也为在原生调度器中实现 Gang schedule 做准备;

  • Kube-batch: 一种 Gang schedule 的实现 https://github.com/kubernetes-sigs/kube-batch;

  • poseidon: Firmament 一种基于网络图调度算法的调度器,poseidon 是将 Firmament 接入 Kubernetes 调度器的实现 https://github.com/kubernetes-sigs/poseidon。


参考文献

[1]https://medium.com/jorgeacetozi/kubernetes-master-components-etcd-api-server-controller-manager-and-scheduler-3a0179fc8186
[2]
https://jvns.ca/blog/2017/07/27/how-does-the-kubernetes-scheduler-work/


640?wx_fmt=png


福利

扫描添加小编微信,备注“姓名+公司职位”,加入【云计算学习交流群】,和志同道合的朋友们共同打卡学习!


640?wx_fmt=jpeg


推荐阅读:

  • 大数据背后的无奈与焦虑:“128元连衣裙”划分矮穷挫与白富美?

  • 315 后,等待失业的程序员

  • 再不编程就老了!05 后比特币专家准备赚个 134,000,000 元!

  • Pig变飞机?AI为什么这么蠢 | Adversarial Attack

  • 互联网没有春天

  • 麦克阿瑟奖得主Dawn Song:区块链能保密和保护隐私?图样图森破!

  • 2019年最值得关注的五大微服务发展趋势


640?wx_fmt=png 喜欢就点击“好看”吧

JDK的SPI机制的缺点

⽂件中的所有类都会被加载且被实例化。这样也就导致获取某个实现类的方式不够灵活,只能通过 Iterator 形式获取,不能根据某个参数来获取对应的实现类。如果不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,没有办法指定某⼀个类来加载和实例化,这就造成了浪费。

此时dubbo的SPI可以解决

dubbo的SPI机制

dubbo⾃⼰实现了⼀套SPI机制来解决Java的SPI机制存在的问题。

dubbo中则采用了类似kv对的样式,在具体使用的时候则通过相关想法即可获取,而且获取的文件路径也不一致

ExtensionLoader 类文件

java
代码解读
复制代码
private static final String SERVICES_DIRECTORY = "META-INF/services/"; private static final String DUBBO_DIRECTORY = "META-INF/dubbo/"; private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

如上述代码片段可知,dubbo是支持从META-INF/dubbo/,META-INF/dubbo/internal/以及META-INF/services/三个文件夹的路径去获取spi配置

例如com.alibaba.dubbo.rpc.Protocol 文件内容

java
代码解读
复制代码
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper mock=com.alibaba.dubbo.rpc.support.MockProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol com.alibaba.dubbo.rpc.protocol.http.HttpProtocol com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

不过观察上述文件会发现,HttpProtocol是没有对应的k值,那就是说无法通过kv对获取到其协议实现类。后面通过源码可以发现,如果没有对应的name的时候,dubbo会通过findAnnotationName方法获取一个可用的name

Dubbo SPI 使用 & 源码学习

通过获取协议的代码来分析下具体的操作过程

Protocol 获取

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

java
代码解读
复制代码
public static ExtensionLoader getExtensionLoader(Class type) { // 静态方法,意味着可以直接通过类调用 if (type == null) throw new IllegalArgumentException("Extension type == null"); if(!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if(!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); // 从map中获取该类型的ExtensionLoader数据 if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type)); // 如果没有则,创建一个新的ExtensionLoader对象,并且以该类型存储 loader = (ExtensionLoader) EXTENSION_LOADERS.get(type); // 再从map中获取 // 这里这样做的原因就是为了防止并发的问题,而且map本是也是个ConcurrentHashMap } return loader; }
java
代码解读
复制代码
private ExtensionLoader(Class type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); } // 传入的type是Protocol.class,所以需要获取ExtensionFactory.class最合适的实现类
java
代码解读
复制代码
public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if(createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); // 创建对象,也是需要关注的函数 cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; }
java
代码解读
复制代码
private T createAdaptiveExtension() { try { return injectExtension((T) getAdaptiveExtensionClass().newInstance()); // (T) getAdaptiveExtensionClass().newInstance() 创建一个具体的实例对象 // getAdaptiveExtensionClass() 生成相关的class // injectExtension 往该对象中注入数据 } catch (Exception e) { throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e); } }
java
代码解读
复制代码
private Class getAdaptiveExtensionClass() { getExtensionClasses(); // 通过加载SPI配置,获取到需要的所有的实现类存储到map中 // 通过可能会去修改cachedAdaptiveClass数据,具体原因在spi配置文件解析中分析 if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); }
java
代码解读
复制代码
private Class createAdaptiveExtensionClass() { String code = createAdaptiveExtensionClassCode(); // 动态生成需要的代码内容字符串 ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); return compiler.compile(code, classLoader); // 编译,生成相应的类 }

如下代码Protocol$Adpative 整个的类就是通过createAdaptiveExtensionClassCode()方法生成的一个大字符串

java
代码解读
复制代码
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { // 暴露远程服务,传入的参数是invoke对象 if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); // 如果没有具体协议,则使用dubbo协议 if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { // 引用远程对象,生成相对应的远程invoke对象 if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } }

到现在可以认为是最上面的获取protocol的方法Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 返回了一个代码拼接而成然后编译操作的实现类Protocol$Adpative

可是得到具体的实现呢?在com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName)这个代码中,当然这个是有在具体的暴露服务或者引用远程服务才被调用执行的。

java
代码解读
复制代码
public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder()); holder = cachedInstances.get(name); } // 无论是否真有数据,在cachedInstances存储的是一个具体的Holder对象 Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); // 创建对象 holder.set(instance); } } } return (T) instance; }
java
代码解读
复制代码
private T createExtension(String name) { Class clazz = getExtensionClasses().get(name); // 获取具体的实现类的类 if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); // clazz.newInstance 才是真正创建对象的操作 instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); // 往实例中反射注入参数 Set> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class wrapperClass : wrapperClasses) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }

到这一步就可以认为是dubbo的spi加载整个的过程完成了,整个链路有些长,需要好好的梳理一下

SPI配置文件解析

上文说到getExtensionClasses完成对spi文件的解析

java
代码解读
复制代码
private Map> loadExtensionClasses() { final SPI defaultAnnotation = type.getAnnotation(SPI.class); // 查看该类是否存在SPI注解信息 if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; // 设置默认的名称,如果注解的值经过切割,发现超过1个的数据,则同样会认为错误 } } Map> extensionClasses = new HashMap>(); loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); // 加载文件 loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; } private void loadFile(Map> extensionClasses, String dir) { String fileName = dir + type.getName(); // type.getName就是类名称,和java的类似 try { Enumeration urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL url = urls.nextElement(); try { BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); // 对k=v这样的格式进行分割操作,分别获取 if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } if (clazz.isAnnotationPresent(Adaptive.class)) { // 如果获取的类包含了Adaptive注解 if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { // 已经存在了该数据,现在又出现了,则抛出异常 throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { // 不包含Adaptive注解信息 try { clazz.getConstructor(type); // 查看构造函数是否包含了type的类型参数 // 如果不存在,则抛出NoSuchMethodException异常 Set> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); // 往wrappers中添加该类 } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { // 没用名字的那种,也就是不存在k=v这种样式 // 例如上面的HttpProtocol name = findAnnotationName(clazz); // 查看该类是否存在注解 if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); // 可能存在多个,切割开 if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { // 如果类存在Activate的注解信息 cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); // 往容器中填充该键值对信息,k和v } else if (c != clazz) { // 存在多个同名扩展类,则抛出异常信息 throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", class file: " + url + ") in " + url, t); } } // end of while urls } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName + ").", t); } }

这样完成了对spi配置文件的整个的扫描过程了

注:本文转载自blog.csdn.net的CSDN云计算的文章"https://blog.csdn.net/FL63Zv9Zou86950w/article/details/88802057"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top