mirror of
https://github.com/viq/NewsBlur.git
synced 2025-09-18 21:43:31 +00:00
Adding user sub counts to redis, using new redis cache to count subscribers only when feed is entirely cached. Still need automatic conversion from postgres to redis sub counts.
This commit is contained in:
parent
6a20478a7f
commit
cf43747b20
7 changed files with 180 additions and 70 deletions
|
@ -31,8 +31,6 @@ class LastSeenMiddleware(object):
|
|||
logging.user(request, "~FG~BBRepeat visitor (ignored): ~SB%s (%s)" % (
|
||||
request.user.profile.last_seen_on, ip))
|
||||
|
||||
# if request.user.profile.last_seen_on < SUBSCRIBER_EXPIRE:
|
||||
# request.user.profile.refresh_stale_feeds()
|
||||
request.user.profile.last_seen_on = datetime.datetime.utcnow()
|
||||
request.user.profile.last_seen_ip = ip[-15:]
|
||||
request.user.profile.save()
|
||||
|
|
|
@ -18,14 +18,14 @@ from django.core.mail import mail_admins
|
|||
from django.core.mail import EmailMultiAlternatives
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.template.loader import render_to_string
|
||||
from apps.reader.models import UserSubscription
|
||||
from apps.rss_feeds.models import Feed, MStory, MStarredStory
|
||||
from apps.rss_feeds.tasks import NewFeeds
|
||||
from apps.rss_feeds.tasks import SchedulePremiumSetup
|
||||
from apps.feed_import.models import GoogleReaderImporter, OPMLExporter
|
||||
from apps.reader.models import UserSubscription
|
||||
from utils import log as logging
|
||||
from utils import json_functions as json
|
||||
from utils.user_functions import generate_secret_token
|
||||
from utils.feed_functions import chunks
|
||||
from vendor.timezones.fields import TimeZoneField
|
||||
from vendor.paypal.standard.ipn.signals import subscription_signup, payment_was_successful, recurring_payment
|
||||
from vendor.paypal.standard.ipn.signals import payment_was_flagged
|
||||
|
@ -156,16 +156,9 @@ class Profile(models.Model):
|
|||
logging.user(self.user, "Deleting user: %s" % self.user)
|
||||
self.user.delete()
|
||||
|
||||
def check_if_spammer(self):
    """Heuristic spam check for this user's account.

    A user with no feed subscriptions and no recorded feed opens is
    treated as a suspected spammer.

    Returns:
        bool: True if the account looks like a spammer, False otherwise.
        (The original implicitly returned None on the non-spammer path;
        an explicit boolean is clearer and stays truthiness-compatible
        for existing callers.)
    """
    # Sum() yields None (falsy) when the user has no subscriptions at all.
    feed_opens = UserSubscription.objects.filter(user=self.user)\
                    .aggregate(sum=Sum('feed_opens'))['sum']
    feed_count = UserSubscription.objects.filter(user=self.user).count()

    return not feed_opens and not feed_count
|
||||
|
||||
def activate_premium(self, never_expire=False):
|
||||
from apps.profile.tasks import EmailNewPremium
|
||||
|
||||
EmailNewPremium.delay(user_id=self.user.pk)
|
||||
|
||||
self.is_premium = True
|
||||
|
@ -190,7 +183,7 @@ class Profile(models.Model):
|
|||
len(scheduled_feeds))
|
||||
SchedulePremiumSetup.apply_async(kwargs=dict(feed_ids=scheduled_feeds))
|
||||
|
||||
self.queue_new_feeds()
|
||||
UserSubscription.queue_new_feeds(self.user)
|
||||
self.setup_premium_history()
|
||||
|
||||
if never_expire:
|
||||
|
@ -400,34 +393,62 @@ class Profile(models.Model):
|
|||
|
||||
return True
|
||||
|
||||
def queue_new_feeds(self, new_feeds=None):
    """Dispatch NewFeeds celery tasks for this user's unfetched feeds.

    Args:
        new_feeds: optional list of feed ids to queue; when omitted, all
            active subscriptions whose feed was never fetched are used.
    """
    if not new_feeds:
        pending = UserSubscription.objects.filter(user=self.user,
                                                  feed__fetched_once=False,
                                                  active=True).values('feed_id')
        # Deduplicate ids before queueing.
        new_feeds = list(set([sub['feed_id'] for sub in pending]))
    logging.user(self.user, "~BB~FW~SBQueueing NewFeeds: ~FC(%s) %s" % (len(new_feeds), new_feeds))
    # Fan the ids out to the "new_feeds" queue in batches of 4.
    size = 4
    for pos in xrange(0, len(new_feeds), size):
        NewFeeds.apply_async(args=(new_feeds[pos:pos + size],), queue="new_feeds")
