高级前端
js
【Q088】如何实现 promise.map,限制 promise 并发数

如何实现 promise.map,限制 promise 并发数

更多描述 实现一个 promise.map,进行并发数控制,有以下测试用例

// Plain values in the list, mapper returns a promise.
pMap([1, 2, 3, 4, 5], (x) => Promise.resolve(x + 1));
 
// Promises in the list, mapper returns a plain value.
pMap([Promise.resolve(1), Promise.resolve(2)], (x) => x + 1);
 
// Note: pay attention to the timing of the output
pMap([1, 1, 1, 1, 1, 1, 1, 1], (x) => sleep(1000), { concurrency: 2 });

Issue 欢迎在 GitHub Issue 中回答此问题: Issue 89 (opens in a new tab)

Author 回答者: dmwin72015 (opens in a new tab)

没人回答

Author 回答者: shfshanyue (opens in a new tab)

以下代码见 如何实现 promise.map - codepen (opens in a new tab)

function pMap(list, mapper, concurrency = Infinity) {
  // list 为 Iterator,先转化为 Array
  list = Array.from(list);
  return new Promise((resolve, reject) => {
    let currentIndex = 0;
    let result = [];
    let resolveCount = 0;
    let len = list.length;
    function next() {
      const index = currentIndex;
      currentIndex++;
      Promise.resolve(list[index])
        .then((o) => mapper(o, index))
        .then((o) => {
          result[index] = o;
          resolveCount++;
          if (resolveCount === len) {
            resolve(result);
          }
          if (currentIndex < len) {
            next();
          }
        });
    }
    for (let i = 0; i < concurrency && i < len; i++) {
      next();
    }
  });
}

Author 回答者: heretic-G (opens in a new tab)

Promise.map = function (queue = [], opt = {}) {
  let limit = opt.limit || 5;
  let queueIndex = 0;
  let completeCount = 0;
  let _resolve;
  let result = Array(queue.length);
 
  for (let i = 0; i < limit; i++) {
    next(queueIndex++);
  }
 
  function next(index) {
    if (queue.length === 0) return;
    let curr = queue.shift();
    if (typeof curr === "function") {
      curr = curr();
    }
    Promise.resolve(curr)
      .then(
        (res) => {
          result[index] = res;
        },
        (res) => {
          result[index] = res;
        },
      )
      .finally(() => {
        completeCount += 1;
        if (completeCount === result.length) {
          return _resolve(result);
        }
        next(queueIndex++);
      });
  }
  return new Promise((resolve) => {
    _resolve = resolve;
  });
};
 
function add(a, b) {
  return Promise.resolve(a + b);
}
 
/**
 * Sums the numbers in `arr` using the asynchronous `add`, combining pairs
 * through `Promise.map` and recursing on the partial sums until at most two
 * values remain.
 *
 * @param arr Numbers to add up; the array is not modified.
 * @returns A promise of the total (0 for an empty array).
 */
function sum(arr) {
  if (arr.length <= 2) {
    return add(arr[0] || 0, arr[1] || 0);
  }
  const mid = Math.floor(arr.length / 2);
  const promiseArr = [];
  for (let i = 0; i < mid; i++) {
    // Pair the i-th element of the first half with the i-th of the second half.
    promiseArr.push(add(arr[i], arr[mid + i]));
  }
  return Promise.map(promiseArr).then((res) => {
    if (arr.length % 2 !== 0) {
      // An odd-length array leaves its last element unpaired. Read it instead
      // of popping it so the caller's array is left intact.
      res.push(arr[arr.length - 1]);
    }
    return sum(res);
  });
}

Author 回答者: yazhouio (opens in a new tab)

