mirror of
https://activitypub.software/TransFem-org/Sharkey.git
synced 2025-04-13 09:44:40 +00:00
track the number of concurrent requests to redis, and bypass if the request is guaranteed to reject
This commit is contained in:
parent
47ea8527fd
commit
922a7ba1d4
2 changed files with 124 additions and 60 deletions
|
@ -42,6 +42,10 @@ While performance has not been formally tested, it's expected that SkRateLimiter
|
|||
Redis memory usage should be notably lower due to the reduced number of keys and avoidance of set / array constructions.
|
||||
If redis load does become a concern, then a dedicated node can be assigned via the `redisForRateLimit` config setting.
|
||||
|
||||
To prevent Redis DoS, SkRateLimiterService internally tracks the number of concurrent requests for each unique client/endpoint combination.
|
||||
If the number of requests exceeds the limit's maximum value, then any further requests are automatically rejected.
|
||||
The lockout will automatically end when the number of active requests drops to within the limit value.
|
||||
|
||||
## Concurrency and Multi-Node Correctness
|
||||
|
||||
To provide consistency across multi-node environments, leaky bucket is implemented with only atomic operations (`Increment`, `Decrement`, `Add`, and `Subtract`).
|
||||
|
|
|
@ -17,9 +17,14 @@ import type { RoleService } from '@/core/RoleService.js';
|
|||
// Required because MemoryKVCache doesn't support null keys.
|
||||
const defaultUserKey = '';
|
||||
|
||||
interface Lockout {
|
||||
at: number;
|
||||
info: LimitInfo;
|
||||
interface ParsedLimit {
|
||||
key: string;
|
||||
now: number;
|
||||
bucketSize: number;
|
||||
dripRate: number;
|
||||
dripSize: number;
|
||||
fullResetMs: number;
|
||||
fullResetSec: number;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
|
@ -27,7 +32,8 @@ export class SkRateLimiterService {
|
|||
// 1-minute cache interval
|
||||
private readonly factorCache = new MemoryKVCache<number>(1000 * 60);
|
||||
// 10-second cache interval
|
||||
private readonly lockoutCache = new MemoryKVCache<Lockout>(1000 * 10);
|
||||
private readonly lockoutCache = new MemoryKVCache<number>(1000 * 10);
|
||||
private readonly requestCounts = new Map<string, number>();
|
||||
private readonly disabled: boolean;
|
||||
|
||||
constructor(
|
||||
|
@ -65,14 +71,7 @@ export class SkRateLimiterService {
|
|||
}
|
||||
|
||||
const actor = typeof(actorOrUser) === 'object' ? actorOrUser.id : actorOrUser;
|
||||
|
||||
// TODO add to docs
|
||||
// Fast-path to avoid extra redis calls for blocked clients
|
||||
const lockoutKey = `@${actor}#${limit.key}`;
|
||||
const lockout = this.getLockout(lockoutKey);
|
||||
if (lockout) {
|
||||
return lockout;
|
||||
}
|
||||
const actorKey = `@${actor}#${limit.key}`;
|
||||
|
||||
const userCacheKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : defaultUserKey;
|
||||
const userRoleKey = typeof(actorOrUser) === 'object' ? actorOrUser.id : null;
|
||||
|
@ -89,66 +88,81 @@ export class SkRateLimiterService {
|
|||
throw new Error(`Rate limit factor is zero or negative: ${factor}`);
|
||||
}
|
||||
|
||||
const info = await this.applyLimit(limit, actor, factor);
|
||||
|
||||
// Store blocked status to avoid hammering redis
|
||||
if (info.blocked) {
|
||||
this.lockoutCache.set(lockoutKey, {
|
||||
at: this.timeService.now,
|
||||
info,
|
||||
});
|
||||
const parsedLimit = this.parseLimit(limit, factor);
|
||||
if (parsedLimit == null) {
|
||||
return disabledLimitInfo;
|
||||
}
|
||||
|
||||
return info;
|
||||
// Fast-path to avoid extra redis calls for blocked clients
|
||||
const lockout = this.getLockout(actorKey, parsedLimit);
|
||||
if (lockout) {
|
||||
return lockout;
|
||||
}
|
||||
|
||||
// Fast-path to avoid queuing requests that are guaranteed to fail
|
||||
const overflow = this.incrementOverflow(actorKey, parsedLimit);
|
||||
if (overflow) {
|
||||
return overflow;
|
||||
}
|
||||
|
||||
try {
|
||||
const info = await this.limitBucket(parsedLimit, actor);
|
||||
|
||||
// Store blocked status to avoid hammering redis
|
||||
if (info.blocked) {
|
||||
this.lockoutCache.set(actorKey, info.resetMs);
|
||||
}
|
||||
|
||||
return info;
|
||||
} finally {
|
||||
this.decrementOverflow(actorKey);
|
||||
}
|
||||
}
|
||||
|
||||
private getLockout(lockoutKey: string): LimitInfo | null {
|
||||
const lockout = this.lockoutCache.get(lockoutKey);
|
||||
if (!lockout) {
|
||||
private getLockout(lockoutKey: string, limit: ParsedLimit): LimitInfo | null {
|
||||
const lockoutReset = this.lockoutCache.get(lockoutKey);
|
||||
if (!lockoutReset) {
|
||||
// Not blocked, proceed with redis check
|
||||
return null;
|
||||
}
|
||||
|
||||
const now = this.timeService.now;
|
||||
const elapsedMs = now - lockout.at;
|
||||
if (elapsedMs >= lockout.info.resetMs) {
|
||||
if (limit.now >= lockoutReset) {
|
||||
// Block expired, clear and proceed with redis check
|
||||
this.lockoutCache.delete(lockoutKey);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Limit is still active, update calculations
|
||||
lockout.at = now;
|
||||
lockout.info.resetMs -= elapsedMs;
|
||||
lockout.info.resetSec = Math.ceil(lockout.info.resetMs / 1000);
|
||||
lockout.info.fullResetMs -= elapsedMs;
|
||||
lockout.info.fullResetSec = Math.ceil(lockout.info.fullResetMs / 1000);
|
||||
|
||||
// Re-cache the new object
|
||||
this.lockoutCache.set(lockoutKey, lockout);
|
||||
return lockout.info;
|
||||
// Lockout is still active, pre-emptively reject the request
|
||||
return {
|
||||
blocked: true,
|
||||
remaining: 0,
|
||||
resetMs: limit.fullResetMs,
|
||||
resetSec: limit.fullResetSec,
|
||||
fullResetMs: limit.fullResetMs,
|
||||
fullResetSec: limit.fullResetSec,
|
||||
};
|
||||
}
|
||||
|
||||
private async applyLimit(limit: Keyed<RateLimit>, actor: string, factor: number) {
|
||||
private parseLimit(limit: Keyed<RateLimit>, factor: number): ParsedLimit | null {
|
||||
if (isLegacyRateLimit(limit)) {
|
||||
return await this.limitLegacy(limit, actor, factor);
|
||||
return this.parseLegacyLimit(limit, factor);
|
||||
} else {
|
||||
return await this.limitBucket(limit, actor, factor);
|
||||
return this.parseBucketLimit(limit, factor);
|
||||
}
|
||||
}
|
||||
|
||||
private async limitLegacy(limit: Keyed<LegacyRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||
private parseLegacyLimit(limit: Keyed<LegacyRateLimit>, factor: number): ParsedLimit | null {
|
||||
if (hasMaxLimit(limit)) {
|
||||
return await this.limitLegacyMinMax(limit, actor, factor);
|
||||
return this.parseLegacyMinMax(limit, factor);
|
||||
} else if (hasMinLimit(limit)) {
|
||||
return await this.limitLegacyMinOnly(limit, actor, factor);
|
||||
return this.parseLegacyMinOnly(limit, factor);
|
||||
} else {
|
||||
return disabledLimitInfo;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async limitLegacyMinMax(limit: Keyed<MaxLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||
if (limit.duration === 0) return disabledLimitInfo;
|
||||
private parseLegacyMinMax(limit: Keyed<MaxLegacyLimit>, factor: number): ParsedLimit | null {
|
||||
if (limit.duration === 0) return null;
|
||||
if (limit.duration < 0) throw new Error(`Invalid rate limit ${limit.key}: duration is negative (${limit.duration})`);
|
||||
if (limit.max < 1) throw new Error(`Invalid rate limit ${limit.key}: max is less than 1 (${limit.max})`);
|
||||
|
||||
|
@ -161,35 +175,30 @@ export class SkRateLimiterService {
|
|||
// Calculate final dripRate from dripSize and duration/max
|
||||
const dripRate = Math.max(Math.round(limit.duration / (limit.max / dripSize)), 1);
|
||||
|
||||
const bucketLimit: Keyed<BucketRateLimit> = {
|
||||
return this.parseBucketLimit({
|
||||
type: 'bucket',
|
||||
key: limit.key,
|
||||
size: limit.max,
|
||||
dripRate,
|
||||
dripSize,
|
||||
};
|
||||
return await this.limitBucket(bucketLimit, actor, factor);
|
||||
}, factor);
|
||||
}
|
||||
|
||||
private async limitLegacyMinOnly(limit: Keyed<MinLegacyLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||
if (limit.minInterval === 0) return disabledLimitInfo;
|
||||
private parseLegacyMinOnly(limit: Keyed<MinLegacyLimit>, factor: number): ParsedLimit | null {
|
||||
if (limit.minInterval === 0) return null;
|
||||
if (limit.minInterval < 0) throw new Error(`Invalid rate limit ${limit.key}: minInterval is negative (${limit.minInterval})`);
|
||||
|
||||
const dripRate = Math.max(Math.round(limit.minInterval), 1);
|
||||
const bucketLimit: Keyed<BucketRateLimit> = {
|
||||
return this.parseBucketLimit({
|
||||
type: 'bucket',
|
||||
key: limit.key,
|
||||
size: 1,
|
||||
dripRate,
|
||||
dripSize: 1,
|
||||
};
|
||||
return await this.limitBucket(bucketLimit, actor, factor);
|
||||
}, factor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details.
|
||||
*/
|
||||
private async limitBucket(limit: Keyed<BucketRateLimit>, actor: string, factor: number): Promise<LimitInfo> {
|
||||
private parseBucketLimit(limit: Keyed<BucketRateLimit>, factor: number): ParsedLimit {
|
||||
if (limit.size < 1) throw new Error(`Invalid rate limit ${limit.key}: size is less than 1 (${limit.size})`);
|
||||
if (limit.dripRate != null && limit.dripRate < 1) throw new Error(`Invalid rate limit ${limit.key}: dripRate is less than 1 (${limit.dripRate})`);
|
||||
if (limit.dripSize != null && limit.dripSize < 1) throw new Error(`Invalid rate limit ${limit.key}: dripSize is less than 1 (${limit.dripSize})`);
|
||||
|
@ -199,7 +208,27 @@ export class SkRateLimiterService {
|
|||
const bucketSize = Math.max(Math.ceil(limit.size / factor), 1);
|
||||
const dripRate = Math.ceil(limit.dripRate ?? 1000);
|
||||
const dripSize = Math.ceil(limit.dripSize ?? 1);
|
||||
const expirationSec = Math.max(Math.ceil((dripRate * Math.ceil(bucketSize / dripSize)) / 1000), 1);
|
||||
const fullResetMs = dripRate * Math.ceil(bucketSize / dripSize);
|
||||
const fullResetSec = Math.max(Math.ceil(fullResetMs / 1000), 1);
|
||||
|
||||
return {
|
||||
key: limit.key,
|
||||
now,
|
||||
bucketSize,
|
||||
dripRate,
|
||||
dripSize,
|
||||
fullResetMs,
|
||||
fullResetSec,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of Leaky Bucket rate limiting - see SkRateLimiterService.md for details.
|
||||
*/
|
||||
private async limitBucket(limit: ParsedLimit, actor: string): Promise<LimitInfo> {
|
||||
// 0 - Calculate (extracted to other function)
|
||||
const { now, bucketSize, dripRate, dripSize } = limit;
|
||||
const expirationSec = limit.fullResetSec;
|
||||
|
||||
// 1 - Read
|
||||
const counterKey = createLimitKey(limit, actor, 'c');
|
||||
|
@ -319,13 +348,44 @@ export class SkRateLimiterService {
|
|||
|
||||
return responses;
|
||||
}
|
||||
|
||||
private incrementOverflow(actorKey: string, limit: ParsedLimit): LimitInfo | null {
|
||||
const oldCount = this.requestCounts.get(actorKey) ?? 0;
|
||||
|
||||
if (oldCount >= limit.bucketSize) {
|
||||
// Overflow, pre-emptively reject the request
|
||||
return {
|
||||
blocked: true,
|
||||
remaining: 0,
|
||||
resetMs: limit.fullResetMs,
|
||||
resetSec: limit.fullResetSec,
|
||||
fullResetMs: limit.fullResetMs,
|
||||
fullResetSec: limit.fullResetSec,
|
||||
};
|
||||
}
|
||||
|
||||
// No overflow, increment and continue to redis
|
||||
this.requestCounts.set(actorKey, oldCount + 1);
|
||||
return null;
|
||||
}
|
||||
|
||||
private decrementOverflow(actorKey: string): void {
|
||||
const count = this.requestCounts.get(actorKey);
|
||||
if (count) {
|
||||
if (count > 1) {
|
||||
this.requestCounts.set(actorKey, count - 1);
|
||||
} else {
|
||||
this.requestCounts.delete(actorKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Not correct, but good enough for the basic commands we use.
|
||||
type RedisResult = string | null;
|
||||
type RedisCommand = [command: string, ...args: unknown[]];
|
||||
|
||||
function createLimitKey(limit: Keyed<RateLimit>, actor: string, value: string): string {
|
||||
function createLimitKey(limit: ParsedLimit, actor: string, value: string): string {
|
||||
return `rl_${actor}_${limit.key}_${value}`;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue