异步队列管理器
造这个轮子其实也是没得办法,搜不到合适的轮子用,就只能自己干了。
使用场景
我们有N个异步任务promise,他们没有顺序关系,谁先触发都无所谓,但是我们只关心一点,如果某一个任务出错,后续就不要运行了,只有全部都success完成,那么才运行成功后的处理方式then。
当然,我们肯定不能使用Promise.all运行N个任务,这等于是同步触发了,如果我有2000个任务,难道你也一口气发2000个任务,那这就不现实了,所以这里我们要引入线程概念,一个进程可以有多个线程,那么进程就是管理器,线程就是我们一次可以发多少个请求。
线程是可以配置的,我们可以指定触发多少个线程,当1个线程完成后,我们要填补他的空缺,这样可以一直保持指定数量的线程在同时运行。
原理
最重要的其实就两点:
- 如何运行指定线程数量
- 如果在一个线程完成后能由下一个线程填补空缺
如何运行指定线程数量
很简单,for循环就行了,假设我们设置线程为5个,我们只需要for循环运行5次即可。
这里就有人问了?那怎么进行填补空缺?
线程填补空缺
其实很简单,假设我们有一个方法,永远可以获取到下一个要触发的promise,如果没有,这个方法返回undefined,我们假设这个方法为next
const arr = [()=>{},()=>{}];
let index = 0;
const next = ()=>{
return arr[index++];
}
那么,我们只需要在上一个完成的promise的then里面在运行一下next
获下一个人任务函数,不是就可以续上了。
当然,我们不可能傻乎乎就这样干了,你想想,如果你要在then里面再调用,就应该写一个执行体,执行体里面运行执行体才对。
这样才能循环调用,不然你手写,那得写多少then才是头。
基本的原理就是这样。
骨头架子
class AsyncQueueManager {
constructor(options = {}) {
this.initOptions(options);
}
//初始化基础配置
initOptions(options) {
Object.assign(this, {
maxParallel: 1, //线程数量
asyncArr: [], //任务数组
nextCount: 0, //下一个函数的下标
successCount: 0, //已完成任务计数器
status: true, //管理器状态,true正常,false异常
reslove: function() {}, //全部完成回调(占位-实际应该是promise的reslove回调)
reject: function(error) {
throw error;
}, //出错的回调(占位-实际应该是promise的reject回调)
});
//自定义线程数量
if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
this.maxParallel = options.maxParallel;
}
}
//添加任务
push(fn) {
this.asyncArr.push(fn);
}
//获取下一个任务
next() {
return this.asyncArr[this.nextCount++];
}
//开始运行
start() {
//返回promise
return new Promise((resolve, reject) => {
//promise回调绑定到this
this.reslove = resolve;
this.reject = reject;
//运行设置的线程数量
for (let i = 0; i < this.maxParallel; i++) {
const fn = this.next();
if (!fn) break; //没有任务了则退出for循环
//运行体
this.run(fn);
}
});
}
//运行体
run(fn) {
fn().then(res => {
if (!this.status) return; //如果有异常停止后续运行
this.successCount++; //成功计数器+1
if (this.successCount >= this.asyncArr.length) {
//全部完成
return this.reslove();
}
//下一个 nextFn有可能不存在,所以不要在这做回调
const nextFn = this.next();
if (nextFn && this.status) {
this.run(nextFn);
}
}).catch(err => {
if (!this.status) return; //已经异常了不重复报错
this.status = false; //标明异常
return this.reject(err);
});
}
};
基本源码就是这样,当successCount
和asyncArr.length
数值一样的时候,就表示所有任务就已经完成了,我们不必担心successCount的值不对,事实上,他只有成功时才会加一,那么20条数据,最终successCount也是为20
错误的可能会触发多个,所以在catch的时候,我们要判断之前是否已经出错了,已经出错就不需要重复reject,如果没有,则将status状态改为false异常。再reject。
为了省事,我将start方法中返回的promise回调都赋值到this上面了,省的我们无限循环run的时候,要重复把resolve和reject作为参数传入,因为,我们不知道在哪个run运行时会报错,也不知道异步任务,谁是最后一个上传的。
所以,绑定到this上,需要时通过this调用即可。
优化
目前来说,代码基本功能有了,我们还需要完善,比如,每次成功时,我应该有一个回调接口告诉外面,我这个任务成功了,失败的时候,虽然有catch然后reject输出了,但是也应该有一个错误回调
当我们所有任务完成后,resolve并没有传入任何值。
假设我们是上传20个文件,每个文件上传完成后会返回文件的在线链接,如果我们此时在then里面,是什么都没有的,我们只知道20个文件上传成功了,所以我们应该将成功的结果保存到数组,然后在resolve中抛出去。
既然如此,那么每次成功的回调,也应该将当前成功获得的responese对象作为参数传给外面。
完整代码
class AsyncQueueManager {
constructor(options = {}) {
this.initOptions(options);
}
//初始化基础配置
initOptions(options) {
Object.assign(this, {
maxParallel: 1, //线程数量
asyncArr: [], //任务数组
nextCount: 0, //下一个函数的下标
successCount: 0, //已完成任务计数器
status: true, //管理器状态,true正常,false异常
successCallBack: function () { }, //每个任务完成后的回调
errorCallBack: function () { }, //每个任务失败后的回调
reslove: function () { }, //全部完成回调(占位-实际应该是promise的reslove回调)
reject: function (error) { throw error; }, //出错的回调(占位-实际应该是promise的reject回调)
responseArr: [], //成功后的结果集合
});
//自定义线程数量
if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
this.maxParallel = options.maxParallel;
}
}
//添加任务
push(fn) {
this.asyncArr.push(fn);
}
//获取下一个任务
next() {
return this.asyncArr[this.nextCount++];
}
//开始运行
start(successFn, errorFn) {
if (typeof successFn === "function") this.successCallBack = successFn;
if (typeof errorFn === "function") this.errorCallBack = errorFn;
//返回promise
return new Promise((resolve, reject) => {
//promise回调绑定到this
this.reslove = resolve;
this.reject = reject;
//运行设置的线程数量
for (let i = 0; i < this.maxParallel; i++) {
const fn = this.next();
if (!fn) break; //没有任务了则退出for循环
//运行体
this.run(fn);
}
});
}
//运行体
run(fn) {
fn().then(res => {
if (!this.status) return; //如果有异常停止后续运行
this.responseArr.push(res); //记录结果
this.successCallBack(res); //运行每次成功后的回调
this.successCount++; //成功计数器+1
if (this.successCount >= this.asyncArr.length) {
//全部完成
return this.reslove(this.responseArr);
}
//下一个 nextFn有可能不存在,所以不要在这做回调
const nextFn = this.next();
if (nextFn && this.status) {
this.run(nextFn);
}
}).catch(err => {
if (!this.status) return; //已经异常了不重复报错
this.status = false; //标明异常
this.errorCallBack(err); //失败的回调
return this.reject(err);
});
}
};
使用方法
由于promise被new出来时就会运行,而我们任务管理器则需要控制他的运行,所以我们不能直接把promise对象传给管理,而是应该用一个函数包裹起来。
const task = ()=>{
return new Promise(...省略);
}
这样才能通过管理进行控制。
const aqm = new AsyncQueueManager({maxParallel:5});
for(let i = 0;i<10;i++) {
const task = ()=>{
return new Promise(...省略);
}
aqm.push(task);
}
aqm.start((res)=>{
console.log("每次成功的回调",res);
},(err)=>{
console.log("每次错误的回调",err)
}).then(res=>{
console.log("全部上传成功",res);
}).catch(err=>{
console.log("上传出现错误",err);
});
到这就基本差不多了,简单造了个轮子,如果有大佬想加强也是可以的,也可以把思路分享出来,留言评论也行,我看到就会回复的!(记得留真实邮箱,这样我的邮件回复提醒才能生效)
本文系作者 @木灵鱼儿 原创发布在木灵鱼儿站点。未经许可,禁止转载。
暂无评论数据