function pMap(list, mapper, cur) {
  cur = cur || list.length;
  let step = Promise.resolve();
  do {
    let temp = list.splice(0, cur);
    step = step.then(() =>
      Promise.all(
        temp.map((i, index) => Promise.resolve(i).then((e) => mapper(e, index)))
      )
    );
  } while (list.length);
 

Author 回答者: Vi-jay (opens in a new tab)

function pMap([...arr], fn, { concurrency = Infinity } = {}) {
  let queue = [],
    results = [];
  return new Promise((resolve) =>
    (function closure() {
      const times = concurrency - queue.length;
      for (let i = 0; i < times; i++) {
        if (!arr.length) return;
        const promise = Promise.resolve(arr.shift())
          .then(fn)
          .then((data) => results.push(data))
          .finally(() => {
            queue = queue.filter((item) => promise !== item);
            if (!queue.length && !arr.length) return resolve(results);
            return closure();
          });
        queue.push(promise);
      }
    })(),
  );
}

Author 回答者: yxw007 (opens in a new tab)

function parallelMap(arr, fn, concurrency = Number.MAX_SAFE_INTEGER) {
  return new Promise((resolve) => {
    let ret = [];
    let index = -1;
    function next() {
      ++index;
      Promise.resolve(arr[index])
        .then((val) => fn(val, index))
        .then((res) => {
          ret.push(res);
          if (ret.length === arr.length) {
            resolve(ret);
          } else if (index < arr.length) {
            next();
          }
        });
    }
 
    for (let i = 0; i < arr.length && i < concurrency; i++) {
      next();
    }
  });
}

Author 回答者: kirazZ1 (opens in a new tab)

function pMap(argsArr, fn, config) {
  let queue = [];
 
  const next = () => {
    if (queue.length > 0) {
      queue.shift().then((_task) => _task());
    }
  };
 
  const run = async (fn, resolve, args) => {
    const result = await (async () => fn(...args))();
    resolve(result);
    try {
      result;
    } catch (e) {}
    next();
  };
 
  return new Promise((resolve, reject) => {
    const concurrency = (config || {}).concurrency;
    if (!concurrency) {
      Promise.all(
        argsArr.map((item) =>
          item instanceof Promise ? item.then(fn) : fn(item),
        ),
      )
        .then(resolve)
        .catch(reject);
    } else {
      queue = argsArr.map(async (item) =>
        run.bind(null, fn, resolve, [
          item instanceof Promise ? await item : item,
        ]),
      );
      for (let i = 0; i < concurrency; i++) {
        queue[i].then((_task) => _task());
      }
    }
  });
}

Author 回答者: kirazZ1 (opens in a new tab)

/**
 * Signature of `pMap`: maps `argsArr` through `fn` with a concurrency cap and
 * yields the mapped values as an array.
 */
type pMapType<T = any> = (
  argsArr: any[],
  fn: (...args: any) => T | PromiseLike<T>,
  concurrency?: number,
) => Promise<T[]>;

/**
 * Maps every entry of `argsArr` through `fn`, keeping at most `concurrency`
 * calls in flight, and resolves with the results in input order.
 *
 * @param argsArr Entries to map; each is passed to `fn` as its only argument.
 * @param fn Mapper; may return a value or a promise.
 * @param concurrency Maximum number of concurrent calls (defaults to no limit).
 * @returns A promise of the result array. It rejects with the first error
 *   thrown or rejected by `fn`, or with a `TypeError` when `concurrency` is
 *   not a positive number.
 */
const pMap: pMapType = (argsArr, fn, concurrency = Infinity) => {
  return new Promise((resolve, reject) => {
    // A limit of 0, NaN or a negative number would start nothing and hang forever.
    if (!(concurrency > 0)) {
      reject(new TypeError("concurrency must be a positive number"));
      return;
    }

    const total = argsArr.length;
    const result = new Array(total);

    // Without this guard an empty input would never resolve.
    if (total === 0) {
      resolve(result);
      return;
    }

    let nextIndex = 0;
    let currentWorkingAmount = 0;
    let finished = 0;
    let failed = false;

    function run() {
      // Start entries until the cap is reached or nothing is left to start.
      while (!failed && currentWorkingAmount < concurrency && nextIndex < total) {
        const index = nextIndex++;
        currentWorkingAmount++;
        // The executor runs synchronously, so `fn` starts immediately and a
        // synchronous throw becomes a rejection.
        new Promise<any>((start) => start(fn(argsArr[index]))).then(
          (res) => {
            result[index] = res;
            currentWorkingAmount--;
            finished++;
            if (finished === total) {
              resolve(result);
            } else {
              run();
            }
          },
          (error) => {
            failed = true;
            reject(error);
          },
        );
      }
    }

    run();
  });
};
 
/**
 * Logs `"start " + value` right away and returns a promise that resolves
 * with `value` after `delay` milliseconds.
 */
function asyncFun(value: any, delay: number) {
  const announceAndSchedule = (done: (result: unknown) => void) => {
    console.log("start " + value);
    // Extra setTimeout arguments are forwarded to the callback.
    setTimeout(done, delay, value);
  };
  return new Promise(announceAndSchedule);
}
 
// Demo: run eight 2-second tasks with a concurrency of 4, then print the results.
pMap(
  Array.from({ length: 8 }, (_, i) => i + 1),
  (x) => asyncFun(`task${x}`, 2000),
  4,
).then((res) => console.log("res", res));