當前位置:編程學習大全網 - 源碼下載 - Hadoop讀寫文件的內部工作機制是怎樣的?

Hadoop讀寫文件的內部工作機制是怎樣的?

客戶端通過調用FileSystem對象的open()方法打開文件(對應HDFS文件系統,調用DistributedFileSystem對象)(也就是圖中的第壹步),DistributedFileSystem通過RPC(遠程過程調用)查詢NameNode得到這個文件的前幾個塊的文件位置(第二步)。對於每個塊,namenode返回擁有該塊備份的所有NameNode的地址信息(在集群的拓撲網絡中按離客戶端的距離排序,關於如何在Hadoop集群中進行網絡拓撲,請參見下面的介紹)。如果客戶端本身是壹個datanode(例如客戶端是壹個mapreduce任務),並且datanode本身具有所需的文件塊,則客戶端將在本地讀取文件。

以上步驟完成後,DistributedFileSystem會返回壹個FSDataInputStream(支持文件尋道),客戶端可以從FSDataInputStream中讀取數據。FSDataInputStream包裝了壹個DFSInputSteam類來處理namenode和datanode的I/O操作。

然後客戶端執行read()方法(步驟3),DFSInputStream(已經存儲了要讀取的文件的前幾個塊的位置信息)連接到第壹個datanode(也就是最近的datanode)獲取數據。通過重復調用read()方法(第4步和第5步),文件中的數據被傳輸到客戶機。當讀取到塊的末尾時,DFSInputStream會關閉指向該塊的流,轉而查找下壹個塊的位置信息,然後反復調用read()方法繼續該塊的流讀取。這些過程對用戶是透明的,在用戶看來,這是對整個文件的不間斷流式讀取。

當真正的文件被讀取後,客戶端調用FSDataInputSteam中的close()方法來關閉文件輸入流(步驟6)。

如果DFSInputStream在讀取壹個塊時檢測到錯誤,dfsinputstream會連接到下壹個datanode以獲取該塊的其他備份,同時會記錄之前檢測到的損壞的datanode,以避免以後重復讀取該datanode。DFSInputSteam還會檢查從datanode讀取的數據的校驗和,如果發現有數據損壞,就會向namenode報告損壞的塊,並重新讀取其他datanode上的其他塊備份。

這種設計模式的壹個好處是文件讀取遍布在這個集群的datanode上,namenode只提供文件塊的位置信息,需要的帶寬非常少,有效避免了單點瓶頸問題,擴大了集群的規模。

Hadoop中的網絡拓撲

如何測量Hadoop集群中兩個節點之間的距離?要知道,在高速處理數據的時候,數據處理速度的唯壹限制因素就是數據在不同節點之間的傳輸速度:這是因為帶寬的可怕不足造成的。所以我們以帶寬為標準來衡量兩個節點之間的距離。

但是計算兩個節點之間的帶寬比較復雜,需要在靜態集群中進行測量,但是Hadoop集群壹般是隨著數據處理的規模動態變化的(而且兩個節點之間直接連接的連接數是節點數的平方)。所以Hadoop用壹個簡單的方法來衡量距離。它將集群中的網絡表示為樹形結構,兩個節點之間的距離是它們與祖先的距離之和。樹通常根據數據中心、機架和datanode的結構來組織。計算節點上的本地計算速度最快,跨數據中心的計算速度最慢(跨數據中心的Hadoop集群現在已經很少用了,壹般都是在壹個數據中心完成)。

如果壹個計算節點n1位於數據中心c1的機架r1上,則可以表示為/C1/R1,以下是不同條件下兩個節點之間的距離:

距離(/d1/r1/n1,/d1/r1/n1) = 0(同壹節點上的進程)

距離(/d1/r1/n1,/d1/r1/n2) = 2(同壹機架上的不同節點)

距離(/d1/r1/n1,/d1/r2/n3) = 4(同壹數據中心不同機架上的節點)

