需要实现的功能

  • 消费者请求消息时,可以通过异步操作等待消息到来,并支持中断、等待

  • 允许消费者自动控制是否持续进行订阅

  • 允许消费者动态订阅、取消订阅

  • 允许消费者有自己的优先级,可以根据优先级决定消息被分法的顺序

  • 需要支持按消息的不同主题(topic)进行分发

实现思路

消息

  • 每一个消息都必须包含 topic 属性,其余属性不做限制

订阅者(消费者)

  • 必须指定订阅的 topic,并支持 '*' 表示通配所有 topic

  • 必须指定优先级

  • 订阅者对象应该由一个工厂方法统一生成,而不是由用户手动创建

生产者

  • 不做太多限制,可以直接在消息中心定义一个方法

消息分发中心

  • 需要一个队列存储积压的消息

  • 每一个 topic 都需要对应一个优先级队列,用于消息分发

  • 每个 topic 的优先级队列中存储订阅者 Promise 的 resolve 方法,以实现异步操作

中断等待的特殊处理

  • 需求:订阅者通过 Promise 的方式,将自己的 resolve 方法注册到消息分发中心后,程序外可能需要手动终止本次等待。

  • 实现思路:为订阅者绑定一个 AbortController,在 Promise 中监听 abort 事件,如果中断,则调用 reject 方法

  • 存在问题:在 reject 后,消息中心将消息分发到这个已经处于 reject 状态的 Promise 对象的 resolve 方法中,导致被分发的消息丢失

  • 解决 1:在外部建立 Promise 与 resolve 方法的映射关系,分发之前判断 Promise 的状态,如果已经 reject 则将 resolve 方法直接出队,重新分发

  • 解决 2:如果可以确保当前 topic 没有其他订阅者,则可以在中断后立即向消息中心补偿一个当前 topic 的消息用于丢弃

自动控制监听的思路

  • 订阅者请求消息时,传入一个处理消息的回调函数,该函数返回一个布尔值表示是否继续进行监听

代码

优先级队列(数组实现的堆)

class PriorityQueue {
  constructor(cmpFn = (a, b) => a - b) {
    this._cmp = cmpFn;
    this.length = 0;
    this._queue = [];
  };
  _swap(i, j) {
    let temp = this._queue[j];
    this._queue[j] = this._queue[i];
    this._queue[i] = temp;
  }
  _nodeUp(i) {
    while (i !== 0) {
      let root = (i - 1) >> 1;
      if (this._cmp(this._queue[root], this._queue[i]) >= 0) return;
      this._swap(root, i);
      i = root;
    }
  };
  _nodeDown(i) {
    while (i <= (this.length - 1) >> 1) {
      let l = (i << 1) + 1,
        r = l + 1;
      if (l + 1 < this.length) {
        // 包含两个子节点且
        if (this._cmp(this._queue[i], this._queue[l]) <= 0 && this._cmp(this._queue[i], this._queue[r]) <= 0) {
          // 比两个子节点都小,与较大的节点替换
          l = this._cmp(this._queue[l], this._queue[r]) >= 0 ? l : r;
          this._swap(l, i);
          i = l;
        } else if (this._cmp(this._queue[i], this._queue[l]) <= 0) {
          // 比左节点小
          this._swap(l, i);
          i = l;
        } else if (this._cmp(this._queue[i], this._queue[r]) <= 0) {
          // 比右节点小
          this._swap(r, i);
          i = r;
        } else {
          // 比两节点都大,结束
          break;
        }
      } else if (this._cmp(this._queue[i], this._queue[l]) <= 0) {
        // 只有左节点,且比左节点小
        this._swap(l, i);
        i = l;
      } else {
        // 退出:比仅有的左节点大
        break;
      }
    }
  };
  offer(ele) {
    this._queue[this.length] = ele;
    this._nodeUp(this.length++);
  };
  peak() {
    return this._queue[0];
  };
  shift() {
    let res = this._queue[0];
    this._queue[0] = this._queue[--this.length];
    this._nodeDown(0);
    return res;
  }
  isEmpty() {
    return this.length === 0;
  }
}

订阅者(消费者)