|
||||
|
||||
def refresh_stale_feeds(self, exclude_new=False):
    """Mark this user's stale feeds unfetched and queue them for refetch.

    Args:
        exclude_new: when True, skip feeds that were never fetched at all
            (they are already in the new-feed pipeline).
    """
    stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
    stale_feeds = UserSubscription.objects.filter(user=self.user, active=True,
                                                  feed__last_update__lte=stale_cutoff)
    if exclude_new:
        stale_feeds = stale_feeds.filter(feed__fetched_once=True)
    all_feeds = UserSubscription.objects.filter(user=self.user, active=True)

    logging.user(self.user, "~FG~BBRefreshing stale feeds: ~SB%s/%s" % (
        stale_feeds.count(), all_feeds.count()))

    # Clearing fetched_once makes the fetcher treat the feed as brand new.
    for sub in stale_feeds:
        sub.feed.fetched_once = False
        sub.feed.save()

    if stale_feeds:
        stale_feeds = list(set([f.feed_id for f in stale_feeds]))
        self.queue_new_feeds(new_feeds=stale_feeds)

@classmethod
def count_feed_subscribers(cls, feed_id=None, user_id=None):
    """Store per-feed subscriber sets in redis sorted sets.

    For each feed, members are user ids scored by the user's last_seen_on
    timestamp ('s:<feed_id>' for all subscribers, 'sp:<feed_id>' for
    premium subscribers). When an entire feed is counted (feed_id given
    without user_id), a sentinel member -1 is added so later readers know
    the feed's counts are fully cached in redis.

    Args:
        feed_id: count every subscriber of this feed.
        user_id: count only this user's subscriptions (all their feeds).

    Raises:
        ValueError: if neither feed_id nor user_id is supplied.
        (Was `assert False`; asserts are stripped under `python -O`, so a
        real exception is used for argument validation.)
    """
    # NOTE: first parameter renamed self -> cls: this is a @classmethod.
    SUBSCRIBER_EXPIRE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
    r = redis.Redis(connection_pool=settings.REDIS_FEED_SUB_POOL)
    entire_feed_counted = False

    logging.debug(" ---> ~SN~FBCounting subscribers for feed:~SB~FM%s~SN~FB user:~SB~FM%s" % (feed_id, user_id))

    if feed_id:
        feed_ids = [feed_id]
    elif user_id:
        feed_ids = [us['feed_id'] for us in UserSubscription.objects.filter(user=user_id).values('feed_id')]
    else:
        raise ValueError("feed_id or user_id required")

    if feed_id and not user_id:
        entire_feed_counted = True

    for feed_id in feed_ids:
        total = 0
        premium = 0
        active = 0
        active_premium = 0
        key = 's:%s' % feed_id
        premium_key = 'sp:%s' % feed_id

        if user_id:
            user_ids = [user_id]
        else:
            user_ids = [us['user_id'] for us in UserSubscription.objects.filter(feed_id=feed_id).values('user_id')]
        profiles = Profile.objects.filter(user_id__in=user_ids).values('user_id', 'last_seen_on', 'is_premium')
        feed = Feed.get_by_id(feed_id)

        # Batch redis writes through a pipeline, 20 profiles at a time.
        for profiles_group in chunks(profiles, 20):
            pipeline = r.pipeline()
            for profile in profiles_group:
                last_seen_on = int(profile['last_seen_on'].strftime('%s'))
                # redis-py 2.x legacy zadd signature: (key, member, score).
                pipeline.zadd(key, profile['user_id'], last_seen_on)
                total += 1
                if profile['is_premium']:
                    pipeline.zadd(premium_key, profile['user_id'], last_seen_on)
                    premium += 1
                if profile['last_seen_on'] > SUBSCRIBER_EXPIRE:
                    active += 1
                    if profile['is_premium']:
                        active_premium += 1

            pipeline.execute()

        if entire_feed_counted:
            # Sentinel member -1 marks this feed as fully counted in redis.
            now = int(datetime.datetime.now().strftime('%s'))
            r.zadd(key, -1, now)
            r.zadd(premium_key, -1, now)

        logging.info(" ---> [%-30s] ~SN~FBCounting subscribers, storing in ~SBredis~SN: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
                     (feed.title[:30], total, active, premium, active_premium))