距離(/d1/r1/n1,/d2/r3/n4) = 6(不同數據中心的節點)

如下圖所示:

Hadoop

寫文件

現在我們來看看Hadoop中的文件寫入機制。通過文件寫入機制,我們可以更好地理解Hadoop中的壹致性模型。

Hadoop

上圖向我們展示了壹個創建新文件並向其中寫入數據的例子。

首先,客戶端通過DistributedFileSystem上的create()方法指明要創建的文件的文件名(步驟1),然後DistributedFileSystem通過RPC調用向NameNode申請創建壹個新文件(步驟2,文件還沒有分配到相應的塊)。Namenode檢查是否存在同名文件,以及用戶是否具有創建該文件的相應權限。如果檢查通過,namenode將為該文件創建壹個新記錄;否則,文件創建會失敗,客戶端會得到壹個IOException異常。DistributedFileSystem返回壹個FSDataOutputStream供客戶端寫入數據。與FSDataInputStream類似,FSDataOutputStream封裝了壹個DFSOutputStream來處理namenode和datanode之間的通信。

當客戶端開始寫入數據時(步驟3),DFSOutputStream將寫入的數據分成包,並將它們放入壹個中間隊列——數據隊列。DataStreamer從數據隊列中獲取數據,並向namenode應用壹個新的塊來存儲它所獲得的數據。Namenode選擇壹系列合適的datanodes(數量由文件中副本的數量決定)來形成管道。這裏,我們假設副本是3,因此管道中有三個datanodes。DataSteamer將數據寫入管道流中的第壹個datanode(步驟4),第壹個datanode將接收到的數據傳輸到第二個datanode(步驟4),依此類推。

同時,DFSOutputStream還維護著另壹個中間隊列——ack隊列。ACK隊列中的數據包不會被移出ACK隊列,直到它們被管道中的所有datanode確認(步驟5)。

如果在寫入數據時典當了datanode,將執行以下對用戶透明的步驟:

1)當流水線關閉時,確認隊列中的所有數據都會被移動到數據隊列的頭部重新傳輸,這樣可以保證流水線中被典當的datanode下遊的datanode不會因為典當的datanode而丟失數據包。

2)在仍然正常運行的datanode上的當前塊上做壹個標記,這樣當被丟棄的datanode重新啟動時,namenode就會知道datanode上的哪個塊是崩潰後留下的部分損壞的塊,以便刪除。

3)將典當的datanode從流水線中移除,未完成塊的其他數據繼續寫入另外兩個仍在正常運行的datanode中。namenode知道這個塊仍然處於欠復制狀態(即備份數量不足),然後他會安排壹個新的副本來達到所需的備份數量。隨後的塊寫入方法與之前的正常時間相同。

管道中多個datanode失敗是有可能的(雖然不常見),但是只要創建了dfs.replication.min(默認為1)副本,我們就認為創建成功了。剩余的副本將在以後異步創建,以達到指定的副本數量。

當客戶端完成寫入數據時,它將調用close()方法(步驟6)。該操作將把所有剩余的包刷新到管道中,等待這些包被成功確認,然後通知namenode文件被成功寫入(步驟7)。此時,namenode知道文件由哪些塊組成(因為DataStreamer請求namenode分配新的塊,namenode當然會知道它為給定的文件分配了哪個塊),它會等待創建最小數量的副本,然後成功返回。

副本是如何分發的?

Hadoop在創建新文件時如何選擇塊的位置?壹般來說,要考慮以下因素:帶寬(包括寫帶寬和讀帶寬)和數據安全性。如果我們把三個備份都放在壹個datanode上,可以避免寫入帶寬的消耗,但是幾乎不能提供數據冗余帶來的安全性,因為如果這個datanode崩潰,這個文件的所有數據都會丟失。另壹個極端,如果三個冗余備份都放在不同的機架上,即使在數據中心,雖然數據會很安全,但是寫數據會消耗很多帶寬。Hadoop 0.17.0為我們提供了默認的副本分配策略(Hadoop 1之後。x,副本策略是允許可插拔的,也就是妳可以自己制定副本分配策略)。副本的默認分配策略是將第壹個備份放在與客戶端相同的datanode上(如果客戶端在群集外運行,則隨機選擇壹個datanode來存儲第壹個副本),將第二個副本放在與第壹個副本機架不同的隨機datanode上,將第三個副本放在與第二個副本機架相同的隨機datanode上。如果副本的數量大於三個,後續的副本會隨機存儲在集群中,Hadoop會盡量避免在同壹個機架中存儲太多副本。選擇副本位置後,管線的網絡拓撲如下:

