Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion proxy-api/src/datasources/subgraph-clients.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SubgraphClients {
const response = await client.request(query, variables);
return response;
};
const limiterWrapped = await BottleneckLimiters.wrap(endpointIndex, callableClient);
const limiterWrapped = await BottleneckLimiters.wrap(endpointIndex, subgraphName, callableClient);
return limiterWrapped;
}
}
Expand Down
2 changes: 1 addition & 1 deletion proxy-api/src/scheduled/tasks/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class StatusTask {
static async checkAll() {
for (const subgraphName of EnvUtil.getEnabledSubgraphs()) {
for (const endpointIndex of EnvUtil.endpointsForSubgraph(subgraphName)) {
const utilization = await BottleneckLimiters.getUtilization(endpointIndex);
const utilization = await BottleneckLimiters.getUtilization(endpointIndex, subgraphName);
if (utilization <= EnvUtil.getStatusCheckMaxUtilization()) {
try {
await SubgraphStatusService.checkFatalError(endpointIndex, subgraphName);
Expand Down
9 changes: 8 additions & 1 deletion proxy-api/src/services/subgraph-status-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class SubgraphStatusService {
const alchemyStatus = await this._getAlchemyStatus(endpointIndex, subgraphName);
fatalError = alchemyStatus.data.data.indexingStatusForCurrentVersion.fatalError?.message;
break;
case 'ormi':
return undefined;
case 'graph':
// Graph status endpoint has changed, I couldnt find a realiable alternative.
// const graphStatus = await this._getGraphStatus(endpointIndex, subgraphName);
Expand Down Expand Up @@ -45,7 +47,11 @@ class SubgraphStatusService {

static async _getAlchemyStatus(endpointIndex, subgraphName) {
const statusUrl = EnvUtil.underlyingUrl(endpointIndex, subgraphName).replace('/api', '/status');
const status = await BottleneckLimiters.schedule(endpointIndex, async () => await axios.post(statusUrl));
const status = await BottleneckLimiters.schedule(
endpointIndex,
subgraphName,
async () => await axios.post(statusUrl)
);
return status;
}

Expand All @@ -60,6 +66,7 @@ class SubgraphStatusService {

const status = await BottleneckLimiters.schedule(
endpointIndex,
subgraphName,
async () =>
await axios.post(statusUrl, {
operationName: 'SubgraphIndexingStatusFatalError',
Expand Down
5 changes: 5 additions & 0 deletions proxy-api/src/utils/env.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const ENDPOINT_TYPES = process.env.ENDPOINT_TYPES?.split('|');
const ENDPOINT_RATE_LIMITS = process.env.ENDPOINT_RATE_LIMITS?.split('|').map((sg) =>
sg.split(',').map((s) => parseInt(s))
);
const RATE_LIMIT_TYPE = process.env.RATE_LIMIT_TYPE?.split('|');
const ENDPOINT_UTILIZATION_PREFERENCE = process.env.ENDPOINT_UTILIZATION_PREFERENCE?.split('|').map((s) =>
parseFloat(s)
);
Expand Down Expand Up @@ -83,6 +84,10 @@ class EnvUtil {
return ENDPOINT_RATE_LIMITS;
}

static getEndpointRateLimitType(endpointIndex) {
return RATE_LIMIT_TYPE[endpointIndex];
}

static getEndpointUtilizationPreference() {
return ENDPOINT_UTILIZATION_PREFERENCE;
}
Expand Down
49 changes: 31 additions & 18 deletions proxy-api/src/utils/load/bottleneck-limiters.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const EnvUtil = require('../env');

class BottleneckLimiters {
static bottleneckLimiters = [];
static maxPeriodicRquests = [];
static maxPeriodicRequests = [];
static maxReservoirSizes = [];

// Create a limiter for each configured endpoint
Expand All @@ -15,7 +15,7 @@ class BottleneckLimiters {
throw new Error('Invalid .env configuration: bottleneck requires rate limit interval divisible by 250.');
}

this.bottleneckLimiters.push(
const limiterFactory = () =>
new Bottleneck({
reservoir: maxBurst,
reservoirIncreaseAmount: rqPerInterval,
Expand All @@ -26,39 +26,52 @@ class BottleneckLimiters {
// The burst limiter would have no minTime as it would allow for making many concurrent requests
// when there is initially no traffic. Not necessary to implement at this time.
minTime: Math.ceil(interval / rqPerInterval)
})
);
this.maxPeriodicRquests.push(rqPerInterval);
});

let makeSubgraphLimiter;
if (EnvUtil.getEndpointRateLimitType(i) === 'per-subgraph') {
makeSubgraphLimiter = () => limiterFactory();
} else {
const lim = limiterFactory();
makeSubgraphLimiter = () => lim;
}

for (const sgName of EnvUtil.subgraphsForEndpoint(i)) {
(this.bottleneckLimiters[i] ??= {})[sgName] = makeSubgraphLimiter();
}

this.bottleneckLimiters.push();
this.maxPeriodicRequests.push(rqPerInterval);
this.maxReservoirSizes.push(maxBurst);
}
}

static async wrap(endpointIndex, fnToWrap) {
if (await this.isBurstDepleted(endpointIndex)) {
throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}.`);
static async wrap(endpointIndex, subgraphName, fnToWrap) {
if (await this.isBurstDepleted(endpointIndex, subgraphName)) {
throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}-${subgraphName}.`);
}
return this.bottleneckLimiters[endpointIndex].wrap(fnToWrap);
return this.bottleneckLimiters[endpointIndex][subgraphName].wrap(fnToWrap);
}

static async schedule(endpointIndex, fnToSchedule) {
if (await this.isBurstDepleted(endpointIndex)) {
throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}.`);
static async schedule(endpointIndex, subgraphName, fnToSchedule) {
if (await this.isBurstDepleted(endpointIndex, subgraphName)) {
throw new RateLimitError(`Exceeded rate limit for e-${endpointIndex}-${subgraphName}.`);
}
return await this.bottleneckLimiters[endpointIndex].schedule(fnToSchedule);
return await this.bottleneckLimiters[endpointIndex][subgraphName].schedule(fnToSchedule);
}

static async isBurstDepleted(endpointIndex) {
return (await this.bottleneckLimiters[endpointIndex].currentReservoir()) === 0;
static async isBurstDepleted(endpointIndex, subgraphName) {
return (await this.bottleneckLimiters[endpointIndex][subgraphName].currentReservoir()) === 0;
}

// Returns the utilization as a ratio of current active requests / max rq per interval.
// Can exceed 100%
static async getUtilization(endpointIndex) {
const currentReservoir = await this.bottleneckLimiters[endpointIndex].currentReservoir();
static async getUtilization(endpointIndex, subgraphName) {
const currentReservoir = await this.bottleneckLimiters[endpointIndex][subgraphName].currentReservoir();
// These aren't necessarily still executing, but they are considered "active" in that they
// were either scheduled recently or are queued to be executed.
const activeRequests = this.maxReservoirSizes[endpointIndex] - currentReservoir;
return activeRequests / this.maxPeriodicRquests[endpointIndex];
return activeRequests / this.maxPeriodicRequests[endpointIndex];
}
}

Expand Down
7 changes: 5 additions & 2 deletions proxy-api/src/utils/load/endpoint-balance.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EndpointBalanceUtil {
static async getSubgraphUtilization(subgraphName) {
const utilization = {};
for (const endpointIndex of EnvUtil.endpointsForSubgraph(subgraphName)) {
utilization[endpointIndex] = await BottleneckLimiters.getUtilization(endpointIndex);
utilization[endpointIndex] = await BottleneckLimiters.getUtilization(endpointIndex, subgraphName);
}
return utilization;
}
Expand All @@ -56,7 +56,10 @@ class EndpointBalanceUtil {
let options = [];
// Remove blacklisted/overutilized endpoints
for (const endpointIndex of subgraphEndpoints) {
if (!(await BottleneckLimiters.isBurstDepleted(endpointIndex)) && !blacklist.includes(endpointIndex)) {
if (
!(await BottleneckLimiters.isBurstDepleted(endpointIndex, subgraphName)) &&
!blacklist.includes(endpointIndex)
) {
options.push(endpointIndex);
}
}
Expand Down