diff --git a/apps/profile/middleware.py b/apps/profile/middleware.py
index 169731b38..3f84f7166 100644
--- a/apps/profile/middleware.py
+++ b/apps/profile/middleware.py
@@ -31,8 +31,6 @@ class LastSeenMiddleware(object):
                 logging.user(request, "~FG~BBRepeat visitor (ignored): ~SB%s (%s)" % (
                     request.user.profile.last_seen_on, ip))
 
-            # if request.user.profile.last_seen_on < SUBSCRIBER_EXPIRE:
-                # request.user.profile.refresh_stale_feeds()
             request.user.profile.last_seen_on = datetime.datetime.utcnow()
             request.user.profile.last_seen_ip = ip[-15:]
             request.user.profile.save()
diff --git a/apps/profile/models.py b/apps/profile/models.py
index 75a68a0aa..2b2bca7e7 100644
--- a/apps/profile/models.py
+++ b/apps/profile/models.py
@@ -18,14 +18,14 @@ from django.core.mail import mail_admins
 from django.core.mail import EmailMultiAlternatives
 from django.core.urlresolvers import reverse
 from django.template.loader import render_to_string
-from apps.reader.models import UserSubscription
 from apps.rss_feeds.models import Feed, MStory, MStarredStory
-from apps.rss_feeds.tasks import NewFeeds
 from apps.rss_feeds.tasks import SchedulePremiumSetup
 from apps.feed_import.models import GoogleReaderImporter, OPMLExporter
+from apps.reader.models import UserSubscription
 from utils import log as logging
 from utils import json_functions as json
 from utils.user_functions import generate_secret_token
+from utils.feed_functions import chunks
 from vendor.timezones.fields import TimeZoneField
 from vendor.paypal.standard.ipn.signals import subscription_signup, payment_was_successful, recurring_payment
 from vendor.paypal.standard.ipn.signals import payment_was_flagged
@@ -156,16 +156,9 @@ class Profile(models.Model):
         logging.user(self.user, "Deleting user: %s" % self.user)
         self.user.delete()
 
-    def check_if_spammer(self):
-        feed_opens = UserSubscription.objects.filter(user=self.user)\
-                     .aggregate(sum=Sum('feed_opens'))['sum']
-        feed_count = UserSubscription.objects.filter(user=self.user).count()
-
-        if not feed_opens and not feed_count:
-            return True
-
     def activate_premium(self, never_expire=False):
         from apps.profile.tasks import EmailNewPremium
+
         EmailNewPremium.delay(user_id=self.user.pk)
 
         self.is_premium = True
@@ -190,7 +183,7 @@ class Profile(models.Model):
                          len(scheduled_feeds))
             SchedulePremiumSetup.apply_async(kwargs=dict(feed_ids=scheduled_feeds))
 
-        self.queue_new_feeds()
+        UserSubscription.queue_new_feeds(self.user)
         self.setup_premium_history()
 
         if never_expire:
@@ -400,34 +393,73 @@ class Profile(models.Model):
 
         return True
 
-    def queue_new_feeds(self, new_feeds=None):
-        if not new_feeds:
-            new_feeds = UserSubscription.objects.filter(user=self.user,
-                                                        feed__fetched_once=False,
-                                                        active=True).values('feed_id')
-            new_feeds = list(set([f['feed_id'] for f in new_feeds]))
-        logging.user(self.user, "~BB~FW~SBQueueing NewFeeds: ~FC(%s) %s" % (len(new_feeds), new_feeds))
-        size = 4
-        for t in (new_feeds[pos:pos + size] for pos in xrange(0, len(new_feeds), size)):
-            NewFeeds.apply_async(args=(t,), queue="new_feeds")
-
-    def refresh_stale_feeds(self, exclude_new=False):
-        stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
-        stale_feeds = UserSubscription.objects.filter(user=self.user, active=True, feed__last_update__lte=stale_cutoff)
-        if exclude_new:
-            stale_feeds = stale_feeds.filter(feed__fetched_once=True)
-        all_feeds = UserSubscription.objects.filter(user=self.user, active=True)
+    @classmethod
+    def count_feed_subscribers(cls, feed_id=None, user_id=None):
+        """Record subscribers' last-seen times in per-feed Redis sorted sets.
+
+        Members of `s:<feed_id>` (and of `sp:<feed_id>` for premium users)
+        are user ids scored by `last_seen_on` as an epoch timestamp. Called
+        with only `feed_id`, every subscriber of the feed is written and a
+        `-1` member scored with the current time flags the feed as fully
+        counted. Called with `user_id`, only that user's entry is written,
+        for `feed_id` if given or else for every feed the user subscribes to.
+
+        Raises ValueError when neither `feed_id` nor `user_id` is supplied.
+        """
+        SUBSCRIBER_EXPIRE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_SUB_POOL)
+        entire_feed_counted = False
 