|
||||
|
||||
def import_reader_starred_items(self, count=20):
|
||||
importer = GoogleReaderImporter(self.user)
|
||||
|
|
|
@ -59,6 +59,7 @@ class CleanupUser(Task):
|
|||
def run(self, user_id):
|
||||
UserSubscription.trim_user_read_stories(user_id)
|
||||
UserSubscription.verify_feeds_scheduled(user_id)
|
||||
UserSubscription.refresh_stale_feeds(user_id)
|
||||
|
||||
try:
|
||||
ss = MSocialServices.objects.get(user_id=user_id)
|
||||
|
|
|
@ -17,6 +17,7 @@ from mongoengine.queryset import OperationError
|
|||
from mongoengine.queryset import NotUniqueError
|
||||
from apps.reader.managers import UserSubscriptionManager
|
||||
from apps.rss_feeds.models import Feed, MStory, DuplicateFeed
|
||||
from apps.rss_feeds.tasks import NewFeeds
|
||||
from apps.analyzer.models import MClassifierFeed, MClassifierAuthor, MClassifierTag, MClassifierTitle
|
||||
from apps.analyzer.models import apply_classifier_titles, apply_classifier_feeds, apply_classifier_authors, apply_classifier_tags
|
||||
from apps.analyzer.tfidf import tfidf
|
||||
|
@ -421,6 +422,49 @@ class UserSubscription(models.Model):
|
|||
|
||||
return feeds
|
||||
|
||||
@classmethod
def queue_new_feeds(cls, user, new_feeds=None):
    """Dispatch NewFeeds tasks for a user's unfetched, active subscriptions.

    Args:
        user: a User instance or a user primary key.
        new_feeds: optional explicit list of feed ids to queue.
    """
    # Callers may hand us a pk instead of a User object.
    if not isinstance(user, User):
        user = User.objects.get(pk=user)

    if not new_feeds:
        unfetched = cls.objects.filter(user=user,
                                       feed__fetched_once=False,
                                       active=True).values('feed_id')
        new_feeds = list(set([sub['feed_id'] for sub in unfetched]))

    # Nothing to do once deduplication leaves an empty list.
    if not new_feeds:
        return

    logging.user(user, "~BB~FW~SBQueueing NewFeeds: ~FC(%s) %s" % (len(new_feeds), new_feeds))
    batch_size = 4
    for start in xrange(0, len(new_feeds), batch_size):
        NewFeeds.apply_async(args=(new_feeds[start:start + batch_size],), queue="new_feeds")
