JavaScript 异步模式

无论是开发前端应用还是 Node 应用,都会涉及到很多异步操作,常见的异步操作有:

  • 网络操作:Ajax、fetch、Node HTTP 请求、WebSocket、TCP/UDP 等
  • 文件操作:读取、写入等
  • Timers:setTimeout、setInterval、setImmediate
  • process.nextTick
  • 等等

JavaScript 处理异步操作的语法也在逐步发展,从最初的回调(Callbacks)Promise,再到生成器(Generators),最后到异步函数(async functions)

先不考虑不同的写法,从更抽象层次分析一下异步操作。

  • 任务:每个异步操作都是在执行一个异步任务。
  • 状态:异步任务涉及到一些状态:未执行执行中执行成功执行失败,其中执行成功执行失败都是执行完成状态。可能还涉及到执行进度信息,异步操作可以处于暂停状态,也可以处于被取消状态。
  • 常见异步任务流程

    • 执行一个异步任务,执行完成之后,使用其处理后的数据执行其他任务,或执行失败之后,使用其错误信息执行其他任务
    • 按顺序执行一组任务,前一个任务完成之后执行下一个任务,如果有任何一个任务报错,在中止执行后面的任务
    • 并发执行一组任务,所有任务执行完成之后执行其他任务
    • 并发执行一组任务,有任何一个任务完成,则执行其他任务
    • 并发执行一组任务,所有任务执行完成之后执行其他任务,但是限制同时执行的任务数量上限
    • 取消执行
    • 暂停执行
    • 恢复执行
    • 获取任务执行进度信息

常见异步任务示例

下面使用不同的语法来完成常见异步任务

执行一个异步任务,执行完成之后,使用其处理后的数据执行其他任务,或执行失败之后,使用其错误信息执行其他任务

回调

const fs = require('fs');

fs.readFile('./data/item.json', (err, data) => {
  if (err) {
    throw err;
  } else {
    console.log(data.toString('utf8'));
  }
});

Node.js 使用 CPS 风格回调,回调的第一个参数为异常信息,之后的参数为返回值。

Promise

const fs = require('fs');

// 从 Node v10.0.0 开始支持
// v10.0.0 之前版本可以使用 util.promisify() 或方法来将回调接口转换成 Promise 接口
const filehandle = fs.promises;

filehandle.readFile('./data/item.json')
  .then(data => {
    console.log(data.toString('utf8'));
  })
  .catch(err => {
    console.log(err);
  });

Promise 使用catch来捕获异常。

生成器

const fs = require('fs');
const filehandle = fs.promises;

function readFile(file) {
  return filehandle.readFile(file);
}

function runGen(genFn) {
  const iterator = genFn();
  return new Promise((resolve, reject) => {
    const parse = (result) => {
      const item = iterator.next(result);
      if (item.done) {
        resolve(result);
      } else {
        Promise.resolve(item.value).then(parse, reject);
      }
    };
    parse();
  });
}

runGen(function* gen() {
  try {
    const data = yield readFile('./data/item.json');
    console.log(data.toString('utf8'));
  } catch (err) {
    console.log(err);
  }
});

单纯使用 Generator 来写异步代码并不友好,类似于上面基于 Promise 的runGen辅助函数可以让写法更类似于我们写同步代码一样。这里的runGen只是示例性质,实际上可以使用 co,co 也支持更多功能,具体使用可以查看文档。

异步函数

const fs = require('fs');
const filehandle = fs.promises;

(async function () {
  try {
    const data = await filehandle.readFile('./data/item.json')
    console.log(data.toString('utf8'));
  } catch (err) {
    console.log(err);
  }
})();

异步函数也是基于 Promise,就像我们写同步代码一样,可以直接使用try...catch捕获异常。可以发现异步函数写法与上面的生成器写法很类似,可以认为异步函数是上面生成器写法的语法糖。

