當前位置:編程學習大全網 - 源碼下載 - C++實現分布式文件解析系統

C++實現分布式文件解析系統

整個分布式文件解析系統壹***有四個角色:任務分配服務器、文件解析服務器、工作站和管理平臺。在壹個大型的網絡中,有若幹個用戶(即“工作站”)隨機地通過“任務分配服務器”向若幹個“文件解析服務器”提交文本文件。“文件解析服務器”將收到的文件解析後寫入log文件中。“管理平臺”負責以可視化的界面觀察所有連接到的“任務分配服務器”和“工作站”的ip及工作狀態。

任務分配服務器的作用是輪流地將來自“工作站”的文件傳送請求轉發到每個在線的“文件解析服務器”上,從而避免某個特定的“文件解析服務器”工作負載太重。“任務分配服務器”輪流給所有在線的“文件解析服務器”分配任務。輪流的原則是按每工作站每批輪流,不考慮每批文件的數量多少。當有“工作站”準備傳送壹批文件時,首先連接任務分配服務器,通知任務分配服務器有壹批文件待傳。任務分配服務器則把該工作站的IP和端口告訴選定的“文件解析服務器”,以便讓“文件分析服務器”能主動和“工作站”連接,並完成該批文件傳輸。

ps:主動的含義是文件分析服務器主動通過 connect 連接工作站。“任務分配服務器”本身不接收或轉發文件。工作站每次都是按批提交文件,壹批文件的數量不等。

“文件解析服務器”每收到用戶的壹批文件後,就自動對該批文件進行“文件解析”。“文件解析”的功能是:

管理平臺的作用是采用圖形化的方式展示目前在線的工作站,文件分析服務器的工作狀態。能展示工作站的IP,工作狀態(傳文件中或沒有傳文件)。能展示文件分析服務器的IP,工作狀態(接收文件中,文件分析中,沒有傳文件,沒有文件被分析)。當壹個工作站或文件分析服務器宕機,能實時更新狀態。

整個系統的工作流程如下圖所示:

該項目源碼在 /toMyLord/DistributedFileParsingSystem

src目錄下包含 Server 類、 Client 類、 SpecificTime 類。 Server 類用於服務器初始化,監聽 socket_fd ,接受來自客戶端的連接請求,接收或發送信息,關閉套接字。 Client 類用於客戶端初始化,連接服務器,與服務器進行全雙工通信,關閉連接。 SpecificTime 類用於獲取當前系統的時間,並以規定的格式返回壹個 std::string 類型的字符串。上述三個類會被用於所有角色,因此以壹個目錄的形式單獨放在所有角色所屬目錄外,通過CmakeLists.txt將這些文件的編譯相關性聯系在壹起,從而生成可執行文件。

該目錄下為編譯 TaskDistribution 所需要的專屬文件。在當前目錄下的src目錄內,定義了 DistributionServer 類,該類內組合了DistributedFileParsingSystem/src目錄下的 Server 類。實現 DistributionServer 類的目的是為了使用該類監聽特定端口的所有網絡連接,並將所有連接的客戶端信息都存儲在壹個 std::vector 容器內。 DistributionServer 類的定義如下:

在 ClientNode 結構體內嵌套的 ClientInfo 結構體聲明和定義在DistributedFileParsingSystem/src/server.h文件內, ClientInfo 結構體保存了連接到 server 的客戶端的 sockaddr_in 、 socket_fd 以及ip信息。 ClientNode 結構體擴展了壹個新的字段 client_state ,該字段用來描述客戶端的傳輸狀態。

通過 std::vector<ClientNode> 將所有連接的客戶端信息都保存下來。每當有新的客戶端連接,就使用 client_node.push_back() 將信息保存在內存中。如果有已連接的客戶端斷開連接,需要用 find_if() 函數配合 lambda 表達式找到在 vector 中的叠代器,然後斷開連接並刪除 client_node 中對應的數據。具體過程如下:

TaskDistribution_main.cpp是任務分配服務器可執行文件編譯所需的主函數。在該文件內聲明了兩個 DistributionServer 類的實例 workstation_server 和 parsing_server ,壹個 Server 類的實例 manager_server ,分別用來處理來自工作站、文件解析服務器、管理平臺的連接。因為任務分配服務器只允許壹個管理平臺連接,因此管理平臺選擇 Server 類,因為可能會有大量工作站和文件解析服務器連接至任務分配服務器,因此選擇 DistributionServer 來分別監聽工作站和文件解析服務器的連接端口。

在主函數中,我選擇使用 epoll ——IO多路復用技術來監聽工作站、文件解析服務器和管理平臺的連接請求,從而實現在單進程中對三個端口進行監聽。

當有管理平臺連接至任務分配服務器後,任務分配服務器就不再處理來自管理平臺的連接請求,直到當前管理平臺退出。

