木灵鱼儿
阅读:1966
异步队列管理器
造这个轮子其实也是没得办法,搜不到合适的轮子用,就只能自己干了。
使用场景
我们有N个异步任务promise,他们没有顺序关系,谁先触发都无所谓,但是我们只关心一点,如果某一个任务出错,后续就不要运行了,只有全部都success完成,那么才运行成功后的处理方式then。
当然,我们肯定不能使用Promise.all运行N个任务,这等于是同步触发了,如果我有2000个任务,难道你也一口气发2000个任务,那这就不现实了,所以这里我们要引入线程概念,一个进程可以有多个线程,那么进程就是管理器,线程就是我们一次可以发多少个请求。
线程是可以配置的,我们可以指定触发多少个线程,当1个线程完成后,我们要填补他的空缺,这样可以一直保持指定数量的线程在同时运行。
原理
最重要的其实就两点:
- 如何运行指定线程数量
- 如果在一个线程完成后能由下一个线程填补空缺
如何运行指定线程数量
很简单,for循环就行了,假设我们设置线程为5个,我们只需要for循环运行5次即可。
这里就有人问了?那怎么进行填补空缺?
线程填补空缺
其实很简单,假设我们有一个方法,永远可以获取到下一个要触发的promise,如果没有,这个方法返回undefined,我们假设这个方法为next
const arr = [()=>{},()=>{}];
let index = 0;
const next = ()=>{
return arr[index++];
}
那么,我们只需要在上一个完成的promise的then里面在运行一下next
获下一个人任务函数,不是就可以续上了。
当然,我们不可能傻乎乎就这样干了,你想想,如果你要在then里面再调用,就应该写一个执行体,执行体里面运行执行体才对。
这样才能循环调用,不然你手写,那得写多少then才是头。
基本的原理就是这样。
骨头架子
class AsyncQueueManager {
constructor(options = {}) {
this.initOptions(options);
}
//初始化基础配置
initOptions(options) {
Object.assign(this, {
maxParallel: 1, //线程数量
asyncArr: [], //任务数组
nextCount: 0, //下一个函数的下标
successCount: 0, //已完成任务计数器
status: true, //管理器状态,true正常,false异常
reslove: function() {}, //全部完成回调(占位-实际应该是promise的reslove回调)
reject: function(error) {
throw error;
}, //出错的回调(占位-实际应该是promise的reject回调)
});
//自定义线程数量
if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
this.maxParallel = options.maxParallel;
}
}
//添加任务
push(fn) {
this.asyncArr.push(fn);
}
//获取下一个任务
next() {
return this.asyncArr[this.nextCount++];
}
//开始运行
start() {
//返回promise
return new Promise((resolve, reject) => {
//promise回调绑定到this
this.reslove = resolve;
this.reject = reject;
//运行设置的线程数量
for (let i = 0; i < this.maxParallel; i++) {
const fn = this.next();
if (!fn) break; //没有任务了则退出for循环
//运行体
this.run(fn);
}
});
}
//运行体
run(fn) {
fn().then(res => {
if (!this.status) return; //如果有异常停止后续运行
this.successCount++; //成功计数器+1
if (this.successCount >= this.asyncArr.length) {
//全部完成
return this.reslove();
}
//下一个 nextFn有可能不存在,所以不要在这做回调
const nextFn = this.next();
if (nextFn && this.status) {
this.run(nextFn);
}
}).catch(err => {
if (!this.status) return; //已经异常了不重复报错
this.status = false; //标明异常
return this.reject(err);
});
}
};
基本源码就是这样,当successCount
和asyncArr.length
数值一样的时候,就表示所有任务就已经完成了,我们不必担心successCount的值不对,事实上,他只有成功时才会加一,那么20条数据,最终successCount也是为20
错误的可能会触发多个,所以在catch的时候,我们要判断之前是否已经出错了,已经出错就不需要重复reject,如果没有,则将status状态改为false异常。再reject。
为了省事,我将start方法中返回的promise回调都赋值到this上面了,省的我们无限循环run的时候,要重复把resolve和reject作为参数传入,因为,我们不知道在哪个run运行时会报错,也不知道异步任务,谁是最后一个上传的。
所以,绑定到this上,需要时通过this调用即可。
优化
目前来说,代码基本功能有了,我们还需要完善,比如,每次成功时,我应该有一个回调接口告诉外面,我这个任务成功了,失败的时候,虽然有catch然后reject输出了,但是也应该有一个错误回调
当我们所有任务完成后,resolve并没有传入任何值。
假设我们是上传20个文件,每个文件上传完成后会返回文件的在线链接,如果我们此时在then里面,是什么都没有的,我们只知道20个文件上传成功了,所以我们应该将成功的结果保存到数组,然后在resolve中抛出去。
既然如此,那么每次成功的回调,也应该将当前成功获得的responese对象作为参数传给外面。
完整代码
class AsyncQueueManager {
constructor(options = {}) {
this.initOptions(options);
}
//初始化基础配置
initOptions(options) {
Object.assign(this, {
maxParallel: 1, //线程数量
asyncArr: [], //任务数组
nextCount: 0, //下一个函数的下标
successCount: 0, //已完成任务计数器
status: true, //管理器状态,true正常,false异常
successCallBack: function () { }, //每个任务完成后的回调
errorCallBack: function () { }, //每个任务失败后的回调
reslove: function () { }, //全部完成回调(占位-实际应该是promise的reslove回调)
reject: function (error) { throw error; }, //出错的回调(占位-实际应该是promise的reject回调)
responseArr: [], //成功后的结果集合
});
//自定义线程数量
if (typeof options.maxParallel === "number" && options.maxParallel > 0) {
this.maxParallel = options.maxParallel;
}
}
//添加任务
push(fn) {
this.asyncArr.push(fn);
}
//获取下一个任务
next() {
return this.asyncArr[this.nextCount++];
}
//开始运行
start(successFn, errorFn) {
if (typeof successFn === "function") this.successCallBack = successFn;
if (typeof errorFn === "function") this.errorCallBack = errorFn;
//返回promise
return new Promise((resolve, reject) => {
//promise回调绑定到this
this.reslove = resolve;
this.reject = reject;
//运行设置的线程数量
for (let i = 0; i < this.maxParallel; i++) {
const fn = this.next();
if (!fn) break; //没有任务了则退出for循环
//运行体
this.run(fn);
}
});
}
//运行体
run(fn) {
fn().then(res => {
if (!this.status) return; //如果有异常停止后续运行
this.responseArr.push(res); //记录结果
this.successCallBack(res); //运行每次成功后的回调
this.successCount++; //成功计数器+1
if (this.successCount >= this.asyncArr.length) {
//全部完成
return this.reslove(this.responseArr);
}
//下一个 nextFn有可能不存在,所以不要在这做回调
const nextFn = this.next();
if (nextFn && this.status) {
this.run(nextFn);
}
}).catch(err => {
if (!this.status) return; //已经异常了不重复报错
this.status = false; //标明异常
this.errorCallBack(err); //失败的回调
return this.reject(err);
});
}
};
使用方法
由于promise被new出来时就会运行,而我们任务管理器则需要控制他的运行,所以我们不能直接把promise对象传给管理,而是应该用一个函数包裹起来。
const task = ()=>{
return new Promise(...省略);
}
这样才能通过管理进行控制。
const aqm = new AsyncQueueManager({maxParallel:5});
for(let i = 0;i<10;i++) {
const task = ()=>{
return new Promise(...省略);
}
aqm.push(task);
}
aqm.start((res)=>{
console.log("每次成功的回调",res);
},(err)=>{
console.log("每次错误的回调",err)
}).then(res=>{
console.log("全部上传成功",res);
}).catch(err=>{
console.log("上传出现错误",err);
});
到这就基本差不多了,简单造了个轮子,如果有大佬想加强也是可以的,也可以把思路分享出来,留言评论也行,我看到就会回复的!(记得留真实邮箱,这样我的邮件回复提醒才能生效)
版权申明
本文系作者 @木灵鱼儿 原创发布在木灵鱼儿 - 有梦就能远航站点。未经许可,禁止转载。
相关推荐
生成器 Generator
简介Promise的出现,我们可以将回调进行反客为主,不在受回调调用者的限制(控制反转和信任问题),但是它并没有解决异步编程导致的代码顺序问题,有没有一种方式,可以让我们的代码虽然是异步的,但是书写顺序却是同步的,这样完全符合我们人类大脑理解方式?console.log("同步的1"); axios({ ... }).then(() => { console.log("异步的回调,如果需要等待结果处理,代码只能写在这个回调里") }); console.log("同步的2")很明显,如果我们希望"同步...
弹窗队列管理器
原理全局存在一个弹窗管理器,他管理着一个数组,所有的弹窗触发全部包成一个函数,交给管理器,管理器去查看队列是否有内容,如果有内容就不触发,无内容就将本次函数push进数组中,然后触发next方法弹出下一个弹窗。当弹窗关闭后,触发管理器的remove方法,数组首位出栈,触发next方法,弹出下一个弹窗示意图源码四个文件:dialogMap.ts 弹窗数据,可能这个弹窗这次需要队列,下次就不需要,通过这个配置index.ts 入口文件queueManager.ts 管理器types.ts 类型声明types.ts/* * @Author: mulingyuer * @Date: 20...

Promise失败重试,可指定重试次数
//模拟异步请求 function axiosFn() { return new Promise((resolve, reject) => { const flge = Math.random(); //随机值 setTimeout(() => { //大于0.7就是成功 if (flge > 0.7) { return resolve(flge); } else { return reject(...
animation 动画的三个事件
const div = document.querySelector("div.box"); div.addEventListener("animationstart", function() { //动画开始运行触发 }); div.addEventListener("animationiteration", function() { //动画每执行一次触发一次,适用用多次动画 }); div.addEventListener("animationend", function() { //...
手写Promise
/* * @Author: mulingyuer * @Date: 2021-12-30 22:06:58 * @LastEditTime: 2022-01-03 05:22:30 * @LastEditors: mulingyuer * @Description: 手写promise * @FilePath: \undefinedc:\Users\13219\Desktop\promise.js * 怎么可能会有bug!!! */ /** * @description: 自定义promise * @param {fucntion} executor 执行器函数(同...
promise 队列
数组map实现function fn1() { return new Promise((resolve, reject) => { setTimeout(() => { console.log(1); resolve(); }, 500) }) } function fn2() { return new Promise((resolve, reject) => { setTimeout(() => { console.l...
利用Promise实现一个超时结束等待操作
promise如果没有指定状态,那么就一直会处于pending中,如果长时间不处理,那么这个东西会一直存在于内存中,显然是不合理的。如果是一个超多请求项目,那么我们就需要考虑下性能问题了。promise中有一个rece方法,它接收一个promise作为值的数组,它的特性就是:哪个promise先执行,他就处理那一个,不管是resolve还是reject;在then中,他也只有一个值,不同于Promise.all方法返回的是一个数组,rece返回的值是最快完成的那个promise的返回值。利用这个特性,我们可以制作一个超时处理。function delayPromise(promise, ...
深度合并对象的方法
找了很久,现有的库有两个:1. Mergenpm地址: Merge用法:import merge from 'merge'; const obj1 = { name: 2}; const obj2 = { value: 1 }; //合并 merge(obj1,obj2); console.log(obj1); //{name:2,value:1} //克隆合并-与目标对象无关联 const obj3 = merge(true,obj1,obj2); console.log(obj3); //{name:2,value:1} console.log(obj3 === obj1)...
利用JSON过滤对象和数组中指定的key属性
有时候我们在vue中进行for循环,就会涉及到绑定唯一值key的问题,但是并不是任何时候都会存在所谓的唯一值,使用index下标明显是不合适的,官方也不推荐,除非你for循环出来的列表不用变化。所以一般常用的做法就是给for循环的对象添加一个属性,属性的值是随机的uuid或者时间戳。这样前端问题解决了,如果遍历的数据还需要提交到后端,那么不就多了一个属性,这个属性后端不需要的。所以,我们需要在提交数据前,对数据进行过滤。过滤又得for循环删除?那怎么行,有没有那种通用的,简单的方法。过滤方法/** * @description: 过滤对象中指定的属性,也可以拿来浅拷贝 * @para...