-        logging.user(self.user, "~FG~BBRefreshing stale feeds: ~SB%s/%s" % (
-            stale_feeds.count(), all_feeds.count()))
-
-        for sub in stale_feeds:
-            sub.feed.fetched_once = False
-            sub.feed.save()
+        logging.debug(" ---> ~SN~FBCounting subscribers for feed:~SB~FM%s~SN~FB user:~SB~FM%s" % (feed_id, user_id))
 
-        if stale_feeds:
-            stale_feeds = list(set([f.feed_id for f in stale_feeds]))
-            self.queue_new_feeds(new_feeds=stale_feeds)
+        if feed_id:
+            feed_ids = [feed_id]
+        elif user_id:
+            feed_ids = [us['feed_id'] for us in UserSubscription.objects.filter(user=user_id).values('feed_id')]
+        else:
+            raise ValueError("feed_id or user_id required")
+
+        if feed_id and not user_id:
+            entire_feed_counted = True
+
+        for feed_id in feed_ids:
+            total = 0
+            premium = 0
+            active = 0
+            active_premium = 0
+            key = 's:%s' % feed_id
+            premium_key = 'sp:%s' % feed_id
+
+            if user_id:
+                user_ids = [user_id]
+            else:
+                user_ids = [us['user_id'] for us in UserSubscription.objects.filter(feed_id=feed_id).values('user_id')]
+            profiles = Profile.objects.filter(user_id__in=user_ids).values('user_id', 'last_seen_on', 'is_premium')
+            feed = Feed.get_by_id(feed_id)
+
+            for profiles_group in chunks(profiles, 20):
+                pipeline = r.pipeline()
+                for profile in profiles_group:
+                    last_seen_on = int(profile['last_seen_on'].strftime('%s'))
+                    pipeline.zadd(key, profile['user_id'], last_seen_on)
+                    total += 1
+                    if profile['is_premium']:
+                        pipeline.zadd(premium_key, profile['user_id'], last_seen_on)
+                        premium += 1
+                    if profile['last_seen_on'] > SUBSCRIBER_EXPIRE:
+                        active += 1
+                        if profile['is_premium']:
+                            active_premium += 1
+
+                pipeline.execute()
+
+            if entire_feed_counted:
+                now = int(datetime.datetime.now().strftime('%s'))
+                r.zadd(key, -1, now)
+                r.zadd(premium_key, -1, now)
+
+            logging.info(" ---> [%-30s] ~SN~FBCounting subscribers, storing in ~SBredis~SN: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
+                          (feed.title[:30], total, active, premium, active_premium))
 
     def import_reader_starred_items(self, count=20):
         importer = GoogleReaderImporter(self.user)
diff --git a/apps/profile/tasks.py b/apps/profile/tasks.py
index 229e76822..b7a783183 100644
--- a/apps/profile/tasks.py
+++ b/apps/profile/tasks.py
@@ -59,6 +59,7 @@ class CleanupUser(Task):
     def run(self, user_id):
         UserSubscription.trim_user_read_stories(user_id)
         UserSubscription.verify_feeds_scheduled(user_id)
+        UserSubscription.refresh_stale_feeds(user_id)
 
         try:
             ss = MSocialServices.objects.get(user_id=user_id)
diff --git a/apps/reader/models.py b/apps/reader/models.py
index b3987fd54..f68d8c059 100644
--- a/apps/reader/models.py
+++ b/apps/reader/models.py
@@ -17,6 +17,7 @@ from mongoengine.queryset import OperationError
 from mongoengine.queryset import NotUniqueError
 from apps.reader.managers import UserSubscriptionManager
 from apps.rss_feeds.models import Feed, MStory, DuplicateFeed
+from apps.rss_feeds.tasks import NewFeeds
 from apps.analyzer.models import MClassifierFeed, MClassifierAuthor, MClassifierTag, MClassifierTitle
 from apps.analyzer.models import apply_classifier_titles, apply_classifier_feeds, apply_classifier_authors, apply_classifier_tags
 from apps.analyzer.tfidf import tfidf
@@ -421,6 +422,64 @@ class UserSubscription(models.Model):
 
         return feeds
 