|
||||
|
||||
@classmethod
def refresh_stale_feeds(cls, user, exclude_new=False):
    """Force a refetch of a user's active feeds that haven't updated recently.

    Args:
        user: a User instance or a user primary key.
        exclude_new: when True, ignore feeds that were never fetched.
    """
    if not isinstance(user, User):
        user = User.objects.get(pk=user)

    stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)

    # TODO: Refactor below using last_update from REDIS_FEED_UPDATE_POOL
    stale_feeds = UserSubscription.objects.filter(user=user, active=True,
                                                  feed__last_update__lte=stale_cutoff)
    if exclude_new:
        stale_feeds = stale_feeds.filter(feed__fetched_once=True)
    all_feeds = UserSubscription.objects.filter(user=user, active=True)

    logging.user(user, "~FG~BBRefreshing stale feeds: ~SB%s/%s" % (
        stale_feeds.count(), all_feeds.count()))

    # Resetting fetched_once sends each stale feed back through the
    # new-feed fetch pipeline.
    for subscription in stale_feeds:
        subscription.feed.fetched_once = False
        subscription.feed.save()

    if stale_feeds:
        cls.queue_new_feeds(user, new_feeds=list(set([s.feed_id for s in stale_feeds])))
|
||||
|
||||
@classmethod
|
||||
def identify_deleted_feed_users(cls, old_feed_id):
|
||||
users = UserSubscriptionFolders.objects.filter(folders__contains=old_feed_id).only('user')
|
||||
|
|
|
@ -1948,8 +1948,8 @@ def save_feed_chooser(request):
|
|||
except Feed.DoesNotExist:
|
||||
pass
|
||||
|
||||
request.user.profile.queue_new_feeds()
|
||||
request.user.profile.refresh_stale_feeds(exclude_new=True)
|
||||
UserSubscription.queue_new_feeds(request.user)
|
||||
UserSubscription.refresh_stale_feeds(request.user, exclude_new=True)
|
||||
|
||||
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
|
||||
r.publish(request.user.username, 'reload:feeds')
|
||||
|
|
|
@ -634,9 +634,17 @@ class Feed(models.Model):
|
|||
return redirects, non_redirects
|
||||
|
||||
def count_subscribers(self, verbose=False):
|
||||
SUBSCRIBER_EXPIRE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
|
||||
from apps.reader.models import UserSubscription
|
||||
SUBSCRIBER_EXPIRE_DATE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
|
||||
subscriber_expire = int(SUBSCRIBER_EXPIRE_DATE.strftime('%s'))
|
||||
now = int(datetime.datetime.now().strftime('%s'))
|
||||
r = redis.Redis(connection_pool=settings.REDIS_FEED_SUB_POOL)
|
||||
total = 0
|
||||
active = 0
|
||||
premium = 0
|
||||
active_premium = 0
|
||||
counts_converted_to_redis = False
|
||||
|
||||
# Include all branched feeds in counts
|
||||
if self.branch_from_feed:
|
||||
original_feed_id = self.branch_from_feed.pk
|
||||
else:
|
||||
|
@ -645,35 +653,73 @@ class Feed(models.Model):
|
|||
feed_ids.append(original_feed_id)
|
||||
feed_ids = list(set(feed_ids))
|
||||
|
||||
subs = UserSubscription.objects.filter(feed__in=feed_ids)
|
||||
original_num_subscribers = self.num_subscribers
|
||||
self.num_subscribers = subs.count()
|
||||
counts_converted_to_redis = bool(r.zrank("s:%s" % original_feed_id, -1))
|
||||
|
||||
active_subs = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE
|
||||
)
|
||||
original_active_subs = self.active_subscribers
|
||||
self.active_subscribers = active_subs.count()
|
||||
if counts_converted_to_redis:
|
||||
# For each branched feed, count different subscribers
|
||||
pipeline = r.pipeline()
|
||||
for feed_id in feed_ids:
|
||||
total_key = "s:%s" % feed_id
|
||||
premium_key = "sp:%s" % feed_id
|
||||
print feed_id, total_key
|
||||
pipeline.zcard(total_key)
|
||||
pipeline.zcount(total_key, subscriber_expire, now-1)
|
||||
pipeline.zcard(premium_key)
|
||||
pipeline.zcount(premium_key, subscriber_expire, now-1)
|
||||
|
||||
results = pipeline.execute()
|
||||
|
||||
# -1 due to key=-1 signaling counts_converted_to_redis
|
||||
total += results[0] - 1
|
||||
active += results[1] - 1
|
||||
premium += results[2] - 1
|
||||
active_premium += results[3] - 1
|
||||
|
||||
# If any counts have changed, save them
|
||||
original_num_subscribers = self.num_subscribers
|
||||
original_active_subs = self.active_subscribers
|
||||
original_premium_subscribers = self.premium_subscribers
|
||||
original_active_premium_subscribers = self.active_premium_subscribers
|
||||
logging.info(" ---> [%-30s] ~SN~FBCounting subscribers from ~FCredis~FB: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
|
||||
(self.title[:30], total, active, premium, active_premium))
|
||||
else:
|
||||
from apps.reader.models import UserSubscription
|
||||
|
||||
subs = UserSubscription.objects.filter(feed__in=feed_ids)
|
||||
original_num_subscribers = self.num_subscribers
|
||||
total = subs.count()
|
||||
|
||||
premium_subs = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__is_premium=True
|
||||
)
|
||||
original_premium_subscribers = self.premium_subscribers
|
||||
self.premium_subscribers = premium_subs.count()
|
||||
active_subs = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE_DATE
|
||||
)
|
||||
original_active_subs = self.active_subscribers
|
||||
active = active_subs.count()
|
||||
|
||||
active_premium_subscribers = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__is_premium=True,
|
||||
user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE
|
||||
)
|
||||
original_active_premium_subscribers = self.active_premium_subscribers
|
||||
self.active_premium_subscribers = active_premium_subscribers.count()
|
||||
premium_subs = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__is_premium=True
|
||||
)
|
||||
original_premium_subscribers = self.premium_subscribers
|
||||
premium = premium_subs.count()
|
||||
|
||||
active_premium_subscribers = UserSubscription.objects.filter(
|
||||
feed__in=feed_ids,
|
||||
active=True,
|
||||
user__profile__is_premium=True,
|
||||
user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE_DATE
|
||||
)
|
||||
original_active_premium_subscribers = self.active_premium_subscribers
|
||||
active_premium = active_premium_subscribers.count()
|
||||
logging.debug(" ---> [%-30s] ~SN~FBCounting subscribers from ~FYpostgres~FB: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
|
||||
(self.title[:30], total, active, premium, active_premium))
|
||||
|
||||
self.num_subscribers = total
|
||||
self.active_subscribers = active
|
||||
self.premium_subscribers = premium
|
||||
self.active_premium_subscribers = active_premium
|
||||
if (self.num_subscribers != original_num_subscribers or
|
||||
self.active_subscribers != original_active_subs or
|
||||
self.premium_subscribers != original_premium_subscribers or
|
||||
|
@ -691,7 +737,7 @@ class Feed(models.Model):
|
|||
'' if self.num_subscribers == 1 else 's',
|
||||
self.feed_title,
|
||||
),
|
||||
|
||||
|
||||
def _split_favicon_color(self):
|
||||
color = self.favicon_color
|
||||
if color:
|
||||
|
|
|
@ -36,7 +36,7 @@ python-gflags==2.0
|
|||
pytz==2013b
|
||||
raven==3.1.17
|
||||
readline==6.2.4.1
|
||||
redis==2.8.0
|
||||
redis==2.10.3
|
||||
hiredis==0.1.1
|
||||
requests==2.5.2
|
||||
seacucumber==1.5
|
||||
|
|
Loading…
Add table
Reference in a new issue