如何实现 promise.map,限制 promise 并发数
更多描述 实现一个 promise.map,进行并发数控制,有以下测试用例
pMap([1, 2, 3, 4, 5], (x) => Promise.resolve(x + 1));
pMap([Promise.resolve(1), Promise.resolve(2)], (x) => x + 1);
// 注意输出时间控制
pMap([1, 1, 1, 1, 1, 1, 1, 1], (x) => sleep(1000), { concurrency: 2 });
Issue 欢迎在 Gtihub Issue 中回答此问题: Issue 89 (opens in a new tab)
Author 回答者: dmwin72015 (opens in a new tab)
没人回答
Author 回答者: shfshanyue (opens in a new tab)
以下代码见 如何实现 promise.map - codepen (opens in a new tab)
function pMap(list, mapper, concurrency = Infinity) {
// list 为 Iterator,先转化为 Array
list = Array.from(list);
return new Promise((resolve, reject) => {
let currentIndex = 0;
let result = [];
let resolveCount = 0;
let len = list.length;
function next() {
const index = currentIndex;
currentIndex++;
Promise.resolve(list[index])
.then((o) => mapper(o, index))
.then((o) => {
result[index] = o;
resolveCount++;
if (resolveCount === len) {
resolve(result);
}
if (currentIndex < len) {
next();
}
});
}
for (let i = 0; i < concurrency && i < len; i++) {
next();
}
});
}
Author 回答者: heretic-G (opens in a new tab)
Promise.map = function (queue = [], opt = {}) {
let limit = opt.limit || 5;
let queueIndex = 0;
let completeCount = 0;
let _resolve;
let result = Array(queue.length);
for (let i = 0; i < limit; i++) {
next(queueIndex++);
}
function next(index) {
if (queue.length === 0) return;
let curr = queue.shift();
if (typeof curr === "function") {
curr = curr();
}
Promise.resolve(curr)
.then(
(res) => {
result[index] = res;
},
(res) => {
result[index] = res;
},
)
.finally(() => {
completeCount += 1;
if (completeCount === result.length) {
return _resolve(result);
}
next(queueIndex++);
});
}
return new Promise((resolve) => {
_resolve = resolve;
});
};
function add(a, b) {
return Promise.resolve(a + b);
}
function sum(arr) {
if (arr.length <= 2) {
return add(arr[0] || 0, arr[1] || 0);
}
let mid = (arr.length / 2) | 0;
let promiseArr = [];
for (let i = 0; i < mid; i++) {
promiseArr.push(add(arr[i], arr[mid + i]));
}
return Promise.map(promiseArr).then((res) => {
if (arr.length % 2 !== 0) {
res.push(arr.pop());
}
return sum(res);
});
}
Author 回答者: yazhouio (opens in a new tab)
function pMap(list, mapper, cur) {
cur = cur || list.length;
let step = Promise.resolve();
do {
let temp = list.splice(0, cur);
step = step.then(() =>
Promise.all(
temp.map((i, index) => Promise.resolve(i).then((e) => mapper(e, index)))
)
);
} while (list.length);
Author 回答者: Vi-jay (opens in a new tab)
function pMap([...arr], fn, { concurrency = Infinity } = {}) {
let queue = [],
results = [];
return new Promise((resolve) =>
(function closure() {
const times = concurrency - queue.length;
for (let i = 0; i < times; i++) {
if (!arr.length) return;
const promise = Promise.resolve(arr.shift())
.then(fn)
.then((data) => results.push(data))
.finally(() => {
queue = queue.filter((item) => promise !== item);
if (!queue.length && !arr.length) return resolve(results);
return closure();
});
queue.push(promise);
}
})(),
);
}
Author 回答者: yxw007 (opens in a new tab)
function parallelMap(arr, fn, concurrency = Number.MAX_SAFE_INTEGER) {
return new Promise((resolve) => {
let ret = [];
let index = -1;
function next() {
++index;
Promise.resolve(arr[index])
.then((val) => fn(val, index))
.then((res) => {
ret.push(res);
if (ret.length === arr.length) {
resolve(ret);
} else if (index < arr.length) {
next();
}
});
}
for (let i = 0; i < arr.length && i < concurrency; i++) {
next();
}
});
}
Author 回答者: kirazZ1 (opens in a new tab)
function pMap(argsArr, fn, config) {
let queue = [];
const next = () => {
if (queue.length > 0) {
queue.shift().then((_task) => _task());
}
};
const run = async (fn, resolve, args) => {
const result = await (async () => fn(...args))();
resolve(result);
try {
result;
} catch (e) {}
next();
};
return new Promise((resolve, reject) => {
const concurrency = (config || {}).concurrency;
if (!concurrency) {
Promise.all(
argsArr.map((item) =>
item instanceof Promise ? item.then(fn) : fn(item),
),
)
.then(resolve)
.catch(reject);
} else {
queue = argsArr.map(async (item) =>
run.bind(null, fn, resolve, [
item instanceof Promise ? await item : item,
]),
);
for (let i = 0; i < concurrency; i++) {
queue[i].then((_task) => _task());
}
}
});
}
Author 回答者: kirazZ1 (opens in a new tab)
type pMapType<T = any> = (
argsArr: any[],
fn: (...args: any) => Promise<T>,
concurrency?: number,
) => Promise<T>;
const pMap: pMapType = (argsArr, fn, concurrency = Infinity) => {
return new Promise((resolve, reject) => {
const result = new Array(argsArr.length).fill(0);
const taskQueue: any[] = [];
let currentWorkingAmount = 0;
function run() {
while (currentWorkingAmount < concurrency) {
const nextTask = taskQueue.shift();
if (nextTask) {
nextTask();
currentWorkingAmount++;
}
if (taskQueue.length === 0) break;
}
}
for (let i = 0; i < argsArr.length; i++) {
taskQueue.push(() => {
Promise.resolve(fn(argsArr[i]))
.then((res) => {
result[i] = res;
currentWorkingAmount--;
if (taskQueue.length === 0 && currentWorkingAmount === 0)
return resolve(result);
if (currentWorkingAmount < concurrency) run();
})
.catch(reject);
});
}
run();
});
};
function asyncFun(value: any, delay: number) {
return new Promise((resolve) => {
console.log("start " + value);
setTimeout(() => resolve(value), delay);
});
}
pMap([1, 2, 3, 4, 5, 6, 7, 8], (x) => asyncFun(`task${x}`, 2000), 4).then(
(res) => {
console.log("res", res);
},
);