當前位置:編程學習大全網 - 編程語言 - 如何形象地描述RxJava中的背壓和流控機制

如何形象地描述RxJava中的背壓和流控機制

在RxJava中,可以通過對Observable連續調用多個Operator組成壹個調用鏈,其中數據從上遊向下遊傳遞。當上遊發送數據的速度大於下遊處理數據的速度時,就需要進行Flow Control了。

這就像小學做的那道數學題:壹個水池,有壹個進水管和壹個出水管。如果進水管水流更大,過壹段時間水池就會滿(溢出)。這就是沒有Flow Control導致的結果。

Flow Control有哪些思路呢?大概是有四種:

(1) 背壓(Backpressure)。

(2) 節流(Throttling)。

(3) 打包處理。

(4) 調用棧阻塞(Callstack blocking)。

下面分別詳細介紹。

註意:目前RxJava的1.x和2.x兩個版本序列同時並存,2.x相對於1.x在接口上有很大變動,其中也包括Backpressure的部分。但是,這裏要討論的Flow Control機制中的相關概念,卻都是適用的。

Flow Control的幾種思路

背壓(Backpressure)

Backpressure,也稱為Reactive Pull,就是下遊需要多少(具體是通過下遊的request請求指定需要多少),上遊就發送多少。這有點類似於TCP裏的流量控制,接收方根據自己的接收窗口的情況來控制接收速率,並通過反向的ACK包來控制發送方的發送速率。

這種方案只對於所謂的cold Observable有效。cold Observable指的是那些允許降低速率的發送源,比如兩臺機器傳壹個文件,速率可大可小,即使降低到每秒幾個字節,只要時間足夠長,還是能夠完成的。相反的例子是音視頻直播,數據速率低於某個值整個功能就沒法用了(這種就屬於hot Observable了)。

節流(Throttling)

節流(Throttling),說白了就是丟棄。消費不過來,就處理其中壹部分,剩下的丟棄。還是舉音視頻直播的例子,在下遊處理不過來的時候,就需要丟棄數據包。

而至於處理哪些和丟棄哪些數據,就有不同的策略。主要有三種策略:

sample (也叫throttleLast)

throttleFirst

debounce (也叫throttleWithTimeout)

從細的方面分別解釋壹下。

sample,采樣。類比壹下音頻采樣,8kHz的音頻就是每125微秒采壹個值。sample可以配置成,比如每100毫秒采樣壹個值,但100毫秒內上遊可能過來很多值,選哪個值呢,就是選最後那個值。所以它也叫throttleLast。

throttleFirst跟sample類似,比如還是每100毫秒采樣壹個值,但選這100毫秒內的第壹個值。在Android開發中有時候可以把throttleFirst用作點擊事件的防抖動處理,就是因為它可以在指定的壹段時間內處理第壹個點擊事件(即采樣第壹個值),但丟棄後面的點擊事件。

debounce,也叫throttleWithTimeout,名字裏就包含壹個例子。比如,壹個網絡程序維護壹個TCP連接,不停地收發數據,但中間沒數據可以收發的時候,就有間歇。這段間歇的時間,可以稱為idle time。當idle time超過壹個預設值的時候,就算超時了(time out),這個時候可能就需要把連接斷開了。實際上壹些做server端的網絡程序就是這麽工作的。每收發壹個數據包之後,啟動壹個計時器,等待壹個idle time。如果計時器到時之前,又有收發數據包的行為,那麽計時器重置,等待壹個新的idle time;而如果計時器時間到了,就超時了(time out),這個連接就可以關閉了。debounce的行為,跟這個非常類似,可以用它來找到那些連續的收發事件之後的idle time超時事件。換句話說,debounce可以把連續發生的事件之間的較大的間歇找出來。

打包處理

打包就是把上遊來的小包裹打成大包裹,分發到下遊。這樣下遊需要處理的包裹的個數就減少了。RxJava中提供了兩類這樣的機制:buffer和window。

buffer和window的功能基本壹樣,只是輸出格式不太壹樣:buffer打包後的包裹用壹個List表示,而window打包後的包裹又是壹個Observable。

調用棧阻塞(Callstack blocking)

這是壹種特殊情況,阻塞住整個調用棧(Callstack blocking)。之所以說這是壹種特殊情況,是因為這種方式只適用於整個調用鏈都在壹個線程上同步執行的情況,這要求中間的各個operator都不能啟動新的線程。在平常使用中這種應該是比較少見的,因為我們經常使用subscribeOn或observeOn來切換執行線程,而且有些復雜的operator本身也會在內部啟動新的線程來處理。另外,如果真的出現了完全同步的調用鏈,前面的另外三種Flow Control思路仍然可能是適用的,只不過這種阻塞的方式更簡單,不需要額外的支持。

這裏舉個例子把調用棧阻塞和前面的Backpressure比較壹下。“調用棧阻塞”相當於很多車行駛在盤山公路上,而公路只有壹條車道。那麽排在最前面的第壹輛車就擋住了整條路,後面的車也只能排在後面。而“Backpressure”相當於銀行辦業務時的窗口叫號,窗口主動叫某個號過去(相當於請求),那個人才過去辦理。

如何讓Observable支持Backpressure?

在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通過壹些operator來轉化成支持Backpressure的Observable。這些operator包括:

onBackpressureBuffer

onBackpressureDrop

onBackpressureLatest

onBackpressureBlock(已過期)

它們轉化成的Observable分別具有不同的Backpressure策略。

而在RxJava 2.x中,Observable不再支持Backpressure,而是改用Flowable來專門支持Backpressure。上面提到的四種operator的前三種分別對應Flowable的三種Backpressure策略:

BackpressureStrategy.BUFFER

BackpressureStrategy.DROP

BackpressureStrategy.LATEST

onBackpressureBuffer是不丟棄數據的處理方式。把上遊收到的全部緩存下來,等下遊來請求再發給下遊。相當於壹個水庫。但上遊太快,水庫(buffer)就會溢出。

onBackpressureDrop和onBackpressureLatest比較類似,都會丟棄數據。這兩種策略相當於壹種令牌機制(或者配額機制),下遊通過request請求產生令牌(配額)給上遊,上遊接到多少令牌,就給下遊發送多少數據。當令牌數消耗到0的時候,上遊開始丟棄數據。但這兩種策略在令牌數為0的時候有壹點微妙的區別:onBackpressureDrop直接丟棄數據,不緩存任何數據;而onBackpressureLatest則緩存最新的壹條數據,這樣當上遊接到新令牌的時候,它就先把緩存的上壹條“最新”數據發送給下遊。可以結合下面兩幅圖來理解。

onBackpressureBlock是看下遊有沒有需求,有需求就發給下遊,下遊沒有需求,不丟棄,但試圖堵住上遊的入口(能不能真堵得住還得看上遊的情況了),自己並不緩存。這種策略已經廢棄不用。

  • 上一篇:單片機組裝問題11
  • 下一篇:有哪些好用的學英語app?
  • copyright 2024編程學習大全網