按顺序执行一组任务,前一个任务完成之后执行下一个任务,如果有任何一个任务报错,在中止执行后面的任务

假设./data/item.json文件内容如下:

{
  "count": 1
}

现在我们首先检查./data/item.json是否可以读取;然后,读取文件内容;然后增加count值,并写入到./data/item.json文件。

回调

const fs = require('fs');

function increment(file, callback) {
  fs.access(file, (err) => {
    if (err) {
      callback(err);
    } else {
      fs.readFile(file, 'utf8', (err1, data) => {
        if (err1) {
          callback(err1);
        } else {
          try {
            const json = JSON.parse(data);
            json.count++;
            fs.writeFile(file, JSON.stringify(json), callback);
          } catch (err2) {
            callback(err2);
          }
        }
      });
    }
  });
}

increment('./data/item.json', (err) => {
  if (err) {
    console.log(err);
  } else {
    console.log('incremented');
  }
});

上面是比较直观的写法,但是存在回调地狱(callback hell)问题,层层嵌套回调不利于理解代码逻辑,另外回调第一个参数变量命名也是问题。当然可以通过 Promise、生成器或异步函数来解决。不过即使只使用回调方法也是可以避免回调地狱的,比如使用 Async 函数库当中的serieswaterfall方法,这里使用 waterfall 方法。

const fs = require('fs');
const waterfall = require('async/waterfall');

function increment(file, callback) {
  waterfall([
    (cb) => {
      fs.access(file, cb);
    },
    (cb) => {
      fs.readFile(file, 'utf8', cb);
    },
    (data, cb) => {
      try {
        const json = JSON.parse(data);
        json.count++;
        fs.writeFile(file, JSON.stringify(json), cb);
      } catch (err) {
        cb(err);
      }
    }
  ], callback);
}

increment('./data/item.json', (err) => {
  if (err) {
    console.log(err);
  } else {
    console.log('incremented');
  }
});

Promise

const fs = require('fs');
const filehandle = fs.promises;

function increment(file) {
  return filehandle.access(file)
    .then(() => {
      return filehandle.readFile(file, 'utf8');
    })
    .then(data => {
      const json = JSON.parse(data);
      json.count++;
      return filehandle.writeFile(file, JSON.stringify(json));
    });
}

increment('./data/item.json')
  .then(() => {
    console.log('incremented');
  })
  .catch(err => {
    console.log(err);
  });

生成器

const fs = require('fs');
const co = require('co');
const filehandle = fs.promises;

function increment(file) {
  return co(function* () {
    yield filehandle.access(file);
    const data = yield filehandle.readFile(file, 'utf8');
    const json = JSON.parse(data);
    json.count++;
    yield filehandle.writeFile(file, JSON.stringify(json));
  });
}

increment('./data/item.json')
  .then(() => {
    console.log('incremented');
  })
  .catch(err => {
    console.log(err);
  });

异步函数

const fs = require('fs');
const filehandle = fs.promises;

async function increment(file) {
  await filehandle.access(file);
  const data = await filehandle.readFile(file, 'utf8');
  const json = JSON.parse(data);
  json.count++;
  await filehandle.writeFile(file, JSON.stringify(json));
}

increment('./data/item.json')
  .then(() => {
    console.log('incremented');
  })
  .catch(err => {
    console.log(err);
  });

并发执行一组任务,所有任务执行完成之后执行其他任务

假设./data目录下所有的 json 文件都是类似于之前./data/item.json数据结构,我们要读取所有的 json 文件,并使字段count增加 1。

回调函数

const fs = require('fs');
const path = require('path');
const waterfall = require('async/waterfall');
const parallel = require('async/parallel');

