當前位置:編程學習大全網 - 編程語言 - mapreduce和hadoop難嗎?

mapreduce和hadoop難嗎?

只需壹行代碼就可以運行MapReduce作業:JobClient.runJon(conf ),作業運行中涉及到四個實體:

?1.JobClient編寫代碼、配置作業和提交作業。

?2.JobTracker:初始化作業,分配作業並協調作業的操作。這是壹個java程序,主要類是JobTracker。

?3.TaskTracker:作業劃分後運行任務,即執行分配數據分配上的Map或Reduce任務。

?4.HDFS:保存作業數據和配置信息,並保存作業結果。

Map/Reduce作業的整體執行流程:

?代碼編寫->作業配置?-& gt;工作提交?-& gt;?映射任務分配和執行?-& gt;?處理中間結果?-& gt;?減少任務分配和執行?-& gt;?輸出結果

對於每項工作的執行,它還包括:

?輸入準備?-& gt;?任務執行?-& gt;?輸出結果

作業提交作業客戶端:

?JobClient的runJob方法生成壹個Jobclient實例並調用其submitJob方法,然後runJob開始循環,並在循環中調用getTaskCompetionEvents方法獲取TaskCompletionEvent實例,每秒輪詢壹次作業進度(後面會介紹進度和狀態更新),將進度寫入控制臺,作業完成後顯示作業計數器,失敗時將錯誤記錄到控制臺。

?提交作業方法作業提交流程:

?1.從JobTracker請求新的JobId。

?2.檢查與作業相關的路徑,如果路徑不正確,則返回錯誤。

?3.計算作業輸入切片及其分部信息。

?4.復制資源(jar文件、配置文件等。)對共享HDFS進行作業操作,以及

復制多個副本(參數控制,默認值為10)供tasktracker訪問,同時將計算出的碎片復制到HDFS。

?5.調用JobTracker對象的submitJob()方法來實際提交作業,並告訴JobTracker該作業已準備好執行。

作業JobTracker的初始化:

?在接收到submitJob方法的調用後,JobTracker會將該調用放入壹個內部隊列中,由Job scheduler進行調度和初始化。作業初始化創建壹個作業對象。

?當作業被調度時,JobTracker會創建壹個代表作業的JobInProgress對象,並將任務和記錄信息封裝在這個對象中,以便跟蹤任務的狀態和進度。

?初始化過程由JobInProgress對象的initTasks方法初始化。

?初始化步驟:

?1.從HDFS讀取作業對應的job.split信息,為後續初始化做準備。

?2.創建和初始化地圖並減少任務。根據數據分片信息的個數確定地圖任務的個數,然後為每個maptask生成壹個TaskInProgress對象來處理數據分片,這個對象先放入nonRunningMapCache供JobTracker在分配任務時使用。接下來,根據JobConf中的mapred.reduce.tasks屬性,使用setNumReduceTasks()方法設置reducetasks的個數,然後按照與map task相同的方式創建。

?3.最後,創建兩個初始化任務來初始化map和reduce。

任務分配JobTracker:

消息傳遞心跳:tasktracker運行壹個簡單的循環,定期向JobTracker發送心跳。心跳告訴JobTracker他是否活著,同時傳遞其他信息作為消息通道(請求新任務)。作為心跳的壹部分,tasktracker將指示它是否準備好運行新任務,如果是,jobtracker將為它分配壹個任務。

分配任務所屬的作業:在Jobtracker分配任務之前,需要確定任務所在的作業。後面會介紹各種作業調度算法,默認是FIFO作業調度。

分配Map和Reduce任務:tasktrack有固定數量的任務槽,壹個tasktrack可以同時運行多個Map和Reduce任務,但其確切數量由tasktrack的內核數量和內存大小決定。默認調度程序將首先填充Map任務槽,然後填充Reduce任務槽。Jobtracker將選擇最接近切片文件的tasktracker。理想情況下,任務是數據本地的,當然也可以是機架本地的。如果沒有本地化,他們需要從其他機架檢索數據。Reduce任務分配非常簡單,jobtracker將簡單地從要運行的Reduce任務列表中選擇壹個,而不考慮數據本地化。

任務執行任務跟蹤者:

?收到新任務後,TaskTracker將在本地運行該任務。運行任務的第壹步是通過localizedJob本地化任務本地化所需的註入配置、數據、程序和其他信息。

?1.本地化數據:從* * *文件系統本地復制job.split和job.jar(在分布式緩存中),將作業配置信息寫入job.xml。

?2.創建壹個新的本地工作目錄:tasktracker會將job.jar文件壓縮到這個工作目錄中。

?3.調用launchTaskForJob方法發布任務(其中將創建壹個新的TaskRunner實例來運行任務)。如果是Map任務,將啟用MapTaskRunner,對於Reduce,則為ReduceTaskRunner。