當有文件解析服務器連接至任務分配服務器後,任務分配服務器會將文件解析添加至就緒隊列,等待來自工作站的請求。每當有工作站發出文件傳輸請求,就將處於就緒隊列頂端的文件解析服務器出隊列,並分配給當前工作站用來處理解析任務。處理完文件解析任務後,任務分配服務器會將文件解析服務器重新加入隊列。

當工作站連接至任務分配服務器後,任務分配服務器會建立壹個新的線程用來處理工作站發出的請求。當工作站發出文件解析請求後,該線程會將就緒隊列頂端的文件解析服務器分配給工作站,並壹直監控整個解析過程,直到解析完成,將文件解析服務器重新入隊列。完成後此線程會重新監聽來自工作站的解析請求,直到工作站斷開連接,該線程就會消亡。如果工作站發出解析請求後,沒有任務文件解析服務器就緒,那麽任務分配服務器會每過1s檢測壹次是否有就緒的文件解析服務器(此處可以改進,將每秒輪詢的方式改為等待時長者優先策略)。

該目錄下為編譯 TaskDistribution 所需要的專屬文件。在當前目錄下的src目錄內,主要實現兩個功能: 使用單例模式實現日誌文件記錄 使用狀態模式實現文件解析服務器在等待狀態和解析狀態之間的切換

使用單例模式來實現日誌記錄的 RecordingLog 類的實例只會創建壹次,從而避免創建多個日誌類實例,造成寫入混亂的情況。 RecordingLog 類會將特定信息以 filesteam 的方式寫入文件中,具體被記錄在日誌文件中的信息包括接收來自工作站文件的時間、日期、工作站ip、工作站端口、文件名、文件前8個字節內容、文件總長度。該類的聲明情況如下所示:

在本系統的FileParsing程序中,使用狀態模式用來處理文件解析服務器的狀態轉換。文件解析服務器主要有兩種狀態——等待任務分配服務器發送解析文件請求狀態和接受來自工作站的文件並完成解析,寫入日誌文件的狀態。

在第壹種狀態下,文件解析服務器收到來自任務分配服務器的解析請求後,會將需要主動連接的工作站的ip及端口保存下來,並切換到第二種狀態。

在第二種狀態下,文件解析服務器根據保存的ip和端口主動連接工作站,連接完成後接收來自工作站的文件信息,解析後通過 RecordingLog 類中的 getLogInstance() 函數將解析後的結果寫入日誌文件,完成後會將解析完成情況回傳給任務分配服務器,在任務分配服務器中,將該文件解析服務器重新添加到就緒隊列。之後會主動切換到第壹種狀態,再次等待任務請求。

這部分程序使用了狀態模式,使得文件解析服務器在等待分配任務狀態及解析文件狀態自動切換,從而保證了兩種狀態上的邏輯分離,具有非常好的封裝性,並體現了開閉原則和單壹職責原則:每個狀態都是壹個子類,如果需要增加狀態就只需要增加子類,如果需要修改狀態,就只需要修改壹個子類即可完成,具有很高的擴展性。

該目錄下為編譯 WorkStation 所需要的專屬文件。在當前目錄下的src目錄內,定義了 SendDirectory 類。通過該類,可以實現根據輸入的壹個目錄,自動發送該目錄下的所有文件。註意,該目錄下不可再包含目錄。

該目錄下為編譯 Managerplatform 所需要的專屬文件。此目錄可以使用QtCreater打開,但是編譯還是需要根據DistributedFileParsingSystem/CMakeLists.txt文件通過cmake編譯。

該目錄下的 ClientThread 類通過繼承 QThread 類,實現在Qt中使用線程。在該部分程序運行時,就會創建壹個 ClientThread 線程,用於與任務分配服務器建立連接,並不斷接受來自任務分配服務發送的有關工作站及文件解析服務器的在線狀態以及傳輸狀態。每當有新的信息後,就通過Qt的槽機制,發送給 QDialog ,顯示在界面中。

編譯完成後會生成四個可執行文件:TaskDistribution、FileParsing、WorkStation、ManagerPlatform。

運行過程如下所示:

首先分別運行每個可執行程序,其中運行兩個FileParsing程序(上線兩個文件解析服務器):

在WorkStation程序下輸入需要發送的文件目錄:

可以在任務分配服務器的log中看到有新上線的文件解析服務器,同時在管理平臺也可以看到對應的文件解析服務器。我們通過工作站連續發送三次文件。

實現了整個分布式任務解析服務器框架,整個項目可以正常實現需求功能。

缺陷:任務分配服務器存在不能正常處理所有連接的正常close。

在任務分配服務器處新增了心跳包機制——任務分配服務器會每隔5s向所有連接在線的文件解析服務器和工作站發送心跳包,文件解析服務器和工作站在收到心跳包後,也會主動回應心跳。通過心跳包機制可以很好的處理來自其他連接的斷開情況。同時新增了在管理平臺退出後,任務分配服務器可以重新接受來自管理平臺的連接。

  • 上一篇:天才農場源代碼
  • 下一篇:求益盟操盤手操盤線工作原理?
  • copyright 2024編程學習大全網