waterfall([
  (callback) => {
    fs.readdir('./data', callback);
  },
  (files, callback) => {
    const jsonFiles = files.map(file => path.join('./data', file));
    const tasks = jsonFiles.map(jsonFile => {
      return (callback) => {
        // increment 为上一节回调函数当中定义的 increment 函数
        increment(jsonFile, callback);
      };
    });
    parallel(tasks, callback);

  }
], (err, results) => {
  if (err) {
    console.log(err);
  } else {
    console.log('incremented');
  }
});

Promise

const fs = require('fs');
const path = require('path');
const filehandle = fs.promises;

filehandle.readdir('./data')
  .then(files => {
    const jsonFiles = files.map(file => path.join('./data', file));
    // increment 为上一节 Promise 当中定义的 increment 函数
    const tasks = jsonFiles.map(jsonFile => increment(jsonFile));
    return Promise.all(tasks);
  })
  .then(() => {
    console.log('incremented');
  })
  .catch(err => {
    console.log(err);
  });

生成器

const fs = require('fs');
const path = require('path');
const co = require('co');
const filehandle = fs.promises;

co(function* () {
  const files = yield filehandle.readdir('./data');
  const jsonFiles = files.map(file => path.join('./data', file));
  // increment 为上一节生成器当中定义的 increment 函数
  const tasks = jsonFiles.map(jsonFile => increment(jsonFile));
  yield tasks;
}).then(() => {
  console.log('incremented');
}).catch(err => {
  console.log(err);
});

异步函数

const fs = require('fs');
const path = require('path');
const co = require('co');
const filehandle = fs.promises;

(async function () {
  try {
    const files = await filehandle.readdir('./data');
    const jsonFiles = files.map(file => path.join('./data', file));
    // increment 为上一节异步函数当中定义的 increment 函数
    const tasks = jsonFiles.map(jsonFile => increment(jsonFile));
    await tasks;
    console.log('incremented');
  } catch (err) {
    console.log(err);
  }
})();

并发执行一组任务,有任何一个任务完成,则执行其他任务

假设./data目录下面有一组 json 文件,现在读取这些文件,只要有一个文件读取完成,就打印出该文件内容。

回调函数

const fs = require('fs');
const path = require('path');
const waterfall = require('async/waterfall');
const race = require('async/race');

waterfall([
  (callback) => {
    fs.readdir('./data', callback);
  },
  (files, callback) => {
    const jsonFiles = files.map(file => path.join('./data', file));
    const tasks = jsonFiles.map(jsonFile => {
      return (cb) => {
        fs.readFile(jsonFile, 'utf8', cb);
      };
    });
    race(tasks, callback);
  }
], (err, result) => {
  if (err) {
    console.log(err);
  } else {
    console.log(result);
  }
});

Promise

const fs = require('fs');
const path = require('path');
const filehandle = fs.promises;

filehandle.readdir('./data')
  .then(files => {
    const jsonFiles = files.map(file => path.join('./data', file));
    const tasks = jsonFiles.map(jsonFile => {
      return filehandle.readFile(jsonFile, 'utf8');
    });
    return Promise.race(tasks);
  })
  .then(result => {
    console.log(result);
  })
  .catch(err => {
    console.log(err);
  });

生成器

const fs = require('fs');
const path = require('path');
const co = require('co');
const filehandle = fs.promises;

co(function* () {
  const files = yield filehandle.readdir('./data')
  const jsonFiles = files.map(file => path.join('./data', file));
  const tasks = jsonFiles.map(jsonFile => {
    return filehandle.readFile(jsonFile, 'utf8');
  });
  return yield Promise.race(tasks);
})
  .then(result => {
    console.log(result);
  })
  .catch(err => {
    console.log(err);
  });

异步函数

const fs = require('fs');
const path = require('path');
const filehandle = fs.promises;

(async function () {
  const files = await filehandle.readdir('./data')
  const jsonFiles = files.map(file => path.join('./data', file));
  const tasks = jsonFiles.map(jsonFile => {
    return filehandle.readFile(jsonFile, 'utf8');
  });
  return await Promise.race(tasks);
})()
  .then(result => {
    console.log(result);
  })
  .catch(err => {
    console.log(err);
  });