+    @classmethod
+    def queue_new_feeds(cls, user, new_feeds=None):
+        """Queue NewFeeds tasks for a user's not-yet-fetched feeds.
+
+        `user` may be a User instance or a user primary key. When
+        `new_feeds` is empty, the user's active subscriptions whose feed
+        has `fetched_once=False` are used. Feed ids are sent to the
+        "new_feeds" queue in batches of four; nothing is queued when there
+        are no feeds.
+        """
+        if not isinstance(user, User):
+            user = User.objects.get(pk=user)
+
+        if not new_feeds:
+            new_feeds = cls.objects.filter(user=user,
+                                           feed__fetched_once=False,
+                                           active=True).values('feed_id')
+            new_feeds = list(set([f['feed_id'] for f in new_feeds]))
+
+        if not new_feeds:
+            return
+
+        logging.user(user, "~BB~FW~SBQueueing NewFeeds: ~FC(%s) %s" % (len(new_feeds), new_feeds))
+        size = 4
+        for t in (new_feeds[pos:pos + size] for pos in xrange(0, len(new_feeds), size)):
+            NewFeeds.apply_async(args=(t,), queue="new_feeds")
+
+    @classmethod
+    def refresh_stale_feeds(cls, user, exclude_new=False):
+        """Mark a user's stale feeds as unfetched and queue them for fetching.
+
+        `user` may be a User instance or a user primary key. A feed is
+        stale when its `last_update` is at least
+        `settings.SUBSCRIBER_EXPIRE` days old. With `exclude_new`, feeds
+        whose `fetched_once` is still False are left alone.
+        """
+        if not isinstance(user, User):
+            user = User.objects.get(pk=user)
+
+        stale_cutoff = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
+
+        # TODO: Refactor below using last_update from REDIS_FEED_UPDATE_POOL
+        stale_feeds = cls.objects.filter(user=user, active=True, feed__last_update__lte=stale_cutoff)
+        if exclude_new:
+            stale_feeds = stale_feeds.filter(feed__fetched_once=True)
+        all_feeds = cls.objects.filter(user=user, active=True)
+
+        logging.user(user, "~FG~BBRefreshing stale feeds: ~SB%s/%s" % (
+            stale_feeds.count(), all_feeds.count()))
+
+        for sub in stale_feeds:
+            sub.feed.fetched_once = False
+            sub.feed.save()
+
+        if stale_feeds:
+            stale_feeds = list(set([f.feed_id for f in stale_feeds]))
+            cls.queue_new_feeds(user, new_feeds=stale_feeds)
+
     @classmethod
     def identify_deleted_feed_users(cls, old_feed_id):
         users = UserSubscriptionFolders.objects.filter(folders__contains=old_feed_id).only('user')
diff --git a/apps/reader/views.py b/apps/reader/views.py
index 730a12e4e..aeff07467 100644
--- a/apps/reader/views.py
+++ b/apps/reader/views.py
@@ -1948,8 +1948,8 @@ def save_feed_chooser(request):
         except Feed.DoesNotExist:
             pass
 
-    request.user.profile.queue_new_feeds()
-    request.user.profile.refresh_stale_feeds(exclude_new=True)
+    UserSubscription.queue_new_feeds(request.user)
+    UserSubscription.refresh_stale_feeds(request.user, exclude_new=True)
 
     r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
     r.publish(request.user.username, 'reload:feeds')
diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py
index b83c32aa3..e92b7ac44 100644
--- a/apps/rss_feeds/models.py
+++ b/apps/rss_feeds/models.py
@@ -634,9 +634,23 @@ class Feed(models.Model):
         return redirects, non_redirects
 
     def count_subscribers(self, verbose=False):
-        SUBSCRIBER_EXPIRE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
-        from apps.reader.models import UserSubscription
+        """Recount subscriber totals for this feed and its branched feeds.
+
+        Counts come from the Redis sorted sets `s:<feed_id>` and
+        `sp:<feed_id>` when the original feed's set holds the `-1` marker
+        member; otherwise they are computed with database queries.
+        """
+        SUBSCRIBER_EXPIRE_DATE = datetime.datetime.now() - datetime.timedelta(days=settings.SUBSCRIBER_EXPIRE)
+        subscriber_expire = int(SUBSCRIBER_EXPIRE_DATE.strftime('%s'))
+        now = int(datetime.datetime.now().strftime('%s'))
+        r = redis.Redis(connection_pool=settings.REDIS_FEED_SUB_POOL)
+        total = 0
+        active = 0
+        premium = 0
+        active_premium = 0
+        counts_converted_to_redis = False
 
+        # Include all branched feeds in counts
         if self.branch_from_feed:
             original_feed_id = self.branch_from_feed.pk
         else:
@@ -645,35 +659,76 @@ class Feed(models.Model):
         feed_ids.append(original_feed_id)
         feed_ids = list(set(feed_ids))
 
-        subs = UserSubscription.objects.filter(feed__in=feed_ids)
-        original_num_subscribers = self.num_subscribers
-        self.num_subscribers = subs.count()
+        # zrank is 0 for the lowest-ranked member, so test against None, not truthiness
+        counts_converted_to_redis = r.zrank("s:%s" % original_feed_id, -1) is not None
 
