NewsBlur/apps/search/models.py

import datetime
import html
import re
import time
import celery
import elasticsearch
import mongoengine as mongo
import numpy as np
import pymongo
import redis
import urllib3
from django.conf import settings
from django.contrib.auth.models import User
from sentence_transformers import SentenceTransformer
from apps.search.tasks import (
FinishIndexSubscriptionsForSearch,
IndexFeedsForSearch,
IndexSubscriptionsChunkForSearch,
IndexSubscriptionsForSearch,
)
from utils import log as logging
from utils.feed_functions import chunks
class MUserSearch(mongo.Document):
"""Search index state of a user's subscriptions."""
user_id = mongo.IntField(unique=True)
last_search_date = mongo.DateTimeField()
subscriptions_indexed = mongo.BooleanField()
subscriptions_indexing = mongo.BooleanField()
meta = {
"collection": "user_search",
"indexes": ["user_id"],
"allow_inheritance": False,
}
@classmethod
def get_user(cls, user_id, create=True):
try:
user_search = cls.objects.read_preference(pymongo.ReadPreference.PRIMARY).get(user_id=user_id)
except cls.DoesNotExist:
if create:
user_search = cls.objects.create(user_id=user_id)
else:
user_search = None
return user_search
def touch_search_date(self):
if not self.subscriptions_indexed and not self.subscriptions_indexing:
self.schedule_index_subscriptions_for_search()
self.subscriptions_indexing = True
self.last_search_date = datetime.datetime.now()
self.save()
def schedule_index_subscriptions_for_search(self):
IndexSubscriptionsForSearch.apply_async(kwargs=dict(user_id=self.user_id), queue="search_indexer")
# Should be run as a background task
def index_subscriptions_for_search(self):
from apps.reader.models import UserSubscription
from apps.rss_feeds.models import Feed
SearchStory.create_elasticsearch_mapping()
start = time.time()
user = User.objects.get(pk=self.user_id)
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
r.publish(user.username, "search_index_complete:start")
subscriptions = UserSubscription.objects.filter(user=user).only("feed")
total = subscriptions.count()
feed_ids = []
for sub in subscriptions:
try:
feed_ids.append(sub.feed.pk)
except Feed.DoesNotExist:
continue
feed_id_chunks = [c for c in chunks(feed_ids, 6)]
logging.user(user, "~FCIndexing ~SB%s feeds~SN in %s chunks..." % (total, len(feed_id_chunks)))
search_chunks = [
IndexSubscriptionsChunkForSearch.s(feed_ids=feed_id_chunk, user_id=self.user_id).set(
queue="search_indexer"
)
for feed_id_chunk in feed_id_chunks
]
callback = FinishIndexSubscriptionsForSearch.s(user_id=self.user_id, start=start).set(
queue="search_indexer"
)
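        # Fan the chunk-indexing tasks out in parallel and run the finish
        # callback once every chunk has completed (a celery chord).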
celery.chord(search_chunks)(callback)
def finish_index_subscriptions_for_search(self, start):
from apps.reader.models import UserSubscription
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
user = User.objects.get(pk=self.user_id)
subscriptions = UserSubscription.objects.filter(user=user).only("feed")
total = subscriptions.count()
duration = time.time() - start
logging.user(user, "~FCIndexed ~SB%s feeds~SN in ~FM~SB%s~FC~SN sec." % (total, round(duration, 2)))
r.publish(user.username, "search_index_complete:done")
self.subscriptions_indexed = True
self.subscriptions_indexing = False
self.save()
def index_subscriptions_chunk_for_search(self, feed_ids):
from apps.rss_feeds.models import Feed
r = redis.Redis(connection_pool=settings.REDIS_PUBSUB_POOL)
user = User.objects.get(pk=self.user_id)
logging.user(user, "~FCIndexing %s feeds..." % len(feed_ids))
for feed_id in feed_ids:
feed = Feed.get_by_id(feed_id)
if not feed:
continue
feed.index_stories_for_search()
r.publish(user.username, "search_index_complete:feeds:%s" % ",".join([str(f) for f in feed_ids]))
@classmethod
def schedule_index_feeds_for_search(cls, feed_ids, user_id):
user_search = cls.get_user(user_id, create=False)
if not user_search or not user_search.subscriptions_indexed or user_search.subscriptions_indexing:
            # Bail out: the user has never searched, or their subscriptions
            # aren't fully indexed yet (or are indexing right now).
return
if not isinstance(feed_ids, list):
feed_ids = [feed_ids]
IndexFeedsForSearch.apply_async(
kwargs=dict(feed_ids=feed_ids, user_id=user_id), queue="search_indexer"
)
@classmethod
def index_feeds_for_search(cls, feed_ids, user_id):
from apps.rss_feeds.models import Feed
user = User.objects.get(pk=user_id)
logging.user(user, "~SB~FCIndexing %s~FC by request..." % feed_ids)
for feed_id in feed_ids:
feed = Feed.get_by_id(feed_id)
if not feed:
continue
feed.index_stories_for_search()
@classmethod
def remove_all(cls, drop_index=False):
# You only need to drop the index if there is data you want to clear.
# A new search server won't need this, as there isn't anything to drop.
if drop_index:
logging.info(" ---> ~FRRemoving stories search index...")
SearchStory.drop()
user_searches = cls.objects.all()
logging.info(" ---> ~SN~FRRemoving ~SB%s~SN user searches..." % user_searches.count())
for user_search in user_searches:
try:
user_search.remove()
except Exception as e:
print(" ****> Error on search removal: %s" % e)
def remove(self):
from apps.reader.models import UserSubscription
from apps.rss_feeds.models import Feed
user = User.objects.get(pk=self.user_id)
subscriptions = UserSubscription.objects.filter(user=self.user_id)
total = subscriptions.count()
removed = 0
for sub in subscriptions:
try:
feed = sub.feed
except Feed.DoesNotExist:
continue
if not feed.search_indexed:
continue
feed.search_indexed = False
feed.save()
removed += 1
logging.user(
user,
"~FCRemoved ~SB%s/%s feed's search indexes~SN for ~SB~FB%s~FC~SN."
% (removed, total, user.username),
)
self.delete()
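# Illustrative usage, not executed by this module (the user id is hypothetical):
#
#   user_search = MUserSearch.get_user(user_id=42)
#   user_search.touch_search_date()  # a user's first search kicks off subscription indexing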
class SearchStory:
_es_client = None
name = "stories"
@classmethod
def ES(cls):
if cls._es_client is None:
cls._es_client = elasticsearch.Elasticsearch(settings.ELASTICSEARCH_STORY_HOST)
cls.create_elasticsearch_mapping()
return cls._es_client
@classmethod
def index_name(cls):
return "%s-index" % cls.name
@classmethod
def doc_type(cls):
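        # Mapping types were deprecated in Elasticsearch 7 and removed in 8, so no
        # doc_type is sent on Docker builds or when ES_IGNORE_TYPE (default True) is set.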
if settings.DOCKERBUILD or getattr(settings, "ES_IGNORE_TYPE", True):
return None
return "%s-type" % cls.name
@classmethod
def create_elasticsearch_mapping(cls, delete=False):
if delete:
logging.debug(" ---> ~FRDeleting search index for ~FM%s" % cls.index_name())
try:
cls.ES().indices.delete(cls.index_name())
except elasticsearch.exceptions.NotFoundError:
logging.debug(f" ---> ~FBCan't delete {cls.index_name()} index, doesn't exist...")
if cls.ES().indices.exists(cls.index_name()):
return
try:
cls.ES().indices.create(cls.index_name())
logging.debug(" ---> ~FCCreating search index for ~FM%s" % cls.index_name())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRCould not create search index for ~FM%s: %s" % (cls.index_name(), e))
return
except (
elasticsearch.exceptions.ConnectionError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.ConnectTimeoutError,
) as e:
logging.debug(f" ***> ~FRNo search server available for creating story mapping: {e}")
return
mapping = {
"title": {
"store": False,
"type": "text",
"analyzer": "snowball",
"term_vector": "yes",
},
"content": {
"store": False,
"type": "text",
"analyzer": "snowball",
"term_vector": "yes",
},
"tags": {
"store": False,
"type": "text",
"fields": {"raw": {"type": "text", "analyzer": "keyword", "term_vector": "yes"}},
},
"author": {
"store": False,
"type": "text",
"analyzer": "default",
},
"feed_id": {"store": False, "type": "integer"},
"date": {
"store": False,
"type": "date",
},
}
cls.ES().indices.put_mapping(
body={
"properties": mapping,
},
index=cls.index_name(),
)
cls.ES().indices.flush(cls.index_name())
@classmethod
def index(
cls, story_hash, story_title, story_content, story_tags, story_author, story_feed_id, story_date
):
cls.create_elasticsearch_mapping()
doc = {
"content": story_content,
"title": story_title,
"tags": ", ".join(story_tags),
"author": story_author,
"feed_id": story_feed_id,
"date": story_date,
}
try:
cls.ES().create(index=cls.index_name(), id=story_hash, body=doc, doc_type=cls.doc_type())
except (elasticsearch.exceptions.ConnectionError, urllib3.exceptions.NewConnectionError) as e:
logging.debug(f" ***> ~FRNo search server available for story indexing: {e}")
except elasticsearch.exceptions.ConflictError as e:
logging.debug(f" ***> ~FBAlready indexed story: {e}")
# if settings.DEBUG:
# logging.debug(f" ***> ~FBIndexed {story_hash}")
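    # Illustrative call, with hypothetical values (run from a Django shell or task):
    #
    #   SearchStory.index(
    #       story_hash="123:abc123",
    #       story_title="Example story",
    #       story_content="<p>Body text</p>",
    #       story_tags=["python", "search"],
    #       story_author="Jane Doe",
    #       story_feed_id=123,
    #       story_date=datetime.datetime.now(),
    #   )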
@classmethod
def remove(cls, story_hash):
if not cls.ES().exists(index=cls.index_name(), id=story_hash, doc_type=cls.doc_type()):
return
try:
cls.ES().delete(index=cls.index_name(), id=story_hash, doc_type=cls.doc_type())
except elasticsearch.exceptions.NotFoundError:
cls.ES().delete(index=cls.index_name(), id=story_hash, doc_type="story-type")
        except (elasticsearch.exceptions.ConnectionError, urllib3.exceptions.NewConnectionError) as e:
logging.debug(f" ***> ~FRNo search server available for story deletion: {e}")
@classmethod
def drop(cls):
try:
cls.ES().indices.delete(cls.index_name())
except elasticsearch.exceptions.NotFoundError:
logging.debug(" ***> ~FBNo index found, nothing to drop.")
@classmethod
def query(cls, feed_ids, query, order, offset, limit, strip=False):
try:
cls.ES().indices.flush(cls.index_name())
except elasticsearch.exceptions.NotFoundError as e:
logging.debug(f" ***> ~FRNo search server available: {e}")
return []
if strip:
query = re.sub(r"([^\s\w_\-])+", " ", query) # Strip non-alphanumeric
query = html.unescape(query)
body = {
"query": {
"bool": {
"must": [
{"query_string": {"query": query, "default_operator": "AND"}},
{"terms": {"feed_id": feed_ids[:2000]}},
]
}
},
"sort": [{"date": {"order": "desc" if order == "newest" else "asc"}}],
"from": offset,
"size": limit,
}
try:
results = cls.ES().search(body=body, index=cls.index_name(), doc_type=cls.doc_type())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRNo search server available for querying: %s" % e)
return []
# s = elasticsearch_dsl.Search(using=cls.ES(), index=cls.index_name())
# string_q = elasticsearch_dsl.Q('query_string', query=query, default_operator="AND")
# feed_q = elasticsearch_dsl.Q('terms', feed_id=feed_ids[:2000])
# search_q = string_q & feed_q
# s = s.query(search_q)
# s = s.sort(sort)[offset:offset+limit]
# results = s.execute()
# string_q = pyes.query.QueryStringQuery(query, default_operator="AND")
# feed_q = pyes.query.TermsQuery('feed_id', feed_ids[:2000])
# q = pyes.query.BoolQuery(must=[string_q, feed_q])
# try:
# results = cls.ES().search(q, indices=cls.index_name(),
# partial_fields={}, sort=sort, start=offset, size=limit)
# except elasticsearch.exceptions.ConnectionError:
# logging.debug(" ***> ~FRNo search server available.")
# return []
logging.info(
" ---> ~FG~SNSearch ~FCstories~FG for: ~SB%s~SN, ~SB%s~SN results (across %s feed%s)"
% (query, len(results["hits"]["hits"]), len(feed_ids), "s" if len(feed_ids) != 1 else "")
)
try:
result_ids = [r["_id"] for r in results["hits"]["hits"]]
except Exception as e:
logging.info(' ---> ~FRInvalid search query "%s": %s' % (query, e))
return []
return result_ids
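    # Illustrative call, with hypothetical feed ids and query text:
    #
    #   story_hashes = SearchStory.query(
    #       feed_ids=[123, 456], query="elasticsearch", order="newest", offset=0, limit=10
    #   )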
@classmethod
def global_query(cls, query, order, offset, limit, strip=False):
cls.create_elasticsearch_mapping()
cls.ES().indices.flush()
if strip:
query = re.sub(r"([^\s\w_\-])+", " ", query) # Strip non-alphanumeric
query = html.unescape(query)
body = {
"query": {
"bool": {
"must": [
{"query_string": {"query": query, "default_operator": "AND"}},
]
}
},
"sort": [{"date": {"order": "desc" if order == "newest" else "asc"}}],
"from": offset,
"size": limit,
}
try:
results = cls.ES().search(body=body, index=cls.index_name(), doc_type=cls.doc_type())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRNo search server available for querying: %s" % e)
return []
# sort = "date:desc" if order == "newest" else "date:asc"
# string_q = pyes.query.QueryStringQuery(query, default_operator="AND")
# try:
# results = cls.ES().search(string_q, indices=cls.index_name(),
# partial_fields={}, sort=sort, start=offset, size=limit)
# except elasticsearch.exceptions.ConnectionError:
# logging.debug(" ***> ~FRNo search server available.")
# return []
logging.info(" ---> ~FG~SNSearch ~FCstories~FG for: ~SB%s~SN (across all feeds)" % (query))
try:
result_ids = [r["_id"] for r in results["hits"]["hits"]]
except Exception as e:
logging.info(' ---> ~FRInvalid search query "%s": %s' % (query, e))
return []
return result_ids
@classmethod
def more_like_this(cls, feed_ids, story_hash, order, offset, limit):
try:
cls.ES().indices.flush(cls.index_name())
except elasticsearch.exceptions.NotFoundError as e:
logging.debug(f" ***> ~FRNo search server available: {e}")
return []
body = {
"query": {
"bool": {
"filter": [
{
"more_like_this": {
"fields": ["title", "content"],
"like": [
{
"_index": cls.index_name(),
"_id": story_hash,
}
],
"min_term_freq": 3,
"min_doc_freq": 2,
"min_word_length": 4,
},
},
{"terms": {"feed_id": feed_ids[:2000]}},
],
}
},
"sort": [{"date": {"order": "desc" if order == "newest" else "asc"}}],
"from": offset,
"size": limit,
}
try:
results = cls.ES().search(body=body, index=cls.index_name(), doc_type=cls.doc_type())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRNo search server available for querying: %s" % e)
return []
logging.info(
" ---> ~FG~SNMore like this ~FCstories~FG for: ~SB%s~SN, ~SB%s~SN results (across %s feed%s)"
% (story_hash, len(results["hits"]["hits"]), len(feed_ids), "s" if len(feed_ids) != 1 else "")
)
try:
result_ids = [r["_id"] for r in results["hits"]["hits"]]
except Exception as e:
            logging.info(' ---> ~FRInvalid search query "%s": %s' % (story_hash, e))
return []
return result_ids
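    # Illustrative call, with a hypothetical story hash:
    #
    #   related_hashes = SearchStory.more_like_this(
    #       feed_ids=[123, 456], story_hash="123:abc123", order="newest", offset=0, limit=5
    #   )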
class SearchFeed:
_es_client = None
name = "feeds"
model = None
@classmethod
def ES(cls):
if cls._es_client is None:
cls._es_client = elasticsearch.Elasticsearch(settings.ELASTICSEARCH_FEED_HOST)
cls.create_elasticsearch_mapping()
return cls._es_client
@classmethod
def index_name(cls):
# feeds-index
return "%s-index" % cls.name
@classmethod
def doc_type(cls):
if settings.DOCKERBUILD or getattr(settings, "ES_IGNORE_TYPE", True):
return None
return "%s-type" % cls.name
@classmethod
def create_elasticsearch_mapping(cls, delete=False):
if delete:
logging.debug(" ---> ~FRDeleting search index for ~FM%s" % cls.index_name())
try:
cls.ES().indices.delete(cls.index_name())
except elasticsearch.exceptions.NotFoundError:
logging.debug(f" ---> ~FBCan't delete {cls.index_name()} index, doesn't exist...")
if cls.ES().indices.exists(cls.index_name()):
return
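        # Index-level analysis settings: a custom edge_ngram filter (1-15 character
        # prefixes) and analyzer, defined at index-creation time for prefix/type-ahead matching.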
index_settings = {
"index": {
"analysis": {
"analyzer": {
"edgengram_analyzer": {
"filter": ["edgengram_analyzer"],
"tokenizer": "lowercase",
"type": "custom",
},
},
"filter": {
"edgengram_analyzer": {"max_gram": "15", "min_gram": "1", "type": "edge_ngram"},
},
}
}
}
try:
cls.ES().indices.create(cls.index_name(), body={"settings": index_settings})
logging.debug(" ---> ~FCCreating search index for ~FM%s" % cls.index_name())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRCould not create search index for ~FM%s: %s" % (cls.index_name(), e))
return
except (
elasticsearch.exceptions.ConnectionError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.ConnectTimeoutError,
) as e:
logging.debug(f" ***> ~FRNo search server available for creating feed mapping: {e}")
return
mapping = {
"feed_address": {
"analyzer": "snowball",
"store": False,
"term_vector": "with_positions_offsets",
"type": "text",
},
"feed_id": {"store": True, "type": "text"},
"num_subscribers": {"store": True, "type": "long"},
"title": {
"analyzer": "snowball",
"store": False,
"term_vector": "with_positions_offsets",
"type": "text",
},
"link": {
"analyzer": "snowball",
"store": False,
"term_vector": "with_positions_offsets",
"type": "text",
},
"content_vector": {
"type": "dense_vector",
"dims": 384, # Numbers of dims from all-MiniLM-L6-v2
},
}
cls.ES().indices.put_mapping(
body={
"properties": mapping,
},
index=cls.index_name(),
)
cls.ES().indices.flush(cls.index_name())
@classmethod
def index(cls, feed_id, title, address, link, num_subscribers, content_vector):
doc = {
"feed_id": feed_id,
"title": title,
"feed_address": address,
"link": link,
"num_subscribers": num_subscribers,
"content_vector": content_vector,
}
try:
cls.ES().create(index=cls.index_name(), id=feed_id, body=doc, doc_type=cls.doc_type())
except (elasticsearch.exceptions.ConnectionError, urllib3.exceptions.NewConnectionError) as e:
logging.debug(f" ***> ~FRNo search server available for feed indexing: {e}")
@classmethod
def drop(cls):
try:
cls.ES().indices.delete(cls.index_name())
except elasticsearch.exceptions.NotFoundError:
logging.debug(" ***> ~FBNo index found, nothing to drop.")
@classmethod
def query(cls, text, max_subscribers=5):
try:
cls.ES().indices.flush(index=cls.index_name())
except elasticsearch.exceptions.NotFoundError as e:
logging.debug(f" ***> ~FRNo search server available: {e}")
return []
if settings.DEBUG:
max_subscribers = 1
body = {
"query": {
"bool": {
"should": [
                        {
                            "match": {
                                "feed_address": {
"query": text,
"cutoff_frequency": "0.0005",
"minimum_should_match": "75%",
}
}
},
{
"match": {
"title": {
"query": text,
"cutoff_frequency": "0.0005",
"minimum_should_match": "75%",
}
}
},
{
"match": {
"link": {
"query": text,
"cutoff_frequency": "0.0005",
"minimum_should_match": "75%",
}
}
},
]
}
},
"sort": [{"num_subscribers": {"order": "desc"}}],
}
try:
results = cls.ES().search(body=body, index=cls.index_name(), doc_type=cls.doc_type())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRNo search server available for querying: %s" % e)
return []
# s = elasticsearch_dsl.Search(using=cls.ES(), index=cls.index_name())
# address = elasticsearch_dsl.Q('match', address=text)
# link = elasticsearch_dsl.Q('match', link=text)
# title = elasticsearch_dsl.Q('match', title=text)
# search_q = address | link | title
# s = s.query(search_q).extra(cutoff_frequency="0.0005", minimum_should_match="75%")
# s = s.sort("-num_subscribers")
# body = s.to_dict()
# print(f"Before: {body}")
# results = s.execute()
# q = pyes.query.BoolQuery()
# q.add_should(pyes.query.MatchQuery('address', text, analyzer="simple", cutoff_frequency=0.0005, minimum_should_match="75%"))
# q.add_should(pyes.query.MatchQuery('link', text, analyzer="simple", cutoff_frequency=0.0005, minimum_should_match="75%"))
# q.add_should(pyes.query.MatchQuery('title', text, analyzer="simple", cutoff_frequency=0.0005, minimum_should_match="75%"))
# q = pyes.Search(q, min_score=1)
# results = cls.ES().search(query=q, size=max_subscribers, sort="num_subscribers:desc")
logging.info(
"~FGSearch ~FCfeeds~FG: ~SB%s~SN, ~SB%s~SN results" % (text, len(results["hits"]["hits"]))
)
return results["hits"]["hits"]
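    # Illustrative call, with hypothetical search text:
    #
    #   hits = SearchFeed.query("daring fireball")
    #   feed_ids = [hit["_source"]["feed_id"] for hit in hits]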
@classmethod
def vector_query(cls, query_vector, max_results=10):
try:
cls.ES().indices.flush(index=cls.index_name())
except elasticsearch.exceptions.NotFoundError as e:
logging.debug(f" ***> ~FRNo search server available: {e}")
return []
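        # script_score ranks every feed by the cosine similarity between its stored
        # content_vector and the query vector; the +1.0 offset keeps scores
        # non-negative, as Elasticsearch requires of script_score.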
body = {
"query": {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'content_vector') + 1.0",
"params": {"query_vector": query_vector},
},
}
},
"size": max_results,
}
try:
results = cls.ES().search(body=body, index=cls.index_name(), doc_type=cls.doc_type())
except elasticsearch.exceptions.RequestError as e:
logging.debug(" ***> ~FRNo search server available for querying: %s" % e)
return []
logging.info(
f"~FGVector search ~FCfeeds~FG: ~SB{max_results}~SN requested, ~SB{len(results['hits']['hits'])}~SN results"
)
return results["hits"]["hits"]
@classmethod
def generate_feed_content_vector(cls, feed_id, text=None):
from apps.rss_feeds.models import Feed
if cls.model is None:
cls.model = SentenceTransformer("all-MiniLM-L6-v2")
if text is None:
feed = Feed.objects.get(id=feed_id)
# cross_encoder = CrossEncoder("BAAI/bge-large-zh-v2", device="cpu")
# cross_encoder.encode([feed.feed_title, feed.feed_content], convert_to_tensors="all")
stories = feed.get_stories()
stories_text = ""
for story in stories:
stories_text += f"{story['story_title']} {story['story_authors']} {story['story_content']}"
text = f"{feed.feed_title} {stories_text}"
# Remove URLs
text = re.sub(r"http\S+", "", text)
# Remove special characters
text = re.sub(r"[^\w\s]", "", text)
# Convert to lowercase
text = text.lower()
# Remove extra whitespace
text = " ".join(text.split())
encoded_text = cls.model.encode(text)
normalized_embedding = encoded_text / np.linalg.norm(encoded_text)
# logging.debug(f" ---> ~FGNormalized embedding for feed {feed_id}: {normalized_embedding}")
return normalized_embedding
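    # Illustrative end-to-end sketch of the semantic feed search (the feed id is hypothetical):
    #
    #   vector = SearchFeed.generate_feed_content_vector(feed_id=123)
    #   hits = SearchFeed.vector_query(vector.tolist(), max_results=10)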
@classmethod
def export_csv(cls):
import djqscsv
from apps.rss_feeds.models import Feed
qs = Feed.objects.filter(num_subscribers__gte=20).values(
"id", "feed_title", "feed_address", "feed_link", "num_subscribers"
)
        csv = djqscsv.render_to_csv_response(qs).content
        with open("feeds.csv", "wb+") as f:
            f.write(csv)