之後TaskRunner會啟動壹個新的JVM來運行每壹個Map/Reduce任務,防止tasktracker因為程序原因崩潰,但是不同任務之間還是可以重用JVM的,後面會講到任務JVM重用。

?對於單個地圖,任務執行的簡單過程是:

?1.分配任務執行參數

?2.將map任務信息添加到Child的臨時文件中(Child是運行Map和Reduce任務的主進程)。

?3.配置地圖任務的日誌文件夾以及通信和輸出參數。

?4.讀取輸入拆分以生成由RecordReader讀取的數據。

?5.為Map生成MapRunnable,依次從RecordReader接收數據,調用Map函數進行處理。

?6.最後,將map函數的輸出調用收集到MapOutputBuffer中(參數控制其大小)。

流和管道:

?流和管道都運行特殊的Map和Reduce任務來運行用戶提供的可執行文件並與之通信。

?流:使用標準的輸入和輸出流與進程通信。

?Pipes:用來監聽套接字,它會向C++程序發送壹個端口號,兩者可以建立鏈接。

進度和狀態更新:

?作業及其任務具有狀態,包括:運行成功或失敗狀態、映射/減少進度、作業計數器值和狀態消息。

?狀態消息和客戶端之間的通信:

?1.跟蹤地圖任務的進度:進度是已處理輸入的比例。

?2.對於reduce:略復雜,Reduce的任務分為三個階段(每個階段占1/3),即復制、排序和Reduce處理。如果reduce已經執行了壹半的輸入,那麽任務進度就是1/3+1/6 = 5。

?3.任務計數器:任務有壹組計數器,負責計算任務運行的事件。

?4.任務進度報告:如果任務報告了進度,將設置壹個標誌,表明該狀態將被發送到tasktracker。壹個獨立的線程每三秒檢查壹次這個標誌,如果它被設置,它就告訴tasktracker當前的狀態。

?5.tasktracker進度報告:tasktracker會每5秒向jobtracker發送壹次心跳(這個心跳是由集群大小決定的,集群越大需要的時間越長),在調用過程中tasktracker的所有運行狀態都會發送給jobtracker。

?6.jobtracker合並任務報告:它生成壹個全局視圖,顯示所有運行作業的機器的任務狀態。

?上面提到的JobClient通過每秒查詢JobTracker來接收最新的狀態,客戶端JobClient的getJob方法可以得到壹個RunningJob的實例,其中包含了作業的所有狀態信息。

工作的完成:

?當jobtracker收到作業的最後壹項任務已經完成的通知時,它會將作業狀態設置為成功。當JobClient查詢狀態時,它知道任務已經成功完成,因此JobClient打印壹條消息通知用戶,然後從runJob方法返回。

?如果jobtracker有相應的設置,也會向客戶端發送Http作業通知,想要接收回調指令的客戶端可以通過job.end.notification.url屬性進行設置。

?Jobtracker表示作業的工作狀態,表示tasktracker也清除作業的工作狀態,比如刪除中間輸出。

失敗

?其實用戶代碼出現軟件錯誤,進程會崩潰,機器會失效,但是Hadoop可以很好的處理這些故障,完成作業。

?1.任務失敗?

?子任務異常:如果Map/Reduce任務中的用戶代碼拋出異常,子任務JVM進程將在退出前向父進程tasktracker發送錯誤報告,錯誤將記錄在用戶日誌中。Tasktracker會將此任務嘗試標記為已結束,並釋放此任務槽以運行另壹個任務。

?子進程JVM突然退出:由於JVM bug導致的用戶代碼引起的壹些特殊原因,JVM可能會退出。在這種情況下,tasktracker會註意到進程已經退出,並將嘗試標記為失敗。

?任務掛起:壹旦tasktracker註意到它在壹段時間內沒有收到進度更新,它將把任務標記為失敗,JVM子進程將被自動終止。任務失敗的間隔通常是10分鐘,到期時間可以基於作業或集群來設置。參數為mapred.task.timeout註意:如果參數值設置為0,則掛起的任務永遠不會釋放其任務槽,隨著時間的推移會降低整個集群的效率。

?任務失敗嘗試:當jobtracker得知tasktracker失敗時,它將重新安排任務。當然,jobtracker會盡量避免重新調度失敗的tasktracker任務。如果任務嘗試超過4次,將不會重試。該值可以設置。對於地圖任務,該參數為mapred.map.max.attempts,對於reduce任務,該參數由mapred.reduce.max.attempts屬性控制。如果次數超過限制,整個操作將失敗。當然,有時我們不想在少數任務失敗時停止整個作業,因為即使壹些任務失敗,作業的壹些結果可能仍然有用。在這種情況下,您可以設置作業允許的最大任務失敗百分比,而不會觸發作業失敗。Map任務和Reduce任務可以獨立控制,參數為mapred . max . map . failures . percent和mapred.max.failures。

