diff --git a/apps/profile/tasks.py b/apps/profile/tasks.py
index 70cdb043a..b50cb29f6 100644
--- a/apps/profile/tasks.py
+++ b/apps/profile/tasks.py
@@ -29,9 +29,9 @@ def FetchArchiveFeedsForUser(user_id):
 
 
 @app.task()
-def FetchArchiveFeedsChunk(user_id, feed_ids):
+def FetchArchiveFeedsChunk(feed_ids, user_id=None):
     # logging.debug(" ---> Fetching archive stories: %s for %s" % (feed_ids, user_id))
-    UserSubscription.fetch_archive_feeds_chunk(user_id, feed_ids)
+    UserSubscription.fetch_archive_feeds_chunk(feed_ids, user_id)
 
 
 @app.task()
diff --git a/apps/reader/models.py b/apps/reader/models.py
index 6c89e888e..0b11d97c1 100644
--- a/apps/reader/models.py
+++ b/apps/reader/models.py
@@ -507,6 +507,9 @@ class UserSubscription(models.Model):
         feed.setup_feed_for_premium_subscribers(allow_skip_resync=allow_skip_resync)
         feed.count_subscribers()
 
+        if feed.archive_count:
+            feed.schedule_fetch_archive_feed()
+
         r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
         r.publish(user.username, "reload:feeds")
 
@@ -654,13 +657,15 @@ class UserSubscription(models.Model):
         celery.chord(search_chunks)(callback)
 
     @classmethod
-    def fetch_archive_feeds_chunk(cls, user_id, feed_ids):
+    def fetch_archive_feeds_chunk(cls, feed_ids, user_id=None):
         from apps.rss_feeds.models import Feed
 
         r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
-        user = User.objects.get(pk=user_id)
-
-        logging.user(user, "~FCFetching archive stories from %s feeds..." % len(feed_ids))
+        if user_id:
+            user = User.objects.get(pk=user_id)
+            logging.user(user, "~FCFetching archive stories from %s feeds..." % len(feed_ids))
+        else:
+            logging.debug("~FCFetching archive stories from %s feeds..." % len(feed_ids))
 
         for feed_id in feed_ids:
             feed = Feed.get_by_id(feed_id)
@@ -669,7 +674,8 @@ class UserSubscription(models.Model):
 
             feed.fill_out_archive_stories()
 
-        r.publish(user.username, "fetch_archive:feeds:%s" % ",".join([str(f) for f in feed_ids]))
+        if user_id:
+            r.publish(user.username, "fetch_archive:feeds:%s" % ",".join([str(f) for f in feed_ids]))
 
     @classmethod
     def finish_fetch_archive_feeds(cls, user_id, start_time, starting_story_count):
diff --git a/apps/rss_feeds/models.py b/apps/rss_feeds/models.py
index a4eca1342..ac2eba2a1 100755
--- a/apps/rss_feeds/models.py
+++ b/apps/rss_feeds/models.py
@@ -715,6 +715,16 @@ class Feed(models.Model):
         self.set_next_scheduled_update(verbose=settings.DEBUG)
         self.sync_redis(allow_skip_resync=allow_skip_resync)
 
+    def schedule_fetch_archive_feed(self):
+        from apps.profile.tasks import FetchArchiveFeedsChunk
+
+        logging.debug(f"~FC~SBScheduling fetch of archive feed ~SB{self.log_title}")
+        FetchArchiveFeedsChunk.apply_async(
+            kwargs=dict(feed_ids=[self.pk]),
+            queue="search_indexer",
+            time_limit=settings.MAX_SECONDS_ARCHIVE_FETCH_SINGLE_FEED,
+        )
+
     def check_feed_link_for_feed_address(self):
         @timelimit(10)
         def _1():
@@ -2850,8 +2860,8 @@ class MFeedPage(mongo.Document):
                 data = zlib.decompress(page_data_z)
             except zlib.error as e:
                 logging.debug(" ***> Zlib decompress error: %s" % e)
-                self.page_data = None
-                self.save()
+                feed_page.page_data = None
+                feed_page.save()
                 return
 
         if not data:
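Note on the three hunks above: with `feed_ids` first and `user_id` optional, one Celery task now backs both the existing per-user archive chord and the new fire-and-forget single-feed fetch. A minimal sketch of the two call sites, with illustrative IDs (the queue name and time limit are the ones `schedule_fetch_archive_feed` uses in this patch):

```python
# Sketch of the two ways the reworked task can be invoked (illustrative IDs).
from django.conf import settings

from apps.profile.tasks import FetchArchiveFeedsChunk

# Per-user backfill: user_id is passed, so progress is logged against the user
# and published to their Redis pubsub channel when the chunk finishes.
FetchArchiveFeedsChunk.apply_async(kwargs=dict(feed_ids=[101, 102], user_id=42))

# Feed-only backfill (the new path): no user_id, so the task falls back to
# logging.debug and skips the per-user pubsub publish entirely.
FetchArchiveFeedsChunk.apply_async(
    kwargs=dict(feed_ids=[101]),
    queue="search_indexer",
    time_limit=settings.MAX_SECONDS_ARCHIVE_FETCH_SINGLE_FEED,
)
```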
diff --git a/apps/search/models.py b/apps/search/models.py
index 949c67542..653d8b6de 100644
--- a/apps/search/models.py
+++ b/apps/search/models.py
@@ -482,7 +482,7 @@ class SearchStory:
         try:
             result_ids = [r["_id"] for r in results["hits"]["hits"]]
         except Exception as e:
-            logging.info(' ---> ~FRInvalid search query "%s": %s' % (query, e))
+            logging.info(' ---> ~FRInvalid more like this query "%s": %s' % (story_hash, e))
             return []
 
         return result_ids
diff --git a/utils/youtube_fetcher.py b/utils/youtube_fetcher.py
index 257082bf3..668fd5d1d 100644
--- a/utils/youtube_fetcher.py
+++ b/utils/youtube_fetcher.py
@@ -1,4 +1,5 @@
 import datetime
+import logging
 import re
 import urllib.error
 import urllib.parse
@@ -14,7 +15,6 @@ from django.utils.html import linebreaks
 from apps.reader.models import UserSubscription
 from apps.social.models import MSocialServices
 from utils import json_functions as json
-from utils import log as logging
 from utils.story_functions import linkify
 
 
@@ -23,6 +23,7 @@ class YoutubeFetcher:
         self.feed = feed
         self.options = options or {}
         self.address = self.feed.feed_address
+        self._video_details_cache = {}  # Cache for video details
 
     def fetch(self):
         username = self.extract_username(self.address)
@@ -30,20 +31,26 @@ class YoutubeFetcher:
         list_id = self.extract_list_id(self.address)
         video_ids = None
 
+        # For archive pages, we want to fetch all pages up to the target page
+        target_page = self.options.get("archive_page", 1)
+
         if channel_id:
-            video_ids, title, description = self.fetch_channel_videos(channel_id)
+            video_ids, title, description = self.fetch_channel_videos(channel_id, target_page=target_page)
             channel_url = "https://www.youtube.com/channel/%s" % channel_id
         elif list_id:
-            video_ids, title, description = self.fetch_playlist_videos(list_id)
+            video_ids, title, description = self.fetch_playlist_videos(list_id, target_page=target_page)
             channel_url = "https://www.youtube.com/playlist?list=%s" % list_id
         elif username:
-            video_ids, title, description = self.fetch_user_videos(username)
+            video_ids, title, description = self.fetch_user_videos(username, target_page=target_page)
             channel_url = "https://www.youtube.com/user/%s" % username
 
         if not video_ids:
             return
 
         videos = self.fetch_videos(video_ids)
+        if not videos:
+            return
+
         data = {}
         if username:
             data["title"] = f"{username}'s YouTube Videos"
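The `archive_page` option is the only new input to `fetch()`: callers wanting deeper pages of a channel's backlog pass it through the fetcher's options and the playlist walker below does the rest. A hypothetical invocation, assuming the constructor shown above (`feed` is whatever `Feed` instance the caller already holds):

```python
# Hypothetical usage of the archive_page option read in fetch() above.
# target_page=3 means the playlist walker issues up to three playlistItems
# requests (50 videos each) before building stories.
from utils.youtube_fetcher import YoutubeFetcher

fetcher = YoutubeFetcher(feed, options={"archive_page": 3})  # feed: an existing Feed instance
fetcher.fetch()
```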
@@ -77,12 +84,20 @@ class YoutubeFetcher:
             else:
                 duration = "%s:%s" % (minutes, "{0:02d}".format(seconds))
             duration = f"<b>Duration:</b> {duration}<br />"
+
+            # Add view count if available
+            view_count = ""
+            if "statistics" in video and "viewCount" in video["statistics"]:
+                views = int(video["statistics"]["viewCount"])
+                view_count = f"<b>Views:</b> {'{:,}'.format(views)}<br />"
+
             content = """<div class="NB-youtube-player">
                 <iframe allowfullscreen="true" src="%s"></iframe>
                 </div>
                 <div class="NB-youtube-stats"><small>
                 <b>From:</b> <a href="%s">%s</a><br />
                 %s
+                %s
                 </small></div><hr>
                 <div class="NB-youtube-description">%s</div>
                 <img src="%s" style="display:none" />""" % (
@@ -90,6 +105,7 @@ class YoutubeFetcher:
             channel_url,
             username or title,
             duration,
+            view_count,
             linkify(linebreaks(video["snippet"]["description"])),
             thumbnail["url"] if thumbnail else "",
         )
@@ -153,17 +169,48 @@ class YoutubeFetcher:
         return
 
     def fetch_videos(self, video_ids):
-        videos_json = requests.get(
-            "https://www.googleapis.com/youtube/v3/videos?part=contentDetails%%2Csnippet&id=%s&key=%s"
-            % (",".join(video_ids), settings.YOUTUBE_API_KEY)
-        )
-        videos = json.decode(videos_json.content)
-        if "error" in videos:
-            logging.debug(" ***> ~FRYoutube returned an error: ~FM~SB%s" % (videos))
-            return
-        return videos
+        """Fetch video details in batches of 50, using cache."""
+        all_videos = {"items": []}
+        uncached_video_ids = [vid for vid in video_ids if vid not in self._video_details_cache]
 
-    def fetch_channel_videos(self, channel_id):
+        # Add cached videos first
+        cached_videos = [
+            self._video_details_cache[vid] for vid in video_ids if vid in self._video_details_cache
+        ]
+        all_videos["items"].extend(cached_videos)
+        if cached_videos:
+            logging.debug(" ***> Using %d cached video details" % len(cached_videos))
+
+        # Split uncached video_ids into chunks of 50
+        for i in range(0, len(uncached_video_ids), 50):
+            chunk = uncached_video_ids[i : i + 50]
+            videos_json = requests.get(
+                "https://www.googleapis.com/youtube/v3/videos?part=contentDetails%%2Csnippet%%2Cstatistics&id=%s&key=%s"
+                % (",".join(chunk), settings.YOUTUBE_API_KEY)
+            )
+            videos = json.decode(videos_json.content)
+            if "error" in videos:
+                logging.debug(
+                    " ***> ~FRYoutube returned an error for chunk %d-%d: ~FM~SB%s" % (i, i + 50, videos)
+                )
+                continue
+            if "items" in videos:
+                # Cache the new video details
+                for video in videos["items"]:
+                    self._video_details_cache[video["id"]] = video
+                all_videos["items"].extend(videos["items"])
+                logging.debug(
+                    " ***> Fetched details for %d videos (total: %d)"
+                    % (len(videos["items"]), len(all_videos["items"]))
+                )
+
+        if not all_videos["items"]:
+            logging.debug(" ***> ~FRNo video details could be fetched")
+            return None
+
+        return all_videos
+
+    def fetch_channel_videos(self, channel_id, target_page=1):
         logging.debug(" ***> ~FBFetching YouTube channel: ~SB%s" % channel_id)
         channel_json = requests.get(
             "https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&id=%s&key=%s"
             % (channel_id, settings.YOUTUBE_API_KEY)
         )
@@ -178,10 +225,11 @@ class YoutubeFetcher:
             logging.debug(" ***> ~FRYoutube channel returned an error: ~FM~SB%s: %s" % (channel, e))
             return None, None, None
 
-        return self.fetch_playlist_videos(uploads_list_id, title, description)
+        return self.fetch_playlist_videos(uploads_list_id, title, description, target_page=target_page)
 
-    def fetch_playlist_videos(self, list_id, title=None, description=None):
-        logging.debug(" ***> ~FBFetching YouTube playlist: ~SB%s" % list_id)
+    def fetch_playlist_videos(self, list_id, title=None, description=None, page_token=None, target_page=None):
+        """Fetch videos from a playlist."""
+        logging.debug(" ***> ~FBFetching YouTube playlist: ~SB%s with page token: %s" % (list_id, page_token))
         if not title and not description:
             playlist_json = requests.get(
                 "https://www.googleapis.com/youtube/v3/playlists?part=snippet&id=%s&key=%s"
@@ -194,19 +242,63 @@ class YoutubeFetcher:
                 % (list_id, settings.YOUTUBE_API_KEY)
             )
             playlist = json.decode(playlist_json.content)
             try:
                 title = playlist["items"][0]["snippet"]["title"]
                 description = playlist["items"][0]["snippet"]["description"]
             except (IndexError, KeyError):
                 return None, None, None
 
-        playlist_json = requests.get(
-            "https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId=%s&key=%s"
-            % (list_id, settings.YOUTUBE_API_KEY)
-        )
-        playlist = json.decode(playlist_json.content)
-        try:
-            video_ids = [video["snippet"]["resourceId"]["videoId"] for video in playlist["items"]]
-        except (IndexError, KeyError):
-            return None, None, None
+        video_ids = []
+        current_page_token = page_token
+        current_page = 1
+        target_page = target_page or 1  # Default to 1 if target_page is None
+        while current_page <= target_page:
+            url = (
+                "https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId=%s&key=%s&maxResults=50"
+                % (list_id, settings.YOUTUBE_API_KEY)
+            )
+            if current_page_token:
+                url += "&pageToken=%s" % current_page_token
+
+            logging.debug(
+                " ---> [Playlist] Fetching videos from: %s (page %s/%s)" % (url, current_page, target_page)
+            )
+            playlist_json = requests.get(url)
+            playlist = json.decode(playlist_json.content)
+
+            if "error" in playlist:
+                logging.debug(" ---> [Playlist] Error fetching videos: %s" % playlist["error"])
+                return None, None, None
+
+            try:
+                page_video_ids = [video["snippet"]["resourceId"]["videoId"] for video in playlist["items"]]
+                video_ids.extend(page_video_ids)
+                logging.debug(
+                    " ---> [Playlist] Found %s videos on page %s" % (len(page_video_ids), current_page)
+                )
+
+                current_page_token = playlist.get("nextPageToken")
+                if current_page == target_page or not current_page_token:
+                    logging.debug(
+                        " ---> [Playlist] %s at page %s"
+                        % (
+                            (
+                                "Target page reached"
+                                if current_page == target_page
+                                else "No more pages available"
+                            ),
+                            current_page,
+                        )
+                    )
+                    break
+
+                current_page += 1
+
+            except (IndexError, KeyError):
+                logging.debug(" ---> [Playlist] Failed to extract video IDs from response")
+                return None, None, None
+
+        logging.debug(
+            " ---> [Playlist] Retrieved total of %s videos across %s pages" % (len(video_ids), current_page)
+        )
 
         return video_ids, title, description
 
-    def fetch_user_videos(self, username, username_key="forUsername"):
+    def fetch_user_videos(self, username, username_key="forUsername", target_page=1):
         logging.debug(" ***> ~FBFetching YouTube user: ~SB%s" % username)
         channel_json = requests.get(
             "https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&%s=%s&key=%s"
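With `maxResults=50`, `target_page` maps directly onto request count: page N costs at most N `playlistItems` calls and yields up to N × 50 video IDs. A hypothetical direct call into the new paging (the playlist ID is a placeholder; `feed` is an existing `Feed` instance):

```python
# Hypothetical direct call into the paging added above: walk a playlist up
# to page 2, i.e. at most two playlistItems requests and 100 video ids.
from utils.youtube_fetcher import YoutubeFetcher

fetcher = YoutubeFetcher(feed)  # feed: an existing Feed instance
video_ids, title, description = fetcher.fetch_playlist_videos(
    "PLxxxxxxxxxxxx",  # placeholder playlist id
    target_page=2,
)
if video_ids:
    print("%s: %s videos" % (title, len(video_ids)))
```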
@@ -222,7 +314,60 @@ class YoutubeFetcher:
 
         if not uploads_list_id:
             if username_key == "forUsername":
-                return self.fetch_user_videos(username, username_key="forHandle")
+                return self.fetch_user_videos(username, username_key="forHandle", target_page=target_page)
             return None, None, None
 
-        return self.fetch_playlist_videos(uploads_list_id, title, description)
+        return self.fetch_playlist_videos(uploads_list_id, title, description, target_page=target_page)
+
+    def get_next_page_token(self, channel_id=None, list_id=None, username=None, page_token=None):
+        """Get the next page token for pagination."""
+        if channel_id:
+            channel_json = requests.get(
+                "https://www.googleapis.com/youtube/v3/channels?part=snippet,contentDetails&id=%s&key=%s"
+                % (channel_id, settings.YOUTUBE_API_KEY)
+            )
+            channel = json.decode(channel_json.content)
+            try:
+                uploads_list_id = channel["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
+                return self._get_playlist_page_token(uploads_list_id, page_token)
+            except (IndexError, KeyError):
+                return None
+        elif list_id:
+            return self._get_playlist_page_token(list_id, page_token)
+        elif username:
+            channel_json = requests.get(
+                "https://www.googleapis.com/youtube/v3/channels?part=contentDetails&forUsername=%s&key=%s"
+                % (username, settings.YOUTUBE_API_KEY)
+            )
+            channel = json.decode(channel_json.content)
+            try:
+                uploads_list_id = channel["items"][0]["contentDetails"]["relatedPlaylists"]["uploads"]
+                return self._get_playlist_page_token(uploads_list_id, page_token)
+            except (IndexError, KeyError):
+                return None
+        return None
+
+    def _get_playlist_page_token(self, list_id, page_token=None):
+        """Helper method to get next page token for a playlist."""
+        url = (
+            "https://www.googleapis.com/youtube/v3/playlistItems?part=snippet&playlistId=%s&key=%s&maxResults=50"
+            % (
+                list_id,
+                settings.YOUTUBE_API_KEY,
+            )
+        )
+        if page_token:
+            url += "&pageToken=%s" % page_token
+
+        logging.debug(" ---> [Playlist] Fetching next page token from: %s" % url)
+        playlist_json = requests.get(url)
+        playlist = json.decode(playlist_json.content)
+
+        next_token = playlist.get("nextPageToken")
+        logging.debug(" ---> [Playlist] Next page token: %s" % next_token)
+
+        if "error" in playlist:
+            logging.debug(" ---> [Playlist] Error getting next page token: %s" % playlist["error"])
+            return None
+
+        return next_token
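`get_next_page_token` lets a caller resume paging across separate fetches instead of re-walking earlier pages each time. A sketch of caller-side iteration under that assumption (hypothetical loop; the playlist ID is a placeholder and `feed` is an existing `Feed` instance):

```python
# Hypothetical caller-side loop over the token helpers added above.
from utils.youtube_fetcher import YoutubeFetcher

fetcher = YoutubeFetcher(feed)  # feed: an existing Feed instance
token = None
for page in range(1, 4):  # peek at the first three page boundaries
    token = fetcher.get_next_page_token(list_id="PLxxxxxxxxxxxx", page_token=token)
    if not token:
        break  # no further pages
    print("page %s continues at token %s" % (page + 1, token))
```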