class Subscriber {
  static allSubs = {};
  constructor(topic, priority, mq, fn = null) {
    this.topic = topic;
    this.priority = priority;
    this.mq = mq;
    this.key = Symbol(topic);
    this.fn = fn;
  }
  // 请求一条消息
  async acquire() {
    let that = this;
    // 需要外部程序手动控制是否继续请求消息
    if (that.fn === null) {
      return new Promise(function (res) {
        that.mq.consume(that, res);
      });
    } else if (that.abortSignal) { // 有 fn 和 abortSignal,根据 fn 的返回值自动控制,并允许外部中断,但是需要外部进行消息补偿
      let continueListen = true;
      while (continueListen) {
        await new Promise((res, rej) => {
          that.mq.consume(that, res);
          that.abortSignal.addEventListener('abort', () => {
            rej('用户中断');
            continueListen = false;
          })
        }).catch(err => that.fn(err));
      }
    } else { // 根据 fn 返回值自动控制消息请求
      let continueListen = true;
      while (continueListen)
        await new Promise((res) => that.mq.consume(that, res)).then(async msg => {
          await (that.fn(msg)).then(con => continueListen = con);
        });
    }
  }

  // 取消订阅,释放内存
  unsubscribe() {
    delete Subscriber.allSubs[this.key];
  }

  /**
   * 自动控制事件处理、解除订阅
   * @param {function} fn 自动控制函数,接受一个参数为消息,返回布尔类型表示是否完成处理
   */
  async autoControl(fn) {
    let continueListen = true;
    while (continueListen) {
      await this.acquire().then(msg => continueListen = fn(msg));
    }
    this.unsubscribe();
  }
}

消息分发中心(消息队列)

const MessageQueue = class {
  dispacherWorking = false;
  nodeCompFunc = (a, b) => b.priority - a.priority;
  constructor() {
    this.messageQueue = [];
    this.blockingQueues = {};
    this.blockingQueues['*'] = new PriorityQueue(this.nodeCompFunc);
  }

  // 对队列中的消息进行分发
  _dispatch() {
    this.dispacherWorking = true;
    while (this.messageQueue.length !== 0) {
      let nowMsg = this.messageQueue[0];
      let targetQueue = this.blockingQueues[nowMsg.topic];
      this.messageQueue.shift(); // 清除读取的消息
      if (nowMsg.topic !== '*' && !this.blockingQueues['*'].isEmpty() &&
        this.nodeCompFunc(this.blockingQueues['*'].peak(), targetQueue.peak()) >= 0) {
        targetQueue = this.blockingQueues['*'];
      }
      if (targetQueue.length === 0) continue; // 消息积压时,直接丢弃消息,可以在这里添加其他处理方案
      let receiver = targetQueue.shift();
      receiver.res(nowMsg);
    }
    this.dispacherWorking = false;
  }

  /**
   * 生成订阅者
   * @param {string} topic 消息分类
   * @param {number} priority 当前订阅者在所属消息分类内的优先级
   * @returns {Subscriber} 订阅者对象
   */
  subscribe(topic, priority, fn = null) {
    let now = new Subscriber(topic, priority, this, fn);
    if (!this.blockingQueues[topic]) {
      this.newType(topic);
    }
    Subscriber.allSubs[now.key] = now;
    return now;
  }

  /**
   * 创建阻塞队列
   * @param {string} topic 订阅主题
   */
  newType(topic) {
    if (!this.blockingQueues[topic]) {
      this.blockingQueues[topic] = new PriorityQueue(this.nodeCompFunc);
    }
  }

  // 生产消息
  offer(msg) {
    this.messageQueue.push(msg);
    if (this.dispacherWorking) return;
    this._dispatch();
  }

  // 不应该手动调用,只应该由订阅者进行调用
  consume(sub, res) {
    this.blockingQueues[sub.topic].offer(new BlockNode(sub.priority, res));
    if (this.messageQueue.length !== 0 && !this.dispacherWorking) {
      this._dispatch();
    }
  }

}

// 放在阻塞队列中的节点
const BlockNode = class {
  constructor(priority, res) {
    this.priority = priority;
    this.res = res;
  }
}

使用举例

// 初始化消息中心
const clickMQ = new MessageQueue();

// 根据 fn 自动控制
clickMQ.subscribe('inner', -10, async (clickMsg) => {
  // 一些处理
  // return true;
  return false;
}).acquire();

// 根据 fn 自动控制,并允许外部中断
const deadSub = clickMQ.subscribe('*', 5, (err) => {
  clickMQ.offer({
    topic: '*',
    content: '补偿消息,消息内容无意义',
  });
});
deadSub.abortController = new AbortController();
deadSub.abortSignal = deadSub.abortController.signal; // 中断等待信号
deadSub.acquire();

// 普通的自动控制
cxlm.clickMQ.subscribe('inner', 20).autoControl(msg => {
  // 一些处理
  return true;
})