c语言sscanf函数的用法是什么
257
2022-11-26
七、HDFS上传和下载原理(有源码解析)
[TOC]
一、HDFS文件上传基本原理
1、基本流程
1)客户端通过本地通过RPC与namenode建立rpc通信,然后请求上传文件2)namenode收到请求后,会检查是否能创建该文件(比如校验用户是否有该权限,文件是否已经存在等)。如果检查通过,namenode就会开始记录该新文件的元信息(先写入到edits文件,然后更新内存中的metadata),并响应client可以开始上传。3)client 在本地将文件进行切块(按照指定的block大小)。然后请求namemode上传第一个block。4)namenode根据策略以及每个datanode的情况,返回3个datanode地址给client(这里默认3副本)。5)client与请求namenode返回的3个datanode建立pipeline,即 client请求dn1,dn1请求dn2,dn2请求dn3,这样一个串行通道。6)3个datanode逐级响应,最终响应给client。表示可以传输数据7)client会将每个block还会分割成一个个packet,然后放入 data queue中,等待上传。每传输一个packet,就会将packet加入到另外一个 ack queue中,等到pipeline中的datanode响应传输完成后,就会讲相应的packet从ack queue中移除。8)后面就是重复上面的流程,直到client关闭通道,并将所有的queue中的packet刷写到pipeline中之后,datanode就会标记文件已完成。
注意:client完成写入之后,此时block 才是可见的,正在写的block是不可见的。当调用sync方法时(将缓冲区数据刷写到磁盘中),client才确认写入已经完成。client关闭流时调用 的close方法,底层就会调用sync。是否需要手动调用取决你根据程序需 要在数据健壮性和吞吐率之间的权衡。
2、datanode发生错误的解决方式
问题:传输过程中,某个datanode发生错误,hdfs是怎么解决?1)pipeline关闭掉2)为了防止丢包,ack queue中的packet会同步到data queue中。重新进行下一次传输。3)把产生错误的datanode上当前在写,但未完成的block删除掉4)剩下的block写到剩余两个正常的datanode中。5)namenode会自动寻找另外合适的一个datanode复制另外两个datanode中刷写的block,完成3副本的写入。当然,这个操作namenode的内部机制,对client来说是无感知的。
3、元数据存储
namenode使用两种文件保存元数据,fsimag和edits文件。fsimage:元数据镜像文件,存储某一时间段内的namenode的内存元数据信息edits:操作日志文件。fstime:保存最近一次checkpoint的时间。更详细的 fsimage和edits文件讲解,请看 “hdfs体系架构”
4、元数据的合并
1)首先是 SNN通知 NN切换edits文件,主要是保证合并过程有新的写入操作时能够正常写入edits文件。2)SNN通过fsimage和edits文件。3)SNN将fsiamge载入内存,开始合并edits到fsimage,生成新的fsimage4)SNN将新的fsimage发送给NN5)NN用新的fsimage,替换旧的fsimage。
4、写入时的网络拓扑选择
写入操作时,默认3副本,那么副本分布在哪些datanode节点上,会影响写入速度。在hdfs的网络拓扑中,有那么四种物理范围,同一节点、同一机架上的不同节点、同一机房中不同节点、不同机房中的不同节点。这4中物理范围表示节点间的距离逐渐增大。这种物理距离越远会影响副本之间所在节点之间的传输效率,即传输效率越低。
5、机架感知
路径是 r1/n1 --> r2/n1 --> r2/n2
路径是 r1/n1 --> r1/n2 --> r2/n2(后面这个其实任意都行,主要处于不同机架就好)这种方式比第一种要好,因为这种方式数据经过的总路径更短了,只要一个副本需要跨机架传输,而上面的则有两个副本需要跨机架传输。
二、HDFS上传文件源码分析
下面的分析过程基于 hadoop2.8.4 的源码分析的。
1、client初始化源码分析
一般来说,会先通过 FileSystem.get() 获取到操作hdfs 的客户端对象,后面所有的操作都通过调用该对象的方法完成的。
FileSystem client = FileSystem.get(new URI("hdfs://bigdata121:9000"), conf);
接着我们看看 FileSystem.get() 的实现
public static FileSystem get(URI uri, Configuration conf) throws IOException { String scheme = uri.getScheme(); String authority = uri.getAuthority(); if (scheme == null && authority == null) { return get(conf); } else { if (scheme != null && authority == null) { URI defaultUri = getDefaultUri(conf); if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { return get(defaultUri, conf); } } String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); /* 这里是关键代码,表示进入 CACHE.get() 方法 */ return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf); } }
CACHE是FileSystem的一个静态内部类Cache 的对象。继续看看 CACHE.get()方法
FileSystem get(URI uri, Configuration conf) throws IOException { FileSystem.Cache.Key key = new FileSystem.Cache.Key(uri, conf); //进入CACHE对象的 getInternal() 方法 return this.getInternal(uri, conf, key); }
进入CACHE对象的 getInternal() 方法
private FileSystem getInternal(URI uri, Configuration conf, FileSystem.Cache.Key key) throws IOException { FileSystem fs; synchronized(this) { /* 获取map中的filesytem对象,表示之前已经初始化了filesystem对象,并存储到map集合中,现在直接从map中获取就好。 */ fs = (FileSystem)this.map.get(key); } if (fs != null) { //如果fs存在,就直接返回存在的filesytem实例即可 return fs; } else { //如果是初次使用filesystem,就得创建并初始化 fs = FileSystem.createFileSystem(uri, conf); synchronized(this) { FileSystem oldfs = (FileSystem)this.map.get(key); if (oldfs != null) { fs.close(); return oldfs; } else { if (this.map.isEmpty() && !ShutdownHookManager.get().isShutdownInProgress()) { ShutdownHookManager.get().addShutdownHook(this.clientFinalizer, 10); } fs.key = key; this.map.put(key, fs); if (conf.getBoolean("fs.automatic.close", true)) { this.toAutoClose.add(key); } return fs; } } } }
我们看到了上面有两种方式,一种是如果filesytem对象已存在,那么直接从map获取并返回对象即可。如果不存在,就调用 FileSystem.createFileSystem() 方法创建,创建完成后返回fs。下面看看这个方法.
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException { Tracer tracer = FsTracer.get(conf); TraceScope scope = tracer.newScope("FileSystem#createFileSystem"); scope.addKVAnnotation("scheme", uri.getScheme()); FileSystem var6; try { Class> clazz = getFileSystemClass(uri.getScheme(), conf); FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf); //这是关键性的代码,看名字就知道,对filesytem 进行初始化 fs.initialize(uri, conf); var6 = fs; } finally { scope.close(); } return var6; }
我们要注意,FileSystem这个类是抽象类,它的实现子类是 DistributedFileSystem,所以虽然 fs是FileSystem类型的,但是对象本身是DistributedFileSystem类型的,也就是java 的多态特性。所以fs.initialize() 调用的实际上是 DistributedFileSystem中initialize()方法。下面看看这个方法
/* DistributedFileSystem.class */ public void initialize(URI uri, Configuration conf) throws IOException { super.initialize(uri, conf); this.setConf(conf); String host = uri.getHost(); if (host == null) { throw new IOException("Incomplete HDFS URI, no host: " + uri); } else { this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user"); //这是关键性代码,创建了一个DFSClient对象,顾名思义就是RPC的客户端 this.dfs = new DFSClient(uri, conf, this.statistics); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = this.getHomeDirectory(); this.storageStatistics = (DFSOpsCountStatistics)GlobalStorageStatistics.INSTANCE.put("DFSOpsCountStatistics", new StorageStatisticsProvider() { public StorageStatistics provide() { return new DFSOpsCountStatistics(); } }); } }
看到上面创建了一个 DFSClient() 对象,赋值给了 this.dfs。下面看看这个类的构造方法。
/*
DFSClient.class
*/
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, Statistics stats) throws IOException {
.............................
/*源码比较长,所以截取重要的部分显示*/
//这是一个关键性变量,其实就是namenode代理对象,只不过还没有创建对象
ProxyAndInfo
可以看到上面已经通过 this.namenode = (ClientProtocol)proxyInfo.getProxy(); 获取到了 namenode 的代理对象,也就是rpc的客户端对象。下面看看 ClientProtocol 这个是啥东西,因为代理对象是这个类型的。
/* ClientProtocol.class 这是个接口 */ public interface ClientProtocol { long versionID = 69L; /* 下面主要是定义很多个抽象方法,主要就是用于对hdfs进行操作的接口,比如,open,create等这些常用方法。 */ }
下面看看 proxyInfo创建代理对象的方法
/*
NameNodeProxiesClient
*/
public static NameNodeProxiesClient.ProxyAndInfo
可以看到上面是已经创建了 proxy对象,并返回,而且我们也可以看到,创建的proxy对象就是clientProtocol类型的。下面看看创建proxy对象的方法 createNonHAProxyWithClientProtocol()
/*
NameNodeProxiesClient
*/
public static ClientProtocol createNonHAProxyWithClientProtocol(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, boolean withRetries, AtomicBoolean fallbackToSimpleAuth) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
RetryPolicy defaultPolicy = RetryUtils.getDefaultRetryPolicy(conf, "dfs.client.retry.policy.enabled", false, "dfs.client.retry.policy.spec", "10000,6,60000,10", SafeModeException.class.getName());
long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
//这里是核心代码,可以明显看到调用 RPC 模块中的方法创建proxy对象
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB)RPC.getProtocolProxy(ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf), defaultPolicy, fallbackToSimpleAuth).getProxy();
if (withRetries) {
Map
所以至此我们可以发现,客户端和namenode之间通信的方式就是通过RPC实现的。
2、上传源码分析
一般来说,上传操作,首先得
OutputStream os = fs.create(new Path("xxxx"));
即创建文件,然后再上传文件数据。上传数据的流程和普通的流操作没什么区别。下面看看这个 create方法。
/* FileSystem.class */ public abstract FSDataOutputStream create(Path var1, FsPermission var2, boolean var3, int var4, short var5, long var6, Progressable var8) throws IOException;
可以看到这是个抽象方法,前面也说到,它的实现子类是 DistributedFileSystem,这里这里是调用子类的 create方法,继续看
/*
DistributedFileSystem.class
*/
public FSDataOutputStream create(Path f, final FsPermission permission, final EnumSet
可以看见上面创建返回了 DFSOutputStream 输出流对象。下面看看DFSClient.create方法的实现代码。
/*
DFSClient.class
*/
public DFSOutputStream create(String src, FsPermission permission, EnumSet
继续看 DFSOutputStream.newStreamForCreate 这个方法.
/*
DistributedFileSystem.class
*/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet
上面看到 DFSOutputStream 对象居然有一个 start方法,来看看先。
/* DFSOutputStream.class */ protected synchronized void start() { this.getStreamer().start(); } // 继续看 this.getStreamer() 这个方法,可以看到这个方法返回的是DataStreamer,继续看这个类 protected DataStreamer getStreamer() { return this.streamer; } /* DataStreamer.class */ //可以看到这个类继承了 Daemon类,而Daemon本身是继承了 Thread类 class DataStreamer extends Daemon { }
由此可得知,DFSOutputStream 这个类本身并没有继承 Thread类,但是使用DataStreamer这个继承了 Thread类的来新建线程传输数据,不占用当前线程。而在 DataStreamer 这个类中,重写了 Thread标志性的 run 方法。传输数据就是在这里完成的。前面说到的 hdfs的 pipeline 也是这个run方法中实现的,里面是一个while死循环,知道传输完数据为止,或者客户端关闭。代码过长,就不看了。反正看到这里已经成功获取了 client的输出流对象,后面就是传统的输入流和输出流的对接了,这里不细讲了。
1、FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信2、调用FileSystem的create()方法,由于实现类为DistributedFileSystem,所有是调用该类中的create()方法3、DistributedFileSystem持有DFSClient的引用,继续调用DFSClient中的create()方法4、DFSOutputStream提供的静态newStreamForCreate()方法中调用NameNodeRpcServer服务端的create()方法并创建DFSOutputStream输出流对象返回5、通过hadoop提供的IOUtil工具类将输出流输出到本地
三、HDFS下载基本原理
1)客户端向namenode请求下载文件,namenode在内存的metadata查找对应的文件的元数据,如果无则返回无,有则返回对应文件的block位置信息。而且,namenode会根据客户端所在的位置,根据datanode以及client之间的距离大小,将返回的 block 的副本的datanode节点从距离小到大排序,距离最近的datanode则排在第一位。2)client通过机架感知策略,选择最近的datanode进行block请求读取3)datanode开始传输数据给client,以packet为单位,并做校验4)客户端接收packet之后,本地缓存,然后再往本地路径写入该block。5)第二块,第三块block重复以上过程
注意:
如果在读数据的时候, DFSInputStream和datanode的通讯发生异常,就会尝试正在读的block的排序第二近的datanode,并且会记录哪个 datanode发生错误,剩余的blocks读的时候就会直接跳过该datanode。 DFSInputStream也会检查block数据校验和,如果发现一个坏的block,就会先报告到namenode节点,然后 DFSInputStream在其他的datanode上读该block的镜像。
四、HDFS下载源码分析
client的初始化代码是一样的,这里不重复分析了。直接看下载首先通过 open方法获取目标文件的输入流对象。
FSDataInputStream fis = client.open(getPath);
下面看看这个open方法
/* FileSystem.class */ public FSDataInputStream open(Path f) throws IOException { return this.open(f, this.getConf().getInt("io.file.buffer.size", 4096)); } public abstract FSDataInputStream open(Path var1, int var2) throws IOException;
可以看到,依旧是抽象方法,所以依旧是调用 DistributedFileSystem的open方法。
/*
DistributedFileSystem.class
*/
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
this.statistics.incrementReadOps(1);
this.storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = this.fixRelativePart(f);
return (FSDataInputStream)(new FileSystemLinkResolver
熟悉的套路,依旧调用 dfsclient的open方法,创建输入流,下面看看这个open方法
/* DFSClient.class */ public DFSInputStream open(String src, int buffersize, boolean verifyChecksum) throws IOException { this.checkOpen(); TraceScope ignored = this.newPathTraceScope("newDFSInputStream", src); Throwable var5 = null; DFSInputStream var6; try { //这里直接创建一个输入流对象,如果按照上面上传文件的套路来说,应该是 dfsclient.namenode.open(xxx)才对的,这里并没有 var6 = new DFSInputStream(this, src, verifyChecksum, (LocatedBlocks)null); } catch (Throwable var15) { var5 = var15; throw var15; } finally { if (ignored != null) { if (var5 != null) { try { ignored.close(); } catch (Throwable var14) { var5.addSuppressed(var14); } } else { ignored.close(); } } } return var6; }
上面并没有调用DFSClient.open,而是将DFSClient作为参数传入DFSInputStream。下面看看 DFSInputStream 这个神奇的类。
/* DFSInputStream.class */ DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException { //将dfsclinet保存到当前类中 this.dfsClient = dfsClient; this.verifyChecksum = verifyChecksum; this.src = src; synchronized(this.infoLock) { this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); } this.locatedBlocks = locatedBlocks; //核心方法,开始获取block信息,如有多少个block,以及每个block所在的datanode节点名 this.openInfo(false); }
下面看看 openInfo() 方法
/* DFSInputStream.class */ void openInfo(boolean refreshLocatedBlocks) throws IOException { DfsClientConf conf = this.dfsClient.getConf(); synchronized(this.infoLock) { //获取block的位置信息以及最后一个block的长度(因为最后一个block肯定不是完整的128M的长度) this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks); int retriesForLastBlockLength; for(retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength(); retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L; --retriesForLastBlockLength) { DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times"); this.waitFor(conf.getRetryIntervalForGetLastBlockLength()); this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength(true); } if (this.lastBlockBeingWrittenLength == -1L && retriesForLastBlockLength == 0) { throw new IOException("Could not obtain the last block locations."); } } }
下面看看 fetchLocatedBlocksAndGetLastBlockLength 这个获取block信息的方法
/*
DFSInputStream.class
*/
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh) throws IOException {
LocatedBlocks newInfo = this.locatedBlocks;
if (this.locatedBlocks == null || refresh) {
//可以看到这里是调用 dfsclient中的方法俩获取block信息
newInfo = this.dfsClient.getLocatedBlocks(this.src, 0L);
}
DFSClient.LOG.debug("newInfo = {}", newInfo);
if (newInfo == null) {
throw new IOException("Cannot open filename " + this.src);
} else {
if (this.locatedBlocks != null) {
Iterator
看到上面又回到调用 dfsClient.getLocatedBlocks,看看这个方法
/* DFSClient.class */ public LocatedBlocks getLocatedBlocks(String src, long start) throws IOException { return this.getLocatedBlocks(src, start, this.dfsClientConf.getPrefetchSize()); } //继续调用下面这个方法 public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException { TraceScope ignored = this.newPathTraceScope("getBlockLocations", src); Throwable var7 = null; LocatedBlocks var8; try { //调用这个静态方法获取 block位置信息 var8 = callGetBlockLocations(this.namenode, src, start, length); } catch (Throwable var17) { var7 = var17; throw var17; } finally { if (ignored != null) { if (var7 != null) { try { ignored.close(); } catch (Throwable var16) { var7.addSuppressed(var16); } } else { ignored.close(); } } } return var8; } //继续看 static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException { try { //熟悉的味道,通过 namenode 的代理对象获取block信息 return namenode.getBlockLocations(src, start, length); } catch (RemoteException var7) { throw var7.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class}); } }
上面可以看到,仍旧是通过 namenode代理对象发起操作,下面看看 namenode.getBlockLocations。因为代理对象的类型是 ClientProtocol类型的,是个接口,所以得到实现子类中查看 ,ClientNamenodeProtocolTranslatorPB这个类。
/* ClientNamenodeProtocolTranslatorPB.class */ public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException { GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto.newBuilder().setSrc(src).setOffset(offset).setLength(length).build(); try { //熟悉的味道,调用 rcpProxy 向namenode server 发起操作。 GetBlockLocationsResponseProto resp = this.rpcProxy.getBlockLocations((RpcController)null, req); return resp.hasLocations() ? PBHelperClient.convert(resp.getLocations()) : null; } catch (ServiceException var8) { throw ProtobufHelper.getRemoteException(var8); } }
看到这里,下面就是RPC底层的操作了。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~