-        active_subs = UserSubscription.objects.filter(
-            feed__in=feed_ids,
-            active=True,
-            user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE
-        )
-        original_active_subs = self.active_subscribers
-        self.active_subscribers = active_subs.count()
+        if counts_converted_to_redis:
+            # For each branched feed, count different subscribers
+            pipeline = r.pipeline()
+            for feed_id in feed_ids:
+                total_key = "s:%s" % feed_id
+                premium_key = "sp:%s" % feed_id
+                pipeline.zcard(total_key)
+                pipeline.zcount(total_key, subscriber_expire, now-1)
+                pipeline.zcard(premium_key)
+                pipeline.zcount(premium_key, subscriber_expire, now-1)
+
+            results = pipeline.execute()
+
+            # Four replies per feed, in the order queued. Subtract 1 for the
+            # -1 marker member; clamp at 0 since only the original feed's
+            # marker was checked and it may fall outside the active window.
+            for offset in xrange(0, len(results), 4):
+                total += max(0, results[offset] - 1)
+                active += max(0, results[offset + 1] - 1)
+                premium += max(0, results[offset + 2] - 1)
+                active_premium += max(0, results[offset + 3] - 1)
+
+            # If any counts have changed, save them
+            original_num_subscribers = self.num_subscribers
+            original_active_subs = self.active_subscribers
+            original_premium_subscribers = self.premium_subscribers
+            original_active_premium_subscribers = self.active_premium_subscribers
+            logging.info(" ---> [%-30s] ~SN~FBCounting subscribers from ~FCredis~FB: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
+                          (self.title[:30], total, active, premium, active_premium))
+        else:
+            from apps.reader.models import UserSubscription
+
+            subs = UserSubscription.objects.filter(feed__in=feed_ids)
+            original_num_subscribers = self.num_subscribers
+            total = subs.count()
 
-        premium_subs = UserSubscription.objects.filter(
-            feed__in=feed_ids,
-            active=True,
-            user__profile__is_premium=True
-        )
-        original_premium_subscribers = self.premium_subscribers
-        self.premium_subscribers = premium_subs.count()
+            active_subs = UserSubscription.objects.filter(
+                feed__in=feed_ids,
+                active=True,
+                user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE_DATE
+            )
+            original_active_subs = self.active_subscribers
+            active = active_subs.count()
 
-        active_premium_subscribers = UserSubscription.objects.filter(
-            feed__in=feed_ids,
-            active=True,
-            user__profile__is_premium=True,
-            user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE
-        )
-        original_active_premium_subscribers = self.active_premium_subscribers
-        self.active_premium_subscribers = active_premium_subscribers.count()
+            premium_subs = UserSubscription.objects.filter(
+                feed__in=feed_ids,
+                active=True,
+                user__profile__is_premium=True
+            )
+            original_premium_subscribers = self.premium_subscribers
+            premium = premium_subs.count()
 
+            active_premium_subscribers = UserSubscription.objects.filter(
+                feed__in=feed_ids,
+                active=True,
+                user__profile__is_premium=True,
+                user__profile__last_seen_on__gte=SUBSCRIBER_EXPIRE_DATE
+            )
+            original_active_premium_subscribers = self.active_premium_subscribers
+            active_premium = active_premium_subscribers.count()
+            logging.debug(" ---> [%-30s] ~SN~FBCounting subscribers from ~FYpostgres~FB: ~FMt:~SB~FM%s~SN a:~SB%s~SN p:~SB%s~SN ap:~SB%s" %
+                          (self.title[:30], total, active, premium, active_premium))
+
+        self.num_subscribers = total
+        self.active_subscribers = active
+        self.premium_subscribers = premium
+        self.active_premium_subscribers = active_premium
         if (self.num_subscribers != original_num_subscribers or
             self.active_subscribers != original_active_subs or
             self.premium_subscribers != original_premium_subscribers or
@@ -691,7 +746,7 @@ class Feed(models.Model):
                 '' if self.num_subscribers == 1 else 's',
                 self.feed_title,
             ),
-        
+    
     def _split_favicon_color(self):
         color = self.favicon_color
         if color:
diff --git a/config/requirements.txt b/config/requirements.txt
index 82a23a4a6..0a6869c83 100755
--- a/config/requirements.txt
+++ b/config/requirements.txt
@@ -36,7 +36,7 @@ python-gflags==2.0
 pytz==2013b
 raven==3.1.17
 readline==6.2.4.1
-redis==2.8.0
+redis==2.10.3
 hiredis==0.1.1
 requests==2.5.2
 seacucumber==1.5