亚洲精品久久久中文字幕-亚洲精品久久片久久-亚洲精品久久青草-亚洲精品久久婷婷爱久久婷婷-亚洲精品久久午夜香蕉

您的位置:首頁技術文章
文章詳情頁

JS 實現請求調度器

瀏覽:71日期:2024-04-04 10:58:49

前言:JS 天然支持并行請求,但與此同時會帶來一些問題,比如會造成目標服務器壓力過大,所以本文引入“請求調度器”來節制并發度。

TLDR; 直接跳轉『抽象和復用』章節。

為了獲取一批互不依賴的資源,通常從性能考慮可以用 Promise.all(arrayOfPromises)來并發執行。比如我們已有 100 個應用的 id,需求是聚合所有應用的 PV,我們通常會這么寫:

const ids = [1001, 1002, 1003, 1004, 1005];const urlPrefix = ’http://opensearch.example.com/api/apps’;// fetch 函數發送 HTTP 請求,返回 Promiseconst appPromises = ids.map(id => `${urlPrefix}/${id}`).map(fetch);Promise.all(appPromises) // 通過 reduce 做累加 .then(apps => apps.reduce((initial, current) => initial + current.pv, 0)) .catch((error) => console.log(error));

上面的代碼在應用個數不多的情況下,可以運行正常。當應用個數達到成千上萬時,對支持并發數不是很好的系統,你的「壓測」會把第三放服務器搞掛,暫時無法響應請求:

<html><head><title>502 Bad Gateway</title></head><body bgcolor='white'><center><h1>502 Bad Gateway</h1></center><hr><center>nginx/1.10.1</center></body></html>

如何解決呢?

一個很自然的想法是,既然不支持這么多的并發請求,那就分割成幾大塊,每塊為一個 chunk,chunk 內部的請求依然并發,但塊的大小(chunkSize)限制在系統支持的最大并發數以內。前一個 chunk 結束后一個 chunk 才能繼續執行,也就是說 chunk 內部的請求是并發的,但 chunk 之間是串行的。思路其實很簡單,寫起來卻有一定難度。總結起來三個操作:分塊、串行、聚合

難點在如何串行執行 Promise,Promise 僅提供了并行(Promise.all)功能,并沒有提供串行功能。我們從簡單的三個請求開始,看如何實現,啟發式解決問題(heuristic)。

// task1, task2, task3 是三個返回 Promise 的工廠函數,模擬我們的異步請求const task1 = () => new Promise((resolve) => { setTimeout(() => { resolve(1); console.log(’task1 executed’); }, 1000);});const task2 = () => new Promise((resolve) => { setTimeout(() => { resolve(2); console.log(’task2 executed’); }, 1000);});const task3 = () => new Promise((resolve) => { setTimeout(() => { resolve(3); console.log(’task3 executed’); }, 1000);});// 聚合結果let result = 0;const resultPromise = [task1, task2, task3].reduce((current, next) => current.then((number) => { console.log(’resolved with number’, number); // task2, task3 的 Promise 將在這里被 resolve result += number; return next(); }), Promise.resolve(0)) // 聚合初始值 .then(function(last) { console.log(’The last promise resolved with number’, last); // task3 的 Promise 在這里被 resolve result += last; console.log(’all executed with result’, result); return Promise.resolve(result); });

運行結果如圖 1:

JS 實現請求調度器

代碼解析:我們想要的效果,直觀展示其實是 fn1().then(() => fn2()).then(() => fn3())。上面代碼能讓一組 Promise 按順序執行的關鍵之處就在 reduce 這個“引擎”在一步步推動 Promise 工廠函數的執行。

難點解決了,我們看看最終代碼:

/** * 模擬 HTTP 請求 * @param {String} url * @return {Promise} */function fetch(url) { console.log(`Fetching ${url}`); return new Promise((resolve) => { setTimeout(() => resolve({ pv: Number(url.match(/d+$/)) }), 2000); });}const urlPrefix = ’http://opensearch.example.com/api/apps’;const aggregator = { /** * 入口方法,開啟定時任務 * * @return {Promise} */ start() { return this.fetchAppIds() .then(ids => this.fetchAppsSerially(ids, 2)) .then(apps => this.sumPv(apps)) .catch(error => console.error(error)); }, /** * 獲取所有應用的 ID * * @private * * @return {Promise} */ fetchAppIds() { return Promise.resolve([1001, 1002, 1003, 1004, 1005]); }, promiseFactory(ids) { return () => Promise.all(ids.map(id => `${urlPrefix}/${id}`).map(fetch)); }, /** * 獲取所有應用的詳情 * * 一次并發請求 `concurrency` 個應用,稱為一個 chunk * 前一個 `chunk` 并發完成后一個才繼續,直至所有應用獲取完畢 * * @private * * @param {[Number]} ids * @param {Number} concurrency 一次并發的請求數量 * @return {[Object]} 所有應用的信息 */ fetchAppsSerially(ids, concurrency = 100) { // 分塊 let chunkOfIds = ids.splice(0, concurrency); const tasks = []; while (chunkOfIds.length !== 0) { tasks.push(this.promiseFactory(chunkOfIds)); chunkOfIds = ids.splice(0, concurrency); } // 按塊順序執行 const result = []; return tasks.reduce((current, next) => current.then((chunkOfApps) => { console.info(’Chunk of’, chunkOfApps.length, ’concurrency requests has finished with result:’, chunkOfApps, ’nn’); result.push(...chunkOfApps); // 拍扁數組 return next(); }), Promise.resolve([])) .then((lastchunkOfApps) => { console.info(’Chunk of’, lastchunkOfApps.length, ’concurrency requests has finished with result:’, lastchunkOfApps, ’nn’); result.push(...lastchunkOfApps); // 再次拍扁它 console.info(’All chunks has been executed with result’, result); return result; }); }, /** * 聚合所有應用的 PV * * @private * * @param {[]} apps * @return {[type]} [description] */ sumPv(apps) { const initial = { pv: 0 }; return apps.reduce((accumulator, app) => ({ pv: accumulator.pv + app.pv }), initial); }};// 開始運行aggregator.start().then(console.log);

