舉個最簡單的例子,壹個用戶推薦了另壹個用戶,定壹個二十四小時之後的任務,看看被推薦的用戶有沒有來註冊,如果沒註冊就給他搞壹條短信過去。Σ>―(〃°ω°〃)?→
最初的設想
壹開始是想把這個計時器做在內存裏面直接調用的。
考慮到 Node.js 的定時並不是那麽準確(無論是setTimeout還是setInterval),所以本來打算自己維護這個定時器隊列。
又考慮到 Node.js 原生對象比較耗內存。之前用JSON對象存了壹本字典,約十二萬多的詞條,原文件大概也就五六兆,用 Node.js 的原生對象壹存居然有五六百兆的內存占用——所以打算這個定時器隊列用 C++ 來寫 addon。
考慮到任何時候插入的任務都有可能在已有的任務之前或者之後,所以本來想用 C++ 來寫壹個 小根堆 。每次用戶來壹個任務的時候就將這個任務插入到堆中。
如果按照上述方法的話,再加上對時間要求掐得也不是那麽緊,於是就是壹個不斷的process.nextTick()的過程。
在process.nextTick()當中執行這麽壹個函數:
從小根堆中不斷獲取堆頂的任務並處理,壹直處理到堆頂任務的執行時間大於當前時間為止。
繼續process.nextTick()來讓下壹個 tick 執行步驟 1 中的流程。
所以最後就是壹邊往小根堆插入任務,另壹邊通過不斷process.nextTick()消費任務的這麽壹個過程。
最後,為了考慮到程序重啟的時候內存數據會丟失,還應該做壹個持久化的事情——在每次插入任務的時候順便往持久化中間件中插壹條副本,比如 MySQL、MongoDB、Redis、Riak 等等任何三方依賴。消費任務的時候順便把中間件中的這條任務數據給刪除。
也就是說中間件中永遠存的就是當前尚未完成的任務。每當程序重啟的時候都先從中間件中把所有任務讀取進來重建壹下堆,然後就能繼續工作了。
如果當時沒有發現 Redis 的這個妙用的話,上述的流程將會是實現定時任務的流程了。
Redis 妙用
在 Redis 的 2.8.0 版本之後,其推出了壹個新的特性——鍵空間消息( Redis Keyspace Notifications ),它配合 2.0.0 版本之後的SUBSCRIBE就能完成這個定時任務的操作了, 不過定時的單位是秒 。
Publish / Subscribe
Redis 在 2.0.0 之後推出了 Pub / Sub 的指令,大致就是說壹邊給 Redis 的特定頻道發送消息,另壹邊從 Redis 的特定頻道取值——形成了壹個簡易的消息隊列
比如可以往foo頻道推壹個消息bar,那麽就可以直接:
PUBLISH foo bar
javascript
var Redis = require("ioredis");
var sub = new Redis(/** 連接信息 */);
sub.once("connect", function() {
// 假設需要選擇 redis 的 db,因為實際上們不會去汙染默認的 db 0
sub.select(DB_NUMBER, function(err) {
if(err) process.exit(4);
sub.subscribe("foo", function() {
//... 訂閱頻道成功
});
});
});
// 監聽從 `foo` 來的消息
sub.on("message", function(channel, msg) {
console.log(channel, msg);
});
Redis Keyspace Notifications
在 Redis 裏面有壹些事件,比如鍵到期、鍵被刪除等。然後可以通過配置壹些東西來讓 Redis 壹旦觸發這些事件的時候就往特定的 Channel 推壹條消息。
本文所涉及到的需求的話所需要關心的事件是EXPIRE即過期事件。
大致的流程就是給 Redis 的某壹個 db 設置過期事件,使其鍵壹旦過期就會往特定頻道推消息,在自己的客戶端這邊就壹直消費這個頻道就好了。
以後壹來壹條定時任務,就把這個任務狀態壓縮成壹個鍵,並且過期時間為距這個任務執行的時間差。那麽當鍵壹旦到期,就到了任務該執行的時間,Redis 自然會把過期消息推去,的客戶端就能接收到了。這樣壹來就起到了定時任務的作用。
消息類型
當達到壹定條件後,有兩種類型的這種消息會被觸發,用哪個需要自己選了。舉個例子,刪除了在 db 0 中壹個叫foo的鍵,那麽系統會往兩個頻道推消息,壹個是del事件頻道推foo消息,另壹個是foo頻道推del消息,它們小倆口被系統推送的指令分別等價於:
PUBLISH __keyspace@0__:foo del PUBLISH __keyevent@0__:del foo
其中往foo推送del的頻道名為__keyspace@0__:foo,即是"__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而del的頻道名為"__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME。
配置
即使妳的 Redis 版本達標了,但是 Redis 默認是關閉這個功能的,妳需要修改配置文件來打開它,或者直接在 CLI 裏面通過指令修改。這裏就說說配置文件的修改吧。。
首先打開 Redis 的配置文件,在不同的系統和安裝方式下文件位置可能不同,比如通過brew安裝的 MacOS 下可能是在/usr/local/etc/redis.conf下面,通過apt-get安裝的 Ubuntu 下可能是在/etc/redis/redis.conf下,總之找到配置文件。或者自己寫壹個配置文件,啟動的時候指定配置文件地址就好。
然後找到壹項叫notify-keyspace-events的地方,如果找不到則自行添加,其值可以是Ex、Klg等等。這些字母的具體含義如下所示:
K ,表示keyspace事件,有這個字母表示會往__keyspace@<db>__頻道推消息。
E ,表示keyevent事件,有這個字母表示會往__keyevent@<db>__頻道推消息。
g ,表示壹些通用指令事件支持,如DEL、EXPIRE、RENAME等等。
$ ,表示字符串(String)相關指令的事件支持。
l ,表示列表(List)相關指令事件支持。
s ,表示集合(Set)相關指令事件支持。
h ,哈希(Hash)相關指令事件支持。
z ,有序集(Sorted Set)相關指令事件支持。
x ,過期事件,與 g 中的EXPIRE不同的是, g 的EXPIRE是指執行EXPIRE key ttl這條指令的時候順便觸發的事件,而這裏是指那個key剛好過期的這個時間點觸發的事件。
e ,驅逐事件,壹個key由於內存上限而被驅逐的時候會觸發的事件。
A ,g$lshzxe的別名。也就是說AKE的意思就代表了所有的事件。
結合上述列表就能拼湊出自己所需要的事件支持字符串了,在需求中只需要Ex就可以滿足了,所以配置項就是這樣的:
notify-keyspace-events Ex
然後保存配置文件,啟動 Redis 就啟用了過期事件的支持了。
實踐
先說任務的創造者吧。由於這裏 Redis 的事件只會傳鍵名,並不會傳鍵值,而過期事件觸發的時候那個鍵已經沒了,妳也無法獲取鍵值,加上主系統和任務系統是分布式的,所以就把所有需要的信息往鍵名塞。
壹個最簡單的鍵名設計就是任務類型 + ":" + JSON.stringify 化後的參數數組;更有甚者可以直接把任務類型替換成所需的函數路徑,比如需要執行這個任務的函數在task/foo/bar文件下面的baz函數,參數arguments數組為[ 1, 2 ],那麽鍵名的設計可以是task/foo/bar.baz:[1,2],反正只需要觸發這個鍵,用不著去查詢這個鍵。等到真正過期了任務系統接收到這個鍵名的時候再壹壹解析,得到需要執行task/foo/bar.baz這個消息,並且網函數裏面傳入[1,2]這個arguments。
所以當接收到壹個定時任務的時候,得到消息、函數名、過期時間參數,這個函數
/** 假設 redis 是壹個 ioredis 的對象 */
var sampleTaskMaker = function(message, func, timeout) {
message = JSON.stringify(message);
console.log("Received a new task:", func, message, "after " + timeout + ".");
// 這裏的 uuid 是 npm 壹個包
// 生成壹個唯壹 uuid 的目的是為了防止兩個任務用了相同的函數和參數,那麽
// 鍵名可能會重復並覆蓋的情況
// uuid 的文檔為 /package/node-uuid
//
// 這裏的 ? 是壹個分隔符,冒號是分割 uuid 和後面內容的,而 ? 是分割函數名
// 和消息的
var key = uuid.v1().replace(/-/g, "") +
":?" + func + "?" + message;
var content = "";
redis.multi()
.set(key, content)
.expire(key, timeout)
.exec(function(err) {
if(err) {
console.error("Failed to publish EXPIRE EVENT for " + content);
console.error(err);
return;
}
});
};
// assign 是 sugarjs 裏面的函數
// 把 db 塞到字符串裏面的 {db} 裏去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 });
// 假設 sub 是 ioredis 的對象
sub.once("connect", function() {
// 假設需要選擇 redis 的 db,因為實際上不會去汙染默認的 db 0
sub.select(1, function(err) {
if(err) process.exit(4);
sub.subscribe("foo", function() {
//... 訂閱頻道成功
});
});
});
// 監聽從 `foo` 來的消息
sub.on("message", sampleOnExpired);
註意:這裏選擇 db 1 是因為壹旦開啟過期事件監聽,那麽這個 db 的所有過期事件都會被發送。為了不跟正常使用的 redis 過期鍵混淆,為這個事情專門用壹個新的 db。比如在自己正常使用的 db 0 裏面監聽了,那麽不是任務觸發的過期事件也會傳過來,這個時候解析的鍵名就不對了。
最後就是sampleOnExpired函數了。
var sampleOnExpired = function(channel, key) {
// UUID:?func?params
var body = key.split("?");
if(body.length < 3) return;
// 取出 body 第壹位為 func
var func = body[1];
// 推出前兩位,後面剩下的有可能是參數裏面自帶 ? 而被分割,所以要拼回去
body.shift(); body.shift();
var params = body.join("?");
// 然後把 params 傳入 func 去執行
// func:
// path1/path2.func
func = func.split(".");
if(func.length !== 2) {
console.error("Bad params for task:", func.join("."), "-", params);
return;
}
var path = func[0];
func = func[1];
var mod;
try {
mod = require("./tasks/" + path);
} catch(e) {
console.error("Failed to load module", path);
console.error(e.stack);
return;
}
process.nextTick(function() {
try {
mod[func].apply(null, JSON.parse(params));
} catch(e) {
console.error("Failed to call function", path, "-", func, "-", params);
console.error(e.stack);
}
});
};
這個簡易的架子搭好後,只需要去寫壹堆任務執行函數,然後在生成任務的時候把相應參數傳給sampleTaskMaker就好了。Redis 會自動過期並且觸發事件給sampleOnExpired函數,然後就會去執行相應的任務處理函數了