并发执行一组任务,所有任务执行完成之后执行其他任务,但是限制同时执行的任务数量上限

假设./data目录下有一组 json 文件,现在并发读取所有文件,读取完所有文件之后则该任务完成,但是限制同时读取的文件数量为 5。

如何实现同时执行任务数量上限?假定上限为 N,所有待执行任务位于一个队列 Q 当中,初始化执行时,从队列 Q 当中取出 N 个任务执行,当且只当一个任务完成之后从队列 Q 当中取出新的任务执行。下面是一种实现,其中taskQueue是一个数组,其元素均为一个函数,表示一个任务,函数返回一个 Promise 对象。

function limit(taskQueue, number) {
  return new Promise((resolve, reject) => {
    const queueLength = taskQueue.length;
    const results = Array(queueLength).fill(null);
    let initNumber = Math.min(queueLength, number);
    let runningNumber = 0;
    let isResolved = false;
    let isRejected = false;

    function runNext() {
      runningNumber++;

      const id = queueLength - taskQueue.length;
      const task = taskQueue.shift();
      task()
        .then((...args) => {
          runningNumber--;
          results[id] = args;
          if (taskQueue.length > 0) {
            runNext();
          }
          if (runningNumber === 0 && !isResolved && !isRejected) {
            resolve(results);
            isResolved = true;
          }
        })
        .catch(err => {
          runningNumber--;
          if (runningNumber === 0 && !isResolved && !isRejected) {
            reject(err);
            isRejected = true;
          }
        });
    }
    while (initNumber--) {
      runNext();
    }
  });
}

回调函数

这里不使用上面的limit函数,而是使用 Async 的parallelLimit函数。

const fs = require('fs');
const path = require('path');
const waterfall = require('async/waterfall');
const parallelLimit = require('async/parallelLimit');

waterfall([
  (callback) => {
    fs.readdir('./data', callback);
  },
  (files, callback) => {
    const jsonFiles = files.map(file => path.join('./data', file));
    const tasks = jsonFiles.map(jsonFile => cb => fs.readFile(jsonFile, 'utf8', cb));
    parallelLimit(tasks, 5, callback);
  },
], (err, results) => {
  if (err) {
    console.log(err);
  } else {
    console.log(results);
  }
});

Promise

const fs = require('fs');
const path = require('path');
const filehandle = fs.promises;

filehandle.readdir('./data')
  .then(files => {
    const jsonFiles = files.map(file => path.join('./data', file));
    const tasks = jsonFiles.map(jsonFile => () => filehandle.readFile(jsonFile, 'utf8'));
    return limit(tasks, 5);
  })
  .then(results => {
    console.log(results);
  })
  .catch(err => {
    console.log(err);
  });

生成器

const fs = require('fs');
const path = require('path');
const co = require('co');
const filehandle = fs.promises;

co(function* () {
  const files = yield filehandle.readdir('./data');
  const jsonFiles = files.map(file => path.join('./data', file));
  const tasks = jsonFiles.map(jsonFile => () => filehandle.readFile(jsonFile, 'utf8'));
  return yield limit(tasks, 5);
})
  .then(results => {
    console.log(results);
  })
  .catch(err => {
    console.log(err);
  });

异步函数

const fs = require('fs');
const path = require('path');
const filehandle = fs.promises;

(async function () {
  const files = await filehandle.readdir('./data');
  const jsonFiles = files.map(file => path.join('./data', file));
  const tasks = jsonFiles.map(jsonFile => () => filehandle.readFile(jsonFile, 'utf8'));
  return await limit(tasks, 5);
})()
  .then(results => {
    console.log(results);
  })
  .catch(err => {
    console.log(err);
  });

Todo: 取消执行、暂停执行、恢复执行、获取执行进度信息

相关连接