需要实现的功能
-
消费者请求消息时,可以通过异步操作等待消息到来,并支持中断、等待
-
允许消费者自动控制是否持续进行订阅
-
允许消费者动态订阅、取消订阅
-
允许消费者有自己的优先级,可以根据优先级决定消息被分法的顺序
-
需要支持按消息的不同主题(topic)进行分发
实现思路
消息
- 每一个消息都必须包含 topic 属性,其余属性不做限制
订阅者(消费者)
-
必须指定订阅的 topic,并支持 '*' 表示通配所有 topic
-
必须指定优先级
-
订阅者对象应该由一个工厂方法统一生成,而不是由用户手动创建
生产者
- 不做太多限制,可以直接在消息中心定义一个方法
消息分发中心
-
需要一个队列存储积压的消息
-
每一个 topic 都需要对应一个优先级队列,用于消息分发
-
每个 topic 的优先级队列中存储订阅者 Promise 的 resolve 方法,以实现异步操作
中断等待的特殊处理
-
需求:订阅者通过 Promise 的方式,将自己的 resolve 方法注册到消息分发中心后,程序外可能需要手动终止本次等待。
-
实现思路:为订阅者绑定一个 AbortController,在 Promise 中监听 abort 事件,如果中断,则调用 reject 方法
-
存在问题:在 reject 后,消息中心将消息分发到这个已经处于 reject 状态的 Promise 对象的 resolve 方法中,导致被分发的消息丢失
-
解决 1:在外部建立 Promise 与 resolve 方法的映射关系,分发之前判断 Promise 的状态,如果已经 reject 则将 resolve 方法直接出队,重新分发
-
解决 2:如果可以确保当前 topic 没有其他订阅者,则可以在中断后立即向消息中心补偿一个当前 topic 的消息用于丢弃
自动控制监听的思路
- 订阅者请求消息时,传入一个处理消息的回调函数,该函数返回一个布尔值表示是否继续进行监听
代码
优先级队列(数组实现的堆)
class PriorityQueue {
constructor(cmpFn = (a, b) => a - b) {
this._cmp = cmpFn;
this.length = 0;
this._queue = [];
};
_swap(i, j) {
let temp = this._queue[j];
this._queue[j] = this._queue[i];
this._queue[i] = temp;
}
_nodeUp(i) {
while (i !== 0) {
let root = (i - 1) >> 1;
if (this._cmp(this._queue[root], this._queue[i]) >= 0) return;
this._swap(root, i);
i = root;
}
};
_nodeDown(i) {
while (i <= (this.length - 1) >> 1) {
let l = (i << 1) + 1,
r = l + 1;
if (l + 1 < this.length) {
// 包含两个子节点且
if (this._cmp(this._queue[i], this._queue[l]) <= 0 && this._cmp(this._queue[i], this._queue[r]) <= 0) {
// 比两个子节点都小,与较大的节点替换
l = this._cmp(this._queue[l], this._queue[r]) >= 0 ? l : r;
this._swap(l, i);
i = l;
} else if (this._cmp(this._queue[i], this._queue[l]) <= 0) {
// 比左节点小
this._swap(l, i);
i = l;
} else if (this._cmp(this._queue[i], this._queue[r]) <= 0) {
// 比右节点小
this._swap(r, i);
i = r;
} else {
// 比两节点都大,结束
break;
}
} else if (this._cmp(this._queue[i], this._queue[l]) <= 0) {
// 只有左节点,且比左节点小
this._swap(l, i);
i = l;
} else {
// 退出:比仅有的左节点大
break;
}
}
};
offer(ele) {
this._queue[this.length] = ele;
this._nodeUp(this.length++);
};
peak() {
return this._queue[0];
};
shift() {
let res = this._queue[0];
this._queue[0] = this._queue[--this.length];
this._nodeDown(0);
return res;
}
isEmpty() {
return this.length === 0;
}
}
订阅者(消费者)
class Subscriber {
static allSubs = {};
constructor(topic, priority, mq, fn = null) {
this.topic = topic;
this.priority = priority;
this.mq = mq;
this.key = Symbol(topic);
this.fn = fn;
}
// 请求一条消息
async acquire() {
let that = this;
// 需要外部程序手动控制是否继续请求消息
if (that.fn === null) {
return new Promise(function (res) {
that.mq.consume(that, res);
});
} else if (that.abortSignal) { // 有 fn 和 abortSignal,根据 fn 的返回值自动控制,并允许外部中断,但是需要外部进行消息补偿
let continueListen = true;
while (continueListen) {
await new Promise((res, rej) => {
that.mq.consume(that, res);
that.abortSignal.addEventListener('abort', () => {
rej('用户中断');
continueListen = false;
})
}).catch(err => that.fn(err));
}
} else { // 根据 fn 返回值自动控制消息请求
let continueListen = true;
while (continueListen)
await new Promise((res) => that.mq.consume(that, res)).then(async msg => {
await (that.fn(msg)).then(con => continueListen = con);
});
}
}
// 取消订阅,释放内存
unsubscribe() {
delete Subscriber.allSubs[this.key];
}
/**
* 自动控制事件处理、解除订阅
* @param {function} fn 自动控制函数,接受一个参数为消息,返回布尔类型表示是否完成处理
*/
async autoControl(fn) {
let continueListen = true;
while (continueListen) {
await this.acquire().then(msg => continueListen = fn(msg));
}
this.unsubscribe();
}
}
消息分发中心(消息队列)
const MessageQueue = class {
dispacherWorking = false;
nodeCompFunc = (a, b) => b.priority - a.priority;
constructor() {
this.messageQueue = [];
this.blockingQueues = {};
this.blockingQueues['*'] = new PriorityQueue(this.nodeCompFunc);
}
// 对队列中的消息进行分发
_dispatch() {
this.dispacherWorking = true;
while (this.messageQueue.length !== 0) {
let nowMsg = this.messageQueue[0];
let targetQueue = this.blockingQueues[nowMsg.topic];
this.messageQueue.shift(); // 清除读取的消息
if (nowMsg.topic !== '*' && !this.blockingQueues['*'].isEmpty() &&
this.nodeCompFunc(this.blockingQueues['*'].peak(), targetQueue.peak()) >= 0) {
targetQueue = this.blockingQueues['*'];
}
if (targetQueue.length === 0) continue; // 消息积压时,直接丢弃消息,可以在这里添加其他处理方案
let receiver = targetQueue.shift();
receiver.res(nowMsg);
}
this.dispacherWorking = false;
}
/**
* 生成订阅者
* @param {string} topic 消息分类
* @param {number} priority 当前订阅者在所属消息分类内的优先级
* @returns {Subscriber} 订阅者对象
*/
subscribe(topic, priority, fn = null) {
let now = new Subscriber(topic, priority, this, fn);
if (!this.blockingQueues[topic]) {
this.newType(topic);
}
Subscriber.allSubs[now.key] = now;
return now;
}
/**
* 创建阻塞队列
* @param {string} topic 订阅主题
*/
newType(topic) {
if (!this.blockingQueues[topic]) {
this.blockingQueues[topic] = new PriorityQueue(this.nodeCompFunc);
}
}
// 生产消息
offer(msg) {
this.messageQueue.push(msg);
if (this.dispacherWorking) return;
this._dispatch();
}
// 不应该手动调用,只应该由订阅者进行调用
consume(sub, res) {
this.blockingQueues[sub.topic].offer(new BlockNode(sub.priority, res));
if (this.messageQueue.length !== 0 && !this.dispacherWorking) {
this._dispatch();
}
}
}
// 放在阻塞队列中的节点
const BlockNode = class {
constructor(priority, res) {
this.priority = priority;
this.res = res;
}
}
使用举例
// 初始化消息中心
const clickMQ = new MessageQueue();
// 根据 fn 自动控制
clickMQ.subscribe('inner', -10, async (clickMsg) => {
// 一些处理
// return true;
return false;
}).acquire();
// 根据 fn 自动控制,并允许外部中断
const deadSub = clickMQ.subscribe('*', 5, (err) => {
clickMQ.offer({
topic: '*',
content: '补偿消息,消息内容无意义',
});
});
deadSub.abortController = new AbortController();
deadSub.abortSignal = deadSub.abortController.signal; // 中断等待信号
deadSub.acquire();
// 普通的自动控制
cxlm.clickMQ.subscribe('inner', 20).autoControl(msg => {
// 一些处理
return true;
})