Switching to a Prometheus middleware wrapper that cleans the Prometheus archive local to the app. Also adding a feed histogram command to measure subscription counts for premium segmentation.

Samuel Clay 2025-07-09 09:57:40 -04:00
parent cb74bbaf11
commit e812b4a599
4 changed files with 328 additions and 13 deletions


@@ -0,0 +1,152 @@
from datetime import timedelta
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand
from django.db.models import Count, Q
from django.utils import timezone
from apps.profile.models import Profile
from apps.reader.models import UserSubscription
class Command(BaseCommand):
help = "Generate ASCII histograms of feed counts for different user tiers"
def handle(self, *args, **options):
self.stdout.write("Fetching user data...")
# Premium users (premium but not archive/pro)
premium_users = User.objects.filter(
profile__is_premium=True,
profile__is_archive=False,
profile__is_pro=False,
profile__premium_expire__gte=timezone.now(),
).values_list("id", flat=True)
premium_feeds = list(
UserSubscription.objects.filter(user_id__in=premium_users)
.values("user")
.annotate(feed_count=Count("id"))
.values_list("feed_count", flat=True)
)
# Free users
free_users = User.objects.filter(
Q(profile__is_premium=False) | Q(profile__premium_expire__lt=timezone.now())
).values_list("id", flat=True)
free_feeds = list(
UserSubscription.objects.filter(user_id__in=free_users)
.values("user")
.annotate(feed_count=Count("id"))
.values_list("feed_count", flat=True)
)
# Archive users (archive but not pro)
archive_users = User.objects.filter(
profile__is_premium=True,
profile__is_archive=True,
profile__is_pro=False,
profile__premium_expire__gte=timezone.now(),
).values_list("id", flat=True)
archive_feeds = list(
UserSubscription.objects.filter(user_id__in=archive_users)
.values("user")
.annotate(feed_count=Count("id"))
.values_list("feed_count", flat=True)
)
# Pro users
pro_users = User.objects.filter(
profile__is_premium=True, profile__is_pro=True, profile__premium_expire__gte=timezone.now()
).values_list("id", flat=True)
pro_feeds = list(
UserSubscription.objects.filter(user_id__in=pro_users)
.values("user")
.annotate(feed_count=Count("id"))
.values_list("feed_count", flat=True)
)
# Active old users (active in last 365 days, account 2+ years old)
one_year_ago = timezone.now() - timedelta(days=365)
two_years_ago = timezone.now() - timedelta(days=730)
active_old_users = User.objects.filter(
profile__last_seen_on__gte=one_year_ago, date_joined__lte=two_years_ago
).values_list("id", flat=True)
active_old_feeds = list(
UserSubscription.objects.filter(user_id__in=active_old_users)
.values("user")
.annotate(feed_count=Count("id"))
.values_list("feed_count", flat=True)
)
# Generate histograms
self._generate_histogram(free_feeds, "FREE USERS")
self._generate_histogram(premium_feeds, "PREMIUM USERS (not Archive/Pro)")
self._generate_histogram(archive_feeds, "ARCHIVE USERS (not Pro)")
self._generate_histogram(pro_feeds, "PRO USERS")
self._generate_histogram(
active_old_feeds, "ACTIVE OLD USERS (active last 365 days, account 2+ years)"
)
# Summary stats
self.stdout.write("\n" + "=" * 70)
self.stdout.write("SUMMARY STATISTICS")
self.stdout.write("=" * 70)
self.stdout.write(f"Free users: {len(free_feeds):,}")
self.stdout.write(f"Premium users: {len(premium_feeds):,}")
self.stdout.write(f"Archive users: {len(archive_feeds):,}")
self.stdout.write(f"Pro users: {len(pro_feeds):,}")
self.stdout.write(f"Active old users: {len(active_old_feeds):,}")
def _generate_histogram(self, data, title, max_width=60):
"""Generate ASCII histogram"""
if not data:
self.stdout.write(f"\n{title}: No data")
return
# Create buckets
min_val = min(data)
max_val = max(data)
# Define bucket ranges
if max_val <= 100:
buckets = [(0, 10), (11, 25), (26, 50), (51, 75), (76, 100)]
elif max_val <= 500:
buckets = [(0, 10), (11, 25), (26, 50), (51, 100), (101, 200), (201, 300), (301, 400), (401, 500)]
else:
buckets = [
(0, 10),
(11, 25),
(26, 50),
(51, 100),
(101, 250),
(251, 500),
(501, 750),
(751, 1000),
(1001, 1500),
(1501, max_val),
]
# Count users in each bucket
bucket_counts = []
for low, high in buckets:
count = sum(1 for x in data if low <= x <= high)
bucket_counts.append((f"{low:4d}-{high:4d}", count))
# Find max count for scaling
max_count = max(count for _, count in bucket_counts) if bucket_counts else 1
self.stdout.write(f"\n{title}")
self.stdout.write(f"Total users: {len(data)}")
self.stdout.write(f"Feed range: {min_val}-{max_val}")
self.stdout.write("-" * 70)
for label, count in bucket_counts:
bar_width = int((count / max_count) * max_width) if max_count > 0 else 0
bar = "" * bar_width
self.stdout.write(f"{label} feeds: {count:5d} |{bar}")


@@ -125,7 +125,7 @@ SHELL_PLUS_IMPORTS = [
# SHELL_PLUS_PRINT_SQL = True
MIDDLEWARE = (
"django_prometheus.middleware.PrometheusBeforeMiddleware",
"utils.prometheus_middleware.PrometheusBeforeMiddlewareWrapper",
"django.middleware.gzip.GZipMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"subdomains.middleware.SubdomainMiddleware",
@@ -144,7 +144,7 @@ MIDDLEWARE = (
"apps.profile.middleware.DBProfilerMiddleware",
"apps.profile.middleware.SQLLogToConsoleMiddleware",
"utils.redis_raw_log_middleware.RedisDumpMiddleware",
"django_prometheus.middleware.PrometheusAfterMiddleware",
"utils.prometheus_middleware.PrometheusAfterMiddlewareWrapper",
)
AUTHENTICATION_BACKENDS = (
@@ -175,7 +175,10 @@ LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"verbose": {"format": "[%(asctime)-12s] %(message)s", "datefmt": "%b %d %H:%M:%S"},
"verbose": {
"format": "[%(asctime)-12s] %(message)s",
"datefmt": "%b %d %H:%M:%S",
},
"simple": {"format": "%(message)s"},
},
"handlers": {
@@ -183,8 +186,16 @@ LOGGING = {
"level": "DEBUG",
"class": "logging.NullHandler",
},
"console": {"level": "DEBUG", "class": "logging.StreamHandler", "formatter": "verbose"},
"vendor.apns": {"level": "DEBUG", "class": "logging.StreamHandler", "formatter": "verbose"},
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "verbose",
},
"vendor.apns": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "verbose",
},
"log_file": {
"level": "DEBUG",
"class": "logging.handlers.RotatingFileHandler",
@@ -378,7 +389,10 @@ CELERY_TASK_ROUTES = {
"update-feeds": {"queue": "update_feeds", "binding_key": "update_feeds"},
"beat-tasks": {"queue": "cron_queue", "binding_key": "cron_queue"},
"search-indexer": {"queue": "search_indexer", "binding_key": "search_indexer"},
"discover-indexer": {"queue": "discover_indexer", "binding_key": "discover_indexer"},
"discover-indexer": {
"queue": "discover_indexer",
"binding_key": "discover_indexer",
},
}
CELERY_TASK_QUEUES = {
"work_queue": {
@@ -386,10 +400,26 @@ CELERY_TASK_QUEUES = {
"exchange_type": "direct",
"binding_key": "work_queue",
},
"new_feeds": {"exchange": "new_feeds", "exchange_type": "direct", "binding_key": "new_feeds"},
"push_feeds": {"exchange": "push_feeds", "exchange_type": "direct", "binding_key": "push_feeds"},
"update_feeds": {"exchange": "update_feeds", "exchange_type": "direct", "binding_key": "update_feeds"},
"cron_queue": {"exchange": "cron_queue", "exchange_type": "direct", "binding_key": "cron_queue"},
"new_feeds": {
"exchange": "new_feeds",
"exchange_type": "direct",
"binding_key": "new_feeds",
},
"push_feeds": {
"exchange": "push_feeds",
"exchange_type": "direct",
"binding_key": "push_feeds",
},
"update_feeds": {
"exchange": "update_feeds",
"exchange_type": "direct",
"binding_key": "update_feeds",
},
"cron_queue": {
"exchange": "cron_queue",
"exchange_type": "direct",
"binding_key": "cron_queue",
},
"beat_feeds_task": {
"exchange": "beat_feeds_task",
"exchange_type": "direct",
@ -747,7 +777,9 @@ if "username" in MONGO_ANALYTICS_DB:
)
else:
MONGOANALYTICSDB = connect(
db=MONGO_ANALYTICS_DB["name"], host=f"mongodb://{MONGO_ANALYTICS_DB['host']}/", alias="nbanalytics"
db=MONGO_ANALYTICS_DB["name"],
host=f"mongodb://{MONGO_ANALYTICS_DB['host']}/",
alias="nbanalytics",
)
@@ -772,9 +804,18 @@ if REDIS_USER is None:
CELERY_REDIS_DB_NUM = 4
SESSION_REDIS_DB = 5
CELERY_BROKER_URL = "redis://%s:%s/%s" % (REDIS_USER["host"], REDIS_USER_PORT, CELERY_REDIS_DB_NUM)
CELERY_BROKER_URL = "redis://%s:%s/%s" % (
REDIS_USER["host"],
REDIS_USER_PORT,
CELERY_REDIS_DB_NUM,
)
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
BROKER_TRANSPORT_OPTIONS = {"max_retries": 3, "interval_start": 0, "interval_step": 0.2, "interval_max": 0.5}
BROKER_TRANSPORT_OPTIONS = {
"max_retries": 3,
"interval_start": 0,
"interval_step": 0.2,
"interval_max": 0.5,
}
SESSION_REDIS = {
"host": REDIS_SESSIONS["host"],


@@ -0,0 +1,122 @@
"""
Prometheus middleware wrapper that handles mmap errors and performs cleanup
"""
import logging
import os
import time
from django_prometheus.middleware import (
PrometheusAfterMiddleware,
PrometheusBeforeMiddleware,
)
from prometheus_client import values
logger = logging.getLogger(__name__)
class PrometheusBeforeMiddlewareWrapper(PrometheusBeforeMiddleware):
"""Wrapper for PrometheusBeforeMiddleware that handles mmap errors"""
_last_cleanup = 0
_cleanup_interval = 300 # 5 minutes
def __call__(self, request):
try:
# Periodic cleanup of old files
current_time = time.time()
if current_time - self._last_cleanup > self._cleanup_interval:
self._cleanup_old_files()
PrometheusBeforeMiddlewareWrapper._last_cleanup = current_time
return super().__call__(request)
except IndexError as e:
if "mmap slice assignment is wrong size" in str(e):
logger.warning("Prometheus mmap corruption detected, attempting recovery")
self._reset_prometheus_mmap()
# Try once more after reset
try:
return super().__call__(request)
except Exception as retry_error:
logger.error(f"Prometheus retry failed: {retry_error}")
# Continue without metrics rather than crash
return self.get_response(request)
raise
def _cleanup_old_files(self):
"""Clean up old prometheus files from dead processes"""
prom_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR", "/srv/newsblur/.prom_cache")
if not os.path.exists(prom_dir):
return
cleaned = 0
current_time = time.time()
try:
for filename in os.listdir(prom_dir):
if not filename.endswith(".db"):
continue
filepath = os.path.join(prom_dir, filename)
# Check if file is older than 1 hour
if current_time - os.path.getmtime(filepath) > 3600:
# Extract PID from filename (e.g., counter_12345.db)
parts = filename.split("_")
if len(parts) >= 2:
try:
pid = int(parts[1].replace(".db", ""))
# Check if process exists
os.kill(pid, 0)
# Process exists, skip
continue
except (ValueError, OSError):
# Process doesn't exist, safe to delete
pass
try:
os.unlink(filepath)
cleaned += 1
except Exception as e:
logger.debug(f"Could not remove {filepath}: {e}")
if cleaned > 0:
logger.info(f"Cleaned up {cleaned} old prometheus files")
except Exception as e:
logger.error(f"Error during prometheus cleanup: {e}")
def _reset_prometheus_mmap(self):
"""Reset prometheus mmap cache when corruption is detected"""
try:
# Clear the mmap cache
if hasattr(values, "_ValueClass") and hasattr(values._ValueClass, "_mmap_dict_cache"):
values._ValueClass._mmap_dict_cache.clear()
logger.info("Reset prometheus mmap cache")
except Exception as e:
logger.error(f"Error resetting prometheus mmap: {e}")
class PrometheusAfterMiddlewareWrapper(PrometheusAfterMiddleware):
"""Wrapper for PrometheusAfterMiddleware that handles mmap errors"""
def __call__(self, request):
try:
return super().__call__(request)
except IndexError as e:
if "mmap slice assignment is wrong size" in str(e):
logger.warning("Prometheus mmap corruption in after middleware, continuing without metrics")
# Get the response without metrics
if hasattr(self, "_response"):
return self._response
return None
raise
def process_response(self, request, response):
# Store response in case we need it during error recovery
self._response = response
try:
return super().process_response(request, response)
except IndexError as e:
if "mmap slice assignment is wrong size" in str(e):
logger.warning("Prometheus mmap corruption in process_response, continuing without metrics")
return response
raise
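As a reference for the cleanup criteria above, a minimal standalone sketch (illustrative only; the helper name is_stale_prometheus_file is assumed, not part of the module): _cleanup_old_files only removes a .db file when it is more than an hour old and the PID embedded in its name no longer belongs to a live process.

import os
import time

def is_stale_prometheus_file(filepath, max_age_seconds=3600):
    """Illustrative mirror of the cleanup criteria; applies only to *.db files."""
    if time.time() - os.path.getmtime(filepath) <= max_age_seconds:
        return False  # written within the last hour, leave it alone
    parts = os.path.basename(filepath).split("_")
    if len(parts) >= 2:
        try:
            pid = int(parts[1].replace(".db", ""))
            os.kill(pid, 0)  # signal 0 only probes whether the process exists
            return False  # owning process is still alive, keep its file
        except (ValueError, OSError):
            pass  # unparseable PID or dead process
    return True  # old file with no live owner: safe to delete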