木灵鱼儿

木灵鱼儿

阅读:858

最后更新:2021/04/25/ 14:21:33

异步队列管理器

造这个轮子其实也是没得办法,搜不到合适的轮子用,就只能自己干了。

使用场景

我们有N个异步任务promise,他们没有顺序关系,谁先触发都无所谓,但是我们只关心一点,如果某一个任务出错,后续就不要运行了,只有全部都success完成,那么才运行成功后的处理方式then。

当然,我们肯定不能使用Promise.all运行N个任务,这等于是同步触发了,如果我有2000个任务,难道你也一口气发2000个任务,那这就不现实了,所以这里我们要引入线程概念,一个进程可以有多个线程,那么进程就是管理器,线程就是我们一次可以发多少个请求。

线程是可以配置的,我们可以指定触发多少个线程,当1个线程完成后,我们要填补他的空缺,这样可以一直保持指定数量的线程在同时运行。

原理

最重要的其实就两点:

  1. 如何运行指定线程数量
  2. 如果在一个线程完成后能由下一个线程填补空缺

如何运行指定线程数量

很简单,for循环就行了,假设我们设置线程为5个,我们只需要for循环运行5次即可。

这里就有人问了?那怎么进行填补空缺?

线程填补空缺

其实很简单,假设我们有一个方法,永远可以获取到下一个要触发的promise,如果没有,这个方法返回undefined,我们假设这个方法为next

const arr = [()=>{},()=>{}];
let index = 0;

const next = ()=>{
  return arr[index++];
}

那么,我们只需要在上一个完成的promise的then里面在运行一下next获下一个人任务函数,不是就可以续上了。

当然,我们不可能傻乎乎就这样干了,你想想,如果你要在then里面再调用,就应该写一个执行体,执行体里面运行执行体才对。

这样才能循环调用,不然你手写,那得写多少then才是头。

基本的原理就是这样。

骨头架子

class AsyncQueueManager {
    constructor(options = {}) {
        this.initOptions(options);
    }

    //初始化基础配置
    initOptions(options) {
        Object.assign(this, {
            maxParallel: 1, //线程数量
            asyncArr: [], //任务数组 
            nextCount: 0, //下一个函数的下标
            successCount: 0, //已完成任务计数器
            status: true, //管理器状态,true正常,false异常
            reslove: function() {}, //全部完成回调(占位-实际应该是promise的reslove回调)
            reject: function(error) {
                throw error;
            }, //出错的回调(占位-实际应该是promise的reject回调)
        });
        //自定义线程数量
        if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
            this.maxParallel = options.maxParallel;
        }
    }

    //添加任务
    push(fn) {
        this.asyncArr.push(fn);
    }

    //获取下一个任务
    next() {
        return this.asyncArr[this.nextCount++];
    }

    //开始运行
    start() {
        //返回promise
        return new Promise((resolve, reject) => {
            //promise回调绑定到this
            this.reslove = resolve;
            this.reject = reject;

            //运行设置的线程数量
            for (let i = 0; i < this.maxParallel; i++) {
                const fn = this.next();
                if (!fn) break; //没有任务了则退出for循环
                //运行体
                this.run(fn);
            }
        });
    }

    //运行体
    run(fn) {
        fn().then(res => {
            if (!this.status) return; //如果有异常停止后续运行
            this.successCount++; //成功计数器+1
            if (this.successCount >= this.asyncArr.length) {
                //全部完成
                return this.reslove();
            }
            //下一个 nextFn有可能不存在,所以不要在这做回调
            const nextFn = this.next();
            if (nextFn && this.status) {
                this.run(nextFn);
            }
        }).catch(err => {
            if (!this.status) return; //已经异常了不重复报错
            this.status = false; //标明异常
            return this.reject(err);
        });
    }
};

基本源码就是这样,当successCountasyncArr.length数值一样的时候,就表示所有任务就已经完成了,我们不必担心successCount的值不对,事实上,他只有成功时才会加一,那么20条数据,最终successCount也是为20

错误的可能会触发多个,所以在catch的时候,我们要判断之前是否已经出错了,已经出错就不需要重复reject,如果没有,则将status状态改为false异常。再reject。

为了省事,我将start方法中返回的promise回调都赋值到this上面了,省的我们无限循环run的时候,要重复把resolve和reject作为参数传入,因为,我们不知道在哪个run运行时会报错,也不知道异步任务,谁是最后一个上传的。

所以,绑定到this上,需要时通过this调用即可。

优化

目前来说,代码基本功能有了,我们还需要完善,比如,每次成功时,我应该有一个回调接口告诉外面,我这个任务成功了,失败的时候,虽然有catch然后reject输出了,但是也应该有一个错误回调

当我们所有任务完成后,resolve并没有传入任何值。

假设我们是上传20个文件,每个文件上传完成后会返回文件的在线链接,如果我们此时在then里面,是什么都没有的,我们只知道20个文件上传成功了,所以我们应该将成功的结果保存到数组,然后在resolve中抛出去。

既然如此,那么每次成功的回调,也应该将当前成功获得的responese对象作为参数传给外面。

完整代码

class AsyncQueueManager {
  constructor(options = {}) {
    this.initOptions(options);
  }

  //初始化基础配置
  initOptions(options) {
    Object.assign(this, {
      maxParallel: 1, //线程数量
      asyncArr: [],  //任务数组 
      nextCount: 0, //下一个函数的下标
      successCount: 0, //已完成任务计数器
      status: true,  //管理器状态,true正常,false异常
      successCallBack: function () { }, //每个任务完成后的回调
      errorCallBack: function () { }, //每个任务失败后的回调
      reslove: function () { }, //全部完成回调(占位-实际应该是promise的reslove回调)
      reject: function (error) { throw error; },  //出错的回调(占位-实际应该是promise的reject回调)
      responseArr: [], //成功后的结果集合
    });
    //自定义线程数量
    if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
      this.maxParallel = options.maxParallel;
    }
  }

  //添加任务
  push(fn) {
    this.asyncArr.push(fn);
  }

  //获取下一个任务
  next() {
    return this.asyncArr[this.nextCount++];
  }

  //开始运行
  start(successFn, errorFn) {
    if (typeof successFn === "function") this.successCallBack = successFn;
    if (typeof errorFn === "function") this.errorCallBack = errorFn;
    //返回promise
    return new Promise((resolve, reject) => {
      //promise回调绑定到this
      this.reslove = resolve;
      this.reject = reject;

      //运行设置的线程数量
      for (let i = 0; i < this.maxParallel; i++) {
        const fn = this.next();
        if (!fn) break; //没有任务了则退出for循环
        //运行体
        this.run(fn);
      }
    });
  }

  //运行体
  run(fn) {
    fn().then(res => {
      if (!this.status) return; //如果有异常停止后续运行
      this.responseArr.push(res); //记录结果
      this.successCallBack(res);  //运行每次成功后的回调
      this.successCount++;  //成功计数器+1
      if (this.successCount >= this.asyncArr.length) {
        //全部完成
        return this.reslove(this.responseArr);
      }
      //下一个 nextFn有可能不存在,所以不要在这做回调
      const nextFn = this.next();
      if (nextFn && this.status) {
        this.run(nextFn);
      }
    }).catch(err => {
      if (!this.status) return; //已经异常了不重复报错
      this.status = false; //标明异常
      this.errorCallBack(err);  //失败的回调
      return this.reject(err);
    });
  }
};

使用方法

由于promise被new出来时就会运行,而我们任务管理器则需要控制他的运行,所以我们不能直接把promise对象传给管理,而是应该用一个函数包裹起来。

const task = ()=>{
  return new Promise(...省略);
}

这样才能通过管理进行控制。

const aqm = new AsyncQueueManager({maxParallel:5});

for(let i = 0;i<10;i++) {
  const task = ()=>{
    return new Promise(...省略);
  }
  aqm.push(task);
}

aqm.start((res)=>{
  console.log("每次成功的回调",res);
},(err)=>{
  console.log("每次错误的回调",err)
}).then(res=>{
  console.log("全部上传成功",res);
}).catch(err=>{
  console.log("上传出现错误",err);
});

到这就基本差不多了,简单造了个轮子,如果有大佬想加强也是可以的,也可以把思路分享出来,留言评论也行,我看到就会回复的!(记得留真实邮箱,这样我的邮件回复提醒才能生效)

版权申明

本文系作者 @木灵鱼儿 原创发布在木灵鱼儿 - 有梦就能远航站点。未经许可,禁止转载。

关于作者

站点职位 博主
获得点赞 0
文章被阅读 858

相关文章