Hadoop

總的來說,上面默認的副本分配策略給了我們很好的可用性(塊放在兩個機架上,更安全),寫帶寬優化(寫數據只需要跨壹個機架),讀帶寬優化(可以從兩個機架中選擇較近的壹個來讀)。

壹致性模型

HDFS可能在性能上有些地方不符合POSIX(是的,妳沒看錯,POSIX不僅僅適用於linux/unix,Hadoop使用POSIX設計讀取文件系統文件流),所以看起來可能和妳預想的不壹樣,壹定要小心。

創建文件後,可以在命名空間中看到它:

Path p =新路徑(" p ");

fs . create(p);

assertThat(fs.exists(p),is(true));

但是,寫入該文件的任何數據都不能保證可見。即使刷新寫入的數據,該文件的長度仍可能為零:

Path p =新路徑(" p ");

output stream out = fs . create(p);

out.write("內容"。getBytes(" UTF-8 "));

out . flush();

assertThat(fs.getFileStatus(p)。getLen(),is(0L));

這是因為,在Hadoop中,這個文件中的內容只有在壹個完整的數據塊被寫入文件後才會可見(也就是這些數據會被寫入硬盤),所以當前正在寫入的塊中的內容總是不可見的。

Hadoop提供了壹種強制將緩沖區中的內容刷新到datanode的方法,就是FSDataOutputStream的sync()方法。調用sync()方法後,Hadoop會確保所有已寫入的數據都被刷新到管道中的datanode中,並且對所有讀者都可見:

Path p =新路徑(" p ");

FSDataOutputStream out = fs . create(p);

out.write("內容"。getBytes(" UTF-8 "));

out . flush();

out . sync();

assertThat(fs.getFileStatus(p)。getLen(),是(((long)“內容”。長度()))));

這個方法類似於POSIX中的fsync系統調用(它將給定文件描述符中的所有緩沖數據刷新到磁盤)。比如用java API寫壹個本地文件,我們可以保證在調用flush()和synchronization之後可以看到寫了什麽:

file output stream out = new file output stream(local file);

out.write("內容"。getBytes(" UTF-8 "));

out . flush();//刷新到操作系統

out.getFD()。sync();//同步到磁盤(getFD()返回流對應的文件描述符)

assertThat(localFile.length())是((long)“內容”。長度()))));

在HDFS中關閉流會隱式調用sync()方法:

Path p =新路徑(" p ");

output stream out = fs . create(p);

out.write("內容"。getBytes(" UTF-8 "));

out . close();

assertThat(fs.getFileStatus(p)。getLen(),是(((long)“內容”。長度()))));

由於Hadoop中壹致性模型的限制,如果不調用sync()方法,很可能會丟失壹大塊數據。這是不可接受的,所以我們應該使用sync()方法來確保數據已經寫入磁盤。但是頻繁調用sync()方法並不好,因為會造成很多額外的開銷。我們可以在寫入壹定量的數據後調用sync()方法。至於具體的數據大小,就看妳的應用了。數據越大越好,不會影響應用程序的性能。

  • 上一篇:按照是否運行被測軟件,可以將測試技術分為
  • 下一篇:DNF,我刷不掉壹個PK 1-5的遊戲,而且我家網速很快,但是不知道怎麽回事。誰知道我怎麽能不掉它?
  • copyright 2024編程學習大全網