首頁>Program>source

我需要對某些外部API进行一些延迟的迴圈呼叫,以防止出現"超出使用者速率限製"的限製。

Google Maps Geocoding API對" req / sec"敏感,允许10 req / sec.我應该對數百个聯系人进行地理編碼,因此需要這樣的延迟.因此,我需要有10个異步地理編碼功能,每个功能都需要延迟1秒.因此,我收集陣列中的所有聯系人,然後以異步方式遍歷陣列.

通常,我需要同時拥有N个執行緒,每个執行緒的末尾都有D毫秒的延迟.整个迴圈遍歷一系列User實體.每个執行緒照常處理單个實體。

我想有這樣的代碼:

const N = 10;   # threads count
const D = 1000; # delay after each execution
var processUser = function(user, callback){ 
  someBusinessLogicProc(user, function(err) {
    setTimeout(function() {
      return callback(err);
    }, D);
  });      
 }
 var async = require('async') ;
 var people = new Array(900);
 async.batchMethod(people, processUser, N, finalCallback);

在此偽代碼 batchMethod中 是我要的一種方法。

最新回復
  • 5月前
    1 #

    延迟結果並不是您真正想要的.相反,您想跟蹤已發送的內容以及發送的時間,以便一旦您落入每秒請求的範圍之內,就可以發送另一个請求。


    這是函式的一般概念,它將控製您的速率限製為每秒固定數量的請求.這使用了Promise,並要求您提供一个返迴Promise的請求函式(如果您現在不使用Promise,則只需將您的request函式包裝在Promise中即可)。

    // pass the following arguments:
    //   array - array of values to iterate
    //   requestsPerSec - max requests per second to send (integer)
    //   maxInFlight - max number of requests in process at a time
    //   fn - function to process an array value
    //        function is passed array element as first argument
    //        function returns a promise that is resolved/rejected when async operation is done
    // Returns: promise that is resolved with an array of resolves values
    //          or rejected with first error that occurs
    function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
        return new Promise(function(resolve, reject) {
            var index = 0;
            var inFlightCntr = 0;
            var doneCntr = 0;
            var launchTimes = [];
            var results = new Array(array.length);
            // calculate num requests in last second
            function calcRequestsInLastSecond() {
                var now = Date.now();
                // look backwards in launchTimes to see how many were launched within the last second
                var cnt = 0;
                for (var i = launchTimes.length - 1; i >= 0; i--) {
                    if (now - launchTimes[i] < 1000) {
                        ++cnt;
                    } else {
                        break;
                    }
                }
                return cnt;            
            }
            function runMore() {
                while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                    (function(i) {
                        ++inFlightCntr;
                        launchTimes.push(Date.now());
                        fn(array[i]).then(function(val) {
                            results[i] = val;
                            --inFlightCntr;
                            ++doneCntr;
                            runMore();
                        }, reject);
                    })(index);
                    ++index;
                }
                // see if we're done
                if (doneCntr === array.length) {
                    resolve(results);
                } else if (launchTimes.length > requestsPerSec) {
                    // calc how long we have to wait before sending more
                    var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                    if (delta > 0) {
                        setTimeout(runMore, delta);
                    }
                }
            }
            runMore();
        });
    }
    

    示例用法:

    rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
        // process array of results here
    }, function(err) {
        // process error here
    });
    

    此代碼背後的总體思路是:

    您傳入一个陣列以进行迭代

    它返迴一个诺言,谁的解決value是一系列結果(按順序)

    您傳遞的請求數達到了歷史最高水平

    您同時傳遞了飞行中的最大請求數

    您傳遞了一个函式,该函式將傳遞要迭代的陣列中的元素,並且必须返迴promise

    它保留最後一次發送請求時的時間戳陣列.

    要查看是否可以發送另一个請求,它会在陣列中向後看,並計算最後一秒發送了多少个請求.

    如果该數字低於阈值,那麼它將發送另一个.

    如果该數字達到阈值,那麼它將計算您需要等待多长時間才能發送另一个數字,並為该時間設置一个計時器。

    完成每个請求後,它会檢查是否可以發送更多請求

    如果任何請求拒绝其承诺,則返迴的承诺將立即拒绝.如果您不希望它在出現第一个錯誤時停止,請修改傳入的函式以使其不拒绝,而是使用一些值进行解析,以便稍後在處理結果時將其標識為失败的請求。

    這是一个有效的模擬:https://jsfiddle.net/jfriend00/3gr0tq7k/

    註意:如果是 maxInFlight 您傳遞的值高於 requestsPerSec 值,那麼此函式基本上只会發送requestPerSec請求,然後一秒钟後再發送另一个requestPerSec請求,因為這是停留在 requestsPerSec下的最快方法 邊界.如果是 maxInFlight 值等於或小於 requestsPerSec 然後它將發送 requestsPerSec 然後在每个請求完成時,它將查看是否可以發送另一个請求。

相似問題

  • c++:什麼是Intel HD 3000上正確的OpenGL初始化?
  • asp.net mvc 4:我可以在"實體框架代碼優先"中指定全域性對映規則吗?