?Kill:任務終止不同於任務失敗。可以中止任務嘗試,因為它是壹個推測副本,或者因為它的tasktracker失敗,這將導致jobtracker將它上面的所有任務嘗試標記為已終止。終止的任務嘗試將不計入任務運行嘗試的次數,因為嘗試中止不是任務的錯誤。

?2.tasktracker失敗

?Tasktracker因為崩潰或者運行太慢而失敗,他會停止向jobtracker發送心跳(或者很少發送心跳)。Jobtracker註意到tasktracker已經停止發送心跳(過期時間由參數mapred設置。tasktracker.expire.interval,以毫秒為單位)並將其從等待調度的tasktracker池中移除。如果是未完成的作業,jobtracker會安排已經在第二個tasktracker上成功運行的Map任務重新運行,因為此時無法訪問reduce任務(中間輸出存儲在失敗的tasktracker的本地文件系統上)。

?即使tasktracker沒有失敗,也可能被jobtracker列入黑名單。如果tasktracker上失敗任務的數量遠遠高於集群中失敗任務的平均數量,他就會被列入黑名單,而被列入黑名單的tasktracker可以通過重啟的方式從jobtracker黑名單中移除。

3.jobtracker失敗

?舊版本JobTracker的失敗是單點失敗,在這種情況下,作業註定會失敗。

作業計劃:

?提前作業調度FIFO:按作業提交順序的FIFO。可以設置優先級,通過設置JobClient的mapred.job.priority屬性或setJobPriority()方法設置優先級(優先級:very _ high、high、normal、low、very _ low)。請註意,FIFO調度算法不支持搶占,因此高優先級作業仍然會被運行了很長時間的低優先級作業阻塞。

?公平調度器:目標是讓每個用戶公平地享受集群能力。當集群中有很多作業時,會以“讓每個用戶* * *享受集群”的方式分配空閑的任務槽。默認情況下,每個用戶都有自己的職務池。FairScheduler支持搶占,所以如果壹個池在壹定時間內沒有獲得公平的資源份額,它就會終止池中資源過多的任務,從而將任務槽讓給資源不足的池。FairScheduler是壹個後續模塊。要使用它,您需要將其jar文件放在Hadoop的類路徑中。可以通過參數map . red . job tracker . task scheduler配置(值為org . Apache . Hadoop . map red . fair scheduler)。

?容量調度程序:

?壹個集群由許多隊列組成,每個隊列都有壹個分配容量,類似於FairScheduler,只不過在每個隊列內部,作業是按照FIFO來調度的。本質上,容量調度程序允許用戶或組織模擬壹個為每個用戶獨立使用FIFO的集群。

無序播放和排序:

?MapReduce確保每個Reducer的輸入都按鍵排序。由系統執行的排序過程——將映射輸出作為輸入傳遞給reducer的過程稱為shuffle。Shuffle是不斷優化和改進的代碼庫的壹部分。在許多方面,shuffle是MapReduce的核心。

?整個洗牌過程應該是這樣的:

?地圖結果分區?分類分類分離溢出?合並同壹個部門?合並同壹個部門?合並結果排序減少處理輸出

?地圖結束:

?寫緩沖區:Map函數的輸出由收集器處理,並不是簡單的把結果寫到磁盤上。它使用緩沖區寫入內存,並為了提高效率而對其進行預排序。每個映射都有壹個用於任務輸出的環形內存緩沖區。默認的緩沖區大小是100MB(由參數io.sort.mb調整)。壹旦緩沖區內容達到閾值(默認為0.8),後臺進程就開始將內容寫入磁盤(溢出)。在寫入磁盤的過程中,地圖輸出將繼續寫入緩沖區,但如果緩沖區已滿,地圖將阻止寫入磁盤。寫入磁盤將通過輪詢寫入由mapred.local.dir屬性指定的特定於作業的子目錄中。

?寫出緩沖區:當collect寫出緩沖區的內容時,會調用sortAndSpill函數,主要用於創建壹個溢出文件,按照鍵值對數據進行排序,按照劃分將數據寫入文件。如果配置了combiner類,它將在寫入文件之前調用combineAndSpill函數。SortAndspill每次被調用時都會寫入壹個溢出文件。

?合並所有地圖的溢出文件:TaskTracker將在每個地圖任務後合並所有由地圖生成的溢出文件。合並規則是將所有溢出文件的同壹分區中的數據按照分區進行合並,寫入壹個分區排序後的地圖輸出文件中。在唯壹分區和排序的映射輸出文件被寫入最後壹條記錄之後,映射上的洗牌階段結束。