運行結果如圖 2:

JS 實現請求調度器

抽象和復用

目的達到了,因具備通用性,下面開始抽象成一個模式以便復用。

串行

先模擬一個 http get 請求。

/** * mocked http get. * @param {string} url * @returns {{ url: string; delay: number; }} */function httpGet(url) { const delay = Math.random() * 1000; console.info(’GET’, url); return new Promise((resolve) => { setTimeout(() => { resolve({ url, delay, at: Date.now() }) }, delay); })}

串行執行一批請求。

const ids = [1, 2, 3, 4, 5, 6, 7];// 批量請求函數,注意是 delay 執行的『函數』對了,否則會立即將請求發送出去,達不到串行的目的const httpGetters = ids.map(id => () => httpGet(`https://jsonplaceholder.typicode.com/posts/${id}`));// 串行執行之const tasks = await httpGetters.reduce((acc, cur) => { return acc.then(cur); // 簡寫,等價于 // return acc.then(() => cur());}, Promise.resolve());tasks.then(() => { console.log(’done’);});

注意觀察控制臺輸出,應該串行輸出以下內容:

GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6GET https://jsonplaceholder.typicode.com/posts/7分段串行,段中并行

重點來了。本文的請求調度器實現

/** * Schedule promises. * @param {Array<(...arg: any[]) => Promise<any>>} factories * @param {number} concurrency */function schedulePromises(factories, concurrency) { /** * chunk * @param {any[]} arr * @param {number} size * @returns {Array<any[]>} */ const chunk = (arr, size = 1) => { return arr.reduce((acc, cur, idx) => { const modulo = idx % size; if (modulo === 0) { acc[acc.length] = [cur]; } else { acc[acc.length - 1].push(cur); } return acc; }, []) }; const chunks = chunk(factories, concurrency); let resps = []; return chunks.reduce( (acc, cur) => { return acc .then(() => { console.log(’---’); return Promise.all(cur.map(f => f())); }) .then((intermediateResponses) => { resps.push(...intermediateResponses); return resps; }) }, Promise.resolve() );}

測試下,執行調度器:

// 分段串行,段中并行schedulePromises(httpGetters, 3).then((resps) => { console.log(’resps:’, resps);});

控制臺輸出:

---GET https://jsonplaceholder.typicode.com/posts/1GET https://jsonplaceholder.typicode.com/posts/2GET https://jsonplaceholder.typicode.com/posts/3---GET https://jsonplaceholder.typicode.com/posts/4GET https://jsonplaceholder.typicode.com/posts/5GET https://jsonplaceholder.typicode.com/posts/6---GET https://jsonplaceholder.typicode.com/posts/7resps: [ { 'url': 'https://jsonplaceholder.typicode.com/posts/1', 'delay': 733.010980640727, 'at': 1615131322163 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/2', 'delay': 594.5056229848931, 'at': 1615131322024 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/3', 'delay': 738.8230109146299, 'at': 1615131322168 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/4', 'delay': 525.4604386109747, 'at': 1615131322698 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/5', 'delay': 29.086379722201183, 'at': 1615131322201 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/6', 'delay': 592.2345027398272, 'at': 1615131322765 }, { 'url': 'https://jsonplaceholder.typicode.com/posts/7', 'delay': 513.0684467560949, 'at': 1615131323284 }]總結 如果并發請求的數量太大,可以考慮分塊串行,塊中請求并發。 問題看似復雜,不放先簡化之,然后一步步推導出關鍵點,最后抽象,就能找到解決方案。 本文的精髓在于使用 reduce 作為串行推動的引擎,故掌握其對我們日常開發遇到的迷局破解可提供新思路,reduce 精通見上篇 你終于用 Reduce 了 🎉。

以上就是JS 實現請求調度器的詳細內容,更多關于JS 請求調度器的資料請關注好吧啦網其它相關文章!

標簽: JavaScript
相關文章:
主站蜘蛛池模板: 亚洲精品国产专区一区 | 一级黄网站 | 欧美人成一本免费观看视频 | 欧美三区| 中文字幕一区2区 | 国产精品视频网址 | 爽爽爽爽爽爽a成人免费视频 | 日韩欧美亚洲一区精选 | 国产午夜精品尤物福利视频 | 1000部拍拍拍18勿入免费凤凰福利 | 亚洲精品午夜国产va久久成人 | 91短视频在线高清hd | 在线观看视频一区二区 | 久久人成 | a网站| 免费视频久久久 | 国产精品原创巨作无遮挡 | 看片久久| 日韩 欧美 亚洲 | 日韩欧美色综合 | 欧美黄色片在线播放 | 最近更新中文字幕第一页 | 男女做www免费高清视频 | 日本特级全黄一级毛片 | 亚洲国产天堂久久综合 | 色yeye成人免费视频 | 国产在线精品福利91香蕉 | 免费黄色在线网址 | 一级在线观看视频 | 日本特黄a级高清免费酷网 日本特黄一级 | 伊人中文字幕在线 | 国产精品午夜在线观看 | 国产成人污污网站在线观看 | 大尺度一级毛片波多野结衣 | 偷窥自拍有声 | 劲爆欧美第1页婷婷 | 操出白浆视频 | 黄色一级大片免费看 | 簧片免费网站 | 织田真子中文字幕 | 99欧美精品|