NewsBlur/utils/twitter_fetcher.py

486 lines
22 KiB
Python
Raw Normal View History

import re
import datetime
2022-06-21 16:07:12 -04:00
from jmespath import search
from urllib.parse import urlparse, parse_qs
import tweepy
import dateutil.parser
from qurl import qurl
from django.conf import settings
from django.utils import feedgenerator
from django.utils.html import linebreaks
from django.utils.dateformat import DateFormat
from apps.social.models import MSocialServices
from apps.reader.models import UserSubscription
from utils import log as logging
class TwitterFetcher:
def __init__(self, feed, options=None):
self.feed = feed
self.address = self.feed.feed_address
self.options = options or {}
def fetch(self, address=None):
2020-06-04 16:57:04 -04:00
data = {}
if not address:
address = self.feed.feed_address
self.address = address
twitter_user = None
2020-06-04 16:57:04 -04:00
if '/lists/' in address:
list_id = self.extract_list_id()
if not list_id:
return
2020-06-04 16:57:04 -04:00
tweets, list_info = self.fetch_list_timeline(list_id)
if not tweets:
return
data['title'] = "%s on Twitter" % list_info.full_name
data['link'] = "https://twitter.com%s" % list_info.uri
data['description'] = "%s on Twitter" % list_info.full_name
2022-06-21 16:07:12 -04:00
elif '/search' in address:
search_query = self.extract_search_query()
if not search_query:
return
tweets = self.fetch_search_query(search_query)
if not tweets:
return
data['title'] = "\"%s\" on Twitter" % search_query
data['link'] = "%s" % address
data['description'] = "Searching \"%s\" on Twitter" % search_query
2020-06-04 16:57:04 -04:00
else:
username = self.extract_username()
if not username:
logging.debug(u' ***> [%-30s] ~FRTwitter fetch failed: %s: No active user API access' %
(self.feed.log_title[:30], self.address))
2020-06-04 16:57:04 -04:00
return
2020-06-04 16:57:04 -04:00
twitter_user = self.fetch_user(username)
if not twitter_user:
return
tweets = self.user_timeline(twitter_user)
data['title'] = "%s on Twitter" % username
data['link'] = "https://twitter.com/%s" % username
data['description'] = "%s on Twitter" % username
data['lastBuildDate'] = datetime.datetime.utcnow()
data['generator'] = 'NewsBlur Twitter API Decrapifier - %s' % settings.NEWSBLUR_URL
data['docs'] = None
data['feed_url'] = address
rss = feedgenerator.Atom1Feed(**data)
for tweet in tweets:
story_data = self.tweet_story(tweet.__dict__)
rss.add_item(**story_data)
return rss.writeString('utf-8')
def extract_username(self):
username = None
try:
address = qurl(self.address, remove=['_'])
username_groups = re.search('twitter.com/(\w+)/?$', address)
if not username_groups:
return
username = username_groups.group(1)
except IndexError:
return
return username
2020-06-04 16:57:04 -04:00
def extract_list_id(self):
list_id = None
try:
list_groups = re.search('twitter.com/i/lists/(\w+)/?', self.address)
if not list_groups:
return
list_id = list_groups.group(1)
except IndexError:
return
return list_id
2022-06-21 16:07:12 -04:00
def extract_search_query(self):
search_query = None
address = qurl(self.address, remove=['_'])
query = urlparse(address).query
query_dict = parse_qs(query)
if 'q' in query_dict:
search_query = query_dict['q'][0]
return search_query
def twitter_api(self, include_social_services=False):
twitter_api = None
social_services = None
if self.options.get('requesting_user_id', None):
social_services = MSocialServices.get_user(self.options.get('requesting_user_id'))
try:
twitter_api = social_services.twitter_api()
2020-06-19 02:27:48 -04:00
except tweepy.error.TweepError as e:
logging.debug(' ***> [%-30s] ~FRTwitter fetch failed: %s: %s' %
(self.feed.log_title[:30], self.address, e))
return
else:
usersubs = UserSubscription.objects.filter(feed=self.feed)
if not usersubs:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter fetch failed: %s: No subscriptions' %
(self.feed.log_title[:30], self.address))
return
for sub in usersubs:
social_services = MSocialServices.get_user(sub.user_id)
if not social_services.twitter_uid: continue
try:
twitter_api = social_services.twitter_api()
if not twitter_api:
continue
else:
break
2020-06-19 02:27:48 -04:00
except tweepy.error.TweepError as e:
logging.debug(' ***> [%-30s] ~FRTwitter fetch failed: %s: %s' %
(self.feed.log_title[:30], self.address, e))
continue
if not twitter_api:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter fetch failed: %s: No twitter API for %s' %
(self.feed.log_title[:30], self.address, usersubs[0].user.username))
return
if include_social_services:
return twitter_api, social_services
2020-06-04 16:57:04 -04:00
return twitter_api
2021-06-14 14:22:27 -04:00
def disconnect_twitter(self):
_, social_services = self.twitter_api(include_social_services=True)
social_services.disconnect_twitter()
2020-06-04 16:57:04 -04:00
def fetch_user(self, username):
twitter_api = self.twitter_api()
if not twitter_api:
return
try:
twitter_user = twitter_api.get_user(username)
2020-06-19 02:27:48 -04:00
except TypeError as e:
logging.debug(' ***> [%-30s] ~FRTwitter fetch failed, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(560, "Twitter Error: %s" % (e))
return
2020-06-19 02:27:48 -04:00
except tweepy.error.TweepError as e:
message = str(e).lower()
if 'suspended' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user suspended, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(562, "Twitter Error: User suspended")
# self.disconnect_twitter()
return
elif 'expired token' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user expired, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(563, "Twitter Error: Expired token")
self.disconnect_twitter()
return
elif 'not found' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user not found, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(564, "Twitter Error: User not found")
return
2018-07-16 10:51:37 -04:00
elif 'over capacity' in message or 'Max retries' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter over capacity, ignoring... %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(460, "Twitter Error: Over capacity")
return
elif '503' in message:
logging.debug(' ***> [%-30s] ~FRTwitter throwing a 503, ignoring... %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(463, "Twitter Error: Twitter's down")
return
else:
raise e
return twitter_user
def user_timeline(self, twitter_user):
try:
tweets = twitter_user.timeline(tweet_mode='extended')
2020-06-19 02:27:48 -04:00
except tweepy.error.TweepError as e:
message = str(e).lower()
if 'not authorized' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter timeline failed, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(565, "Twitter Error: Not authorized")
2018-01-18 16:12:59 -08:00
return []
elif 'user not found' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user not found, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(566, "Twitter Error: User not found")
2020-06-04 16:57:04 -04:00
return []
elif '429' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter rate limited: %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(567, "Twitter Error: Rate limited")
2020-06-04 16:57:04 -04:00
return []
elif 'blocked from viewing' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user blocked, ignoring: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(568, "Twitter Error: Blocked from viewing")
2020-06-04 16:57:04 -04:00
return []
2020-11-30 18:59:19 -05:00
elif 'over capacity' in message:
logging.debug(u' ***> [%-30s] ~FRTwitter over capacity, ignoring: %s' %
(self.feed.log_title[:30], e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(569, "Twitter Error: Over capacity")
2020-11-30 18:59:19 -05:00
return []
2020-06-04 16:57:04 -04:00
else:
raise e
if not tweets:
return []
return tweets
def fetch_list_timeline(self, list_id):
twitter_api = self.twitter_api()
if not twitter_api:
2020-06-24 09:34:49 -04:00
return None, None
2020-06-04 16:57:04 -04:00
try:
list_timeline = twitter_api.list_timeline(list_id=list_id, tweet_mode='extended')
2020-06-19 02:27:48 -04:00
except TypeError as e:
logging.debug(' ***> [%-30s] ~FRTwitter list fetch failed, disconnecting twitter: %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(570, "Twitter Error: %s" % (e))
2020-06-24 09:34:49 -04:00
return None, None
2020-06-19 02:27:48 -04:00
except tweepy.error.TweepError as e:
2020-06-04 16:57:04 -04:00
message = str(e).lower()
if 'suspended' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user suspended, disconnecting twitter: %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(572, "Twitter Error: User suspended")
# self.disconnect_twitter()
2020-06-24 09:34:49 -04:00
return None, None
2020-06-04 16:57:04 -04:00
elif 'expired token' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user expired, disconnecting twitter: %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(573, "Twitter Error: Expired token")
self.disconnect_twitter()
2020-06-24 09:34:49 -04:00
return None, None
2020-06-04 16:57:04 -04:00
elif 'not found' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter user not found, disconnecting twitter: %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(574, "Twitter Error: User not found")
2020-06-24 09:34:49 -04:00
return None, None
2020-06-04 16:57:04 -04:00
elif 'over capacity' in message or 'Max retries' in message:
2020-06-19 02:27:48 -04:00
logging.debug(' ***> [%-30s] ~FRTwitter over capacity, ignoring... %s: %s' %
2020-06-04 16:57:04 -04:00
(self.feed.log_title[:30], self.address, e))
2021-06-14 13:24:02 -04:00
self.feed.save_feed_history(470, "Twitter Error: Over capacity")
2020-06-24 09:34:49 -04:00
return None, None
2020-06-04 16:57:04 -04:00
else:
raise e
list_info = twitter_api.get_list(list_id=list_id)
if not list_timeline:
return [], list_info
return list_timeline, list_info
2022-06-21 16:07:12 -04:00
def fetch_search_query(self, search_query):
twitter_api = self.twitter_api()
if not twitter_api:
return None
try:
list_timeline = twitter_api.search(search_query, tweet_mode='extended')
except TypeError as e:
logging.debug(' ***> [%-30s] ~FRTwitter list fetch failed, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(570, "Twitter Error: %s" % (e))
return None
except tweepy.error.TweepError as e:
message = str(e).lower()
if 'suspended' in message:
logging.debug(' ***> [%-30s] ~FRTwitter user suspended, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(572, "Twitter Error: User suspended")
# self.disconnect_twitter()
return None
elif 'expired token' in message:
logging.debug(' ***> [%-30s] ~FRTwitter user expired, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(573, "Twitter Error: Expired token")
self.disconnect_twitter()
return None
elif 'not found' in message:
logging.debug(' ***> [%-30s] ~FRTwitter user not found, disconnecting twitter: %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(574, "Twitter Error: User not found")
return None
elif 'over capacity' in message or 'Max retries' in message:
logging.debug(' ***> [%-30s] ~FRTwitter over capacity, ignoring... %s: %s' %
(self.feed.log_title[:30], self.address, e))
self.feed.save_feed_history(470, "Twitter Error: Over capacity")
return None
else:
raise e
if not list_timeline:
return []
return list_timeline
def tweet_story(self, user_tweet):
categories = set()
if user_tweet['full_text'].startswith('RT @'):
categories.add('retweet')
elif user_tweet['in_reply_to_status_id'] or user_tweet['full_text'].startswith('@'):
categories.add('reply')
else:
categories.add('tweet')
if user_tweet['full_text'].startswith('RT @'):
categories.add('retweet')
if user_tweet['favorite_count']:
categories.add('liked')
if user_tweet['retweet_count']:
categories.add('retweeted')
if 'http' in user_tweet['full_text']:
categories.add('link')
story = {}
content_tweet = user_tweet
entities = ""
author = user_tweet.get('author') or user_tweet.get('user')
if not isinstance(author, dict): author = author.__dict__
author_screen_name = author['screen_name']
author_name = author['name']
author_fullname = "%s (%s)" % (author_name, author_screen_name) if author_screen_name != author_name else author_screen_name
original_author_screen_name = author_screen_name
if user_tweet['in_reply_to_user_id'] == author['id']:
categories.add('reply-to-self')
retweet_author = ""
tweet_link = "https://twitter.com/%s/status/%s" % (original_author_screen_name, user_tweet['id'])
if 'retweeted_status' in user_tweet:
retweet_author = """Retweeted by
<a href="https://twitter.com/%s"><img src="%s" style="height: 20px" /></a>
<a href="https://twitter.com/%s">%s</a>
on %s""" % (
author_screen_name,
author['profile_image_url_https'],
author_screen_name,
author_fullname,
DateFormat(user_tweet['created_at']).format('l, F jS, Y g:ia').replace('.',''),
)
content_tweet = user_tweet['retweeted_status'].__dict__
author = content_tweet['author']
if not isinstance(author, dict): author = author.__dict__
author_screen_name = author['screen_name']
author_name = author['name']
author_fullname = "%s (%s)" % (author_name, author_screen_name) if author_screen_name != author_name else author_screen_name
tweet_link = "https://twitter.com/%s/status/%s" % (author_screen_name, user_tweet['retweeted_status'].id)
tweet_title = user_tweet['full_text']
tweet_text = linebreaks(content_tweet['full_text'])
# Remove unserializable control characters
tweet_text = re.sub(r'[\x00-\x08\x0B-\x0C\x0E-\x1F]', '', tweet_text)
replaced = {}
entities_media = content_tweet['entities'].get('media', [])
if 'extended_entities' in content_tweet:
entities_media = content_tweet['extended_entities'].get('media', [])
for media in entities_media:
if 'media_url_https' not in media: continue
if media['type'] == 'photo':
if media.get('url') and media['url'] in tweet_text:
tweet_title = tweet_title.replace(media['url'], media['display_url'])
replacement = "<a href=\"%s\">%s</a>" % (media['expanded_url'], media['display_url'])
if not replaced.get(media['url']):
tweet_text = tweet_text.replace(media['url'], replacement)
replaced[media['url']] = True
entities += "<img src=\"%s\"> <hr>" % media['media_url_https']
categories.add('photo')
if media['type'] == 'video' or media['type'] == 'animated_gif':
2018-04-01 13:24:15 -07:00
if media.get('url') and media['url'] in tweet_text:
tweet_title = tweet_title.replace(media['url'], media['display_url'])
replacement = "<a href=\"%s\">%s</a>" % (media['expanded_url'], media['display_url'])
if not replaced.get(media['url']):
tweet_text = tweet_text.replace(media['url'], replacement)
replaced[media['url']] = True
bitrate = 0
chosen_variant = None
for variant in media['video_info']['variants']:
if not chosen_variant:
chosen_variant = variant
if variant.get('bitrate', 0) > bitrate:
bitrate = variant['bitrate']
chosen_variant = variant
if chosen_variant:
2020-11-09 19:29:58 -05:00
entities += "<video src=\"%s\" autoplay loop muted playsinline controls> <hr>" % chosen_variant['url']
categories.add(media['type'])
# Replace all shortened urls with their full urls
for url in content_tweet['entities'].get('urls', []):
if url['url'] in tweet_text:
replacement = "<a href=\"%s\">%s</a>" % (url['expanded_url'], url['display_url'])
if not replaced.get(url['url']):
tweet_text = tweet_text.replace(url['url'], replacement)
replaced[url['url']] = True
tweet_title = tweet_title.replace(url['url'], url['display_url'])
# Replace @username's with an <a> link
for word in re.findall("@\w+", tweet_text, re.MULTILINE):
replacement = "<a href=\"https://twitter.com/%s\">%s</a>" % (word[1:], word)
tweet_text = tweet_text.replace(word, replacement)
quote_tweet_content = ""
if 'quoted_status' in content_tweet:
quote_tweet_content = "<blockquote>"+self.tweet_story(content_tweet['quoted_status'].__dict__)['description']+"</blockquote>"
created_date = content_tweet['created_at']
2020-06-19 02:27:48 -04:00
if isinstance(created_date, str):
created_date = dateutil.parser.parse(created_date)
content = """<div class="NB-twitter-rss">
<div class="NB-twitter-rss-tweet">%s</div>
<div class="NB-twitter-rss-quote-tweet">%s</div>
<hr />
<div class="NB-twitter-rss-entities">%s</div>
<div class="NB-twitter-rss-author">
Posted by
<a href="https://twitter.com/%s"><img src="%s" style="height: 32px" /></a>
<a href="https://twitter.com/%s">%s</a>
on <a href="%s">%s</a></div>
<div class="NB-twitter-rss-retweet">%s</div>
<div class="NB-twitter-rss-stats">%s %s%s %s</div>
</div>""" % (
tweet_text,
quote_tweet_content,
entities,
author_screen_name,
author['profile_image_url_https'],
author_screen_name,
author_fullname,
tweet_link,
DateFormat(created_date).format('l, F jS, Y g:ia').replace('.',''),
retweet_author,
("<br /><br />" if content_tweet['favorite_count'] or content_tweet['retweet_count'] else ""),
("<b>%s</b> %s" % (content_tweet['favorite_count'], "like" if content_tweet['favorite_count'] == 1 else "likes")) if content_tweet['favorite_count'] else "",
(", " if content_tweet['favorite_count'] and content_tweet['retweet_count'] else ""),
("<b>%s</b> %s" % (content_tweet['retweet_count'], "retweet" if content_tweet['retweet_count'] == 1 else "retweets")) if content_tweet['retweet_count'] else "",
)
story = {
'title': tweet_title,
'link': "https://twitter.com/%s/status/%s" % (original_author_screen_name, user_tweet['id']),
'description': content,
'author_name': author_fullname,
'categories': list(categories),
'unique_id': "tweet:%s" % user_tweet['id'],
'pubdate': user_tweet['created_at'],
}
return story