hadoop加载fs.hdfs.impl

网友投稿 424 2022-11-22

hadoop加载fs.hdfs.impl

背景

目前core-default.xml文件中,HDFS客户端的类名配置项为:

fs.AbstractFileSystem.hdfs.impl org.apache.hadoop.fs.Hdfs The FileSystem for hdfs: uris.

实际上,生效的确是如下配置:

org.apache.hadoop.hdfs.DistributedFileSystem

然而,在core-site.xml中,并没有指定重写这一项属性,源码中也没有显示指定这一属性。因此,可以研究一下DistributedFileSystem类的加载过程。

设想

在hadoop源码包下,有如下文件:hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem,文件内容如下:

org.apache.hadoop.hdfs.DistributedFileSystem org.apache.hadoop.hdfs.web.WebHdfsFileSystem org.apache.hadoop.hdfs.web.SWebHdfsFileSystem

看到这里,可以想到,系统通过SPI的方式加载到了DistributedFileSystem类。接下来验证这个想法...

验证

创建HDFS的客户端对象会调用createFileSystem方法:

//uri是hdfs://192.168.1.2:9000/格式 private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException { Tracer tracer = FsTracer.get(conf); try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) { scope.addKVAnnotation("scheme", uri.getScheme()); //scheme是hdfs //获取hdfs这种scheme对应的文件系统的实现类,这里理应是DistributedFileSystem Class clazz = getFileSystemClass(uri.getScheme(), conf); //新建DistributedFileSystem对象 FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); fs.initialize(uri, conf); return fs; }

获取HDFS文件系统的Class对象:

public static Class getFileSystemClass(String scheme, Configuration conf) throws IOException { //FILE_SYSTEMS_LOADED表示FileSystem是否已经加载过了,如果已经加载过了,后续就不会再次加载 if (!FILE_SYSTEMS_LOADED) { //通过SPI机制加载FileSystem的实现类 loadFileSystems(); } LOGGER.debug("Looking for FS supporting {}", scheme); Class clazz = null; if (conf != null) { String property = "fs." + scheme + ".impl"; LOGGER.debug("looking for configuration option {}", property); //尝试从core-default.xml或core-site.xml文件中加载fs.hdfs.impl配置,实际上文件中是没有这个配置的 clazz = (Class) conf.getClass( property, null); } else { LOGGER.debug("No configuration: skipping check for fs.{}.impl", scheme); } //实际上是从这里面获取到hdfs配置,loadFileSystems方法将实现类放到SERVICE_FILE_SYSTEMS变量中 if (clazz == null) { LOGGER.debug("Looking in service filesystems for implementation class"); //scheme是"hdfs" clazz = SERVICE_FILE_SYSTEMS.get(scheme); } else { LOGGER.debug("Filesystem {} defined in configuration option", scheme); } if (clazz == null) { throw new UnsupportedFileSystemException("No FileSystem for scheme " + "\"" + scheme + "\""); } LOGGER.debug("FS for {} is {}", scheme, clazz); return clazz; }

loadFileSystems方法超级重要,实际上就是这个方法加载FileSystem实现类名:

private static void loadFileSystems() { LOGGER.debug("Loading filesystems"); synchronized (FileSystem.class) { if (!FILE_SYSTEMS_LOADED) { //这里就证实了猜想,确实使用SPI加载DistributedFileSystem类 ServiceLoader serviceLoader = ServiceLoader.load(FileSystem.class); Iterator it = serviceLoader.iterator(); while (it.hasNext()) { FileSystem fs; try { fs = it.next(); try { // DistributedFileSystem的scheme成员就是hdfs,将它们放入SERVICE_FILE_SYSTEMS中 SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("{}:// = {} from {}", fs.getScheme(), fs.getClass(), ClassUtil.findContainingJar(fs.getClass())); } } catch (Exception e) { LOGGER.warn("Cannot load: {} from {}", fs, ClassUtil.findContainingJar(fs.getClass())); LOGGER.info("Full exception loading: {}", fs, e); } } catch (ServiceConfigurationError ee) { LOG.warn("Cannot load filesystem: " + ee); Throwable cause = ee.getCause(); // print all the nested exception messages while (cause != null) { LOG.warn(cause.toString()); cause = cause.getCause(); } // and at debug: the full stack LOG.debug("Stack Trace", ee); } } FILE_SYSTEMS_LOADED = true; } } }

总结

hadoop通过SPI机制,向SERVICE_FILE_SYSTEMS这个map中加入("hdfs", DistributedFileSystem.class),后续访问map即可获取到DistributedFileSystem实现类。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:什么是串行端口,它一般分为哪几种类
下一篇:0010 - YARN入门指南
相关文章

 发表评论

暂时没有评论,来抢沙发吧~