?在寫入磁盤之前,線程首先根據數據最終將被傳遞到的reducer將數據劃分為響應分區。在每個分區中,後臺線程按鍵進行內部排序。如果有合並器,它將在排序後的輸出上運行。

?當內存達到溢出寫的閾值時,會創建壹個新的溢出寫文件,因為map任務完成它的最後壹條輸出記錄後,會有幾個溢出寫文件。在任務完成之前,溢出的寫文件將被合並到壹個分區和排序的輸出文件中。配置屬性io.sort.facor控制壹次可以合並的最大數據流數量,默認值為10。

?如果指定了合並器,並且寫入次數至少為3次(由min.mum.spills.for.combine設置),合並器將在輸出文件寫入磁盤之前運行。運行合並器的意義是讓地圖輸出更緊湊,並且寫入本地磁盤,發送給reducer的數據更少。

?寫盤時壓縮:寫盤時壓縮會使寫更快,節省磁盤空間,減少傳輸到reducer的數據量。默認情況下,輸出是未壓縮的,但是可以通過將mapred.compress.map.output值設置為true來啟用壓縮。使用的壓縮庫由mapred . map . output . compression . codec制作。

?reducer獲取的工作線程:reducer通過http獲取輸出文件的分區,用於文件分區的工作線程數量由tracker.http.threads屬性指定。此設置適用於每個tasktracker,而不是每個地圖任務槽。默認值為40,在大型集群上可以根據需要增加。

減少結束:

?復制階段:reduce將定期從JobTracker獲取地圖的輸出位置。壹旦獲得輸出位置,reduce會將對應TaskTracker的地圖輸出復制到本地(如果地圖輸出較小,則復制到TaskTracker節點的內存中,否則作為磁盤釋放),無需等到所有地圖任務完成(當然這也是由參數控制的)。

?合並階段:從每個TaskTracker復制的地圖輸出文件(無論是在磁盤中還是在內存中)被集成,並且保持數據的原始順序。

?Reduce階段:從合並文件中依次取出壹條數據進行reduce函數處理,然後將結果輸出到本地HDFS。

?地圖的輸出文件位於運行地圖任務的tasktracker的本地磁盤上。現在,tasktracker希望對分區文件運行reduce任務。每個任務的完成時間可能不同,但只要完成壹個任務,reduce任務就開始復制其輸出,這就是reduce任務的復制階段。reduce任務有少量的復制線程,因此它可以並行獲得map輸出。默認值為5個線程,可以通過mapred.reduce.parallel.copies屬性進行設置。

?Reducer如何知道從哪個tasktracker獲取地圖輸出:地圖任務完成後,它會通知其父級tasktracker狀態已經更新,然後tasktracker會通知(通過心跳)jobtracker。所以JobTracker知道地圖輸出和tasktracker的映射關系,reducer中的壹個線程周期性的詢問jobtracker知道地圖輸出的位置。因為reducer可能會失敗,所以當第壹個reducer檢索到地圖輸出時,tasktracker不會立即將其從磁盤中刪除。相反,他等待jobtracker宣布可以刪除地圖輸出,這是作業完成後的最後壹次執行。

?如果map輸出較小,則直接復制到reduce tasktracker的內存緩沖區(大小由mapred . job . buffer . input . buffer . percentage控制,占堆空間的百分比);否則,地圖輸出將被復制到磁盤。壹旦內存緩沖區達到閾值大小(通過mapred . iob . buffer . merge . percent)

或者達到映射輸出閾值大小時(mapred.inmem.threadhold),合並的溢出被寫入磁盤。

?隨著磁盤上的副本越來越多,後臺線程會將它們合並成更大的有序文件。註意:為了合並,壓縮的地圖輸出必須在內存中解壓縮。

?排序階段:復制階段完成後,reduce任務將進入排序階段,或者更準確地說,是合並階段,它將合並地圖輸出並保持它們的順序。合並是循環的,每次合並的輸出文件數由合並因子決定。但是它使得產生中間文件成為可能。

?Reduce階段:在最後的reduce階段,將排序後的文件直接輸入reduce函數,中間文件不再合並。最終的合並可以來自內存或磁盤。這個階段的輸出將直接寫入文件系統,通常是hdfs。

?細節:這裏的合並不是壹般的合並。例如,有40個文件,合並因子是10。我們不合並每遍10個文件,合並四遍。而是第壹遍合並四個文件,最後三遍合並10。最後壹遍,4個合並文件,剩下的6個文件直接合並到reduce中。

  • 上一篇:佳能單反相機500d的詳細參數和照片,價格。和600d相機的參數圖片和價格
  • 下一篇:這個符號&是什麽意思?讀音怎麽讀?
  • copyright 2024編程學習大全網