Source code for nordlys.core.retrieval.elastic_cache

Elastic Cache

This is a cache for elastic index stats; a layer between an index and retrieval.
The statistics (such as document and term frequencies) are first read from the index and then kept in memory for further use.

Usage hints

  - Only one instance of Elastic cache needs to be created.
  - If you run out of memory, create a new ElasticCache object; a new instance starts with empty caches.
  - The class also caches termvectors. To further boost efficiency, you can load term vectors for multiple documents using :func:`ElasticCache.multi_termvector`.

:Author: Faegheh Hasibi
from collections import defaultdict

from nordlys.core.retrieval.elastic import Elastic

class ElasticCache(Elastic):
    """In-memory cache of index statistics layered on top of :class:`Elastic`.

    Each statistic is obtained through the parent class the first time it is
    requested and is stored in a per-instance dictionary; later requests for
    the same key are answered from that dictionary. Nothing in this class
    evicts or refreshes cached entries.
    """

    def __init__(self, index_name):
        """Initializes the parent class and creates empty caches.

        :param index_name: passed on unchanged to ``Elastic.__init__``.
        """
        super(ElasticCache, self).__init__(index_name)

        # Index-wide values; None means "not fetched yet".
        self.__num_docs = None
        self.__num_fields = None
        # Per-field values: {field: value}.
        self.__doc_count = {}
        self.__coll_length = {}
        self.__avg_len = {}
        # Per-document values: {doc_id: {field: length}}.
        self.__doc_length = defaultdict(dict)
        # Per-term values: {field: {term: value}}.
        self.__doc_freq = defaultdict(dict)
        self.__coll_term_freq = defaultdict(dict)
        # Term vectors: {doc_id: {field: term vector}}. ``__coll_tv`` holds the
        # vectors that were requested with ``term_stats=True``.
        self.__tv = defaultdict(dict)
        self.__coll_tv = defaultdict(dict)

    def __get_termvector(self, doc_id, field):
        """Returns a term vector for a given document and field.

        :param doc_id: document id.
        :param field: field name.
        :return: the cached term vector, fetched with ``_get_termvector`` on
            the first request.
        """
        # A non-empty vector already fetched with term statistics is reused
        # instead of issuing another request for the same document and field.
        with_stats = self.__coll_tv.get(doc_id, {}).get(field)
        if with_stats:
            return with_stats
        if self.__tv.get(doc_id, {}).get(field) is None:
            self.__tv[doc_id][field] = self._get_termvector(doc_id, field)
        return self.__tv[doc_id][field]

    def __get_coll_termvector(self, term, field):
        """Returns a term vector containing collection stats of a term.

        One document matching ``term`` in ``field`` is looked up with
        ``search_complex`` and its term vector is requested with
        ``term_stats=True``. The vector is cached per document and field.

        :param term: term to look up.
        :param field: field name.
        :return: the term vector of a matching document, or an empty dict when
            the search returns no hits.
        """
        body = {"query": {"bool": {"must": {"term": {field: term}}}}}
        hits = self.search_complex(body, num=1)
        if not hits:
            # No matching document: nothing to fetch and nothing worth caching.
            return {}
        doc_id = next(iter(hits.keys()))
        if self.__coll_tv.get(doc_id, {}).get(field) is None:
            self.__coll_tv[doc_id][field] = self._get_termvector(doc_id, field, term_stats=True)
        return self.__coll_tv[doc_id][field]

    def num_docs(self):
        """Returns the number of documents in the index."""
        if self.__num_docs is None:
            self.__num_docs = super(ElasticCache, self).num_docs()
        return self.__num_docs

    def num_fields(self):
        """Returns number of fields in the index."""
        if self.__num_fields is None:
            self.__num_fields = super(ElasticCache, self).num_fields()
        return self.__num_fields

    def doc_count(self, field):
        """Returns number of documents with at least one term for the given field.

        :param field: field name.
        """
        if field not in self.__doc_count:
            self.__doc_count[field] = super(ElasticCache, self).doc_count(field)
        return self.__doc_count[field]

    def coll_length(self, field):
        """Returns length of field in the collection.

        :param field: field name.
        """
        if field not in self.__coll_length:
            self.__coll_length[field] = super(ElasticCache, self).coll_length(field)
        return self.__coll_length[field]

    def avg_len(self, field):
        """Returns average length of a field in the collection.

        :param field: field name.
        """
        if field not in self.__avg_len:
            self.__avg_len[field] = super(ElasticCache, self).avg_len(field)
        return self.__avg_len[field]

    def doc_length(self, doc_id, field):
        """Returns length of a field in a document.

        The length is the sum of the values returned by :func:`term_freqs`.

        :param doc_id: document id.
        :param field: field name.
        """
        if self.__doc_length.get(doc_id, {}).get(field) is None:
            self.__doc_length[doc_id][field] = sum(self.term_freqs(doc_id, field).values())
        return self.__doc_length[doc_id][field]

    def doc_freq(self, term, field, tv=None):
        """Returns document frequency for the given term and field.

        :param term: term to look up.
        :param field: field name.
        :param tv: accepted but ignored; the term vector is always obtained
            through the cache.
        """
        if self.__doc_freq.get(field, {}).get(term) is None:
            tv = self.__get_coll_termvector(term, field)
            self.__doc_freq[field][term] = super(ElasticCache, self).doc_freq(term, field, tv=tv)
        return self.__doc_freq[field][term]

    def coll_term_freq(self, term, field, tv=None):
        """Returns collection term frequency for the given field.

        :param term: term to look up.
        :param field: field name.
        :param tv: accepted but ignored; the term vector is always obtained
            through the cache.
        """
        if self.__coll_term_freq.get(field, {}).get(term) is None:
            tv = self.__get_coll_termvector(term, field)
            self.__coll_term_freq[field][term] = super(ElasticCache, self).coll_term_freq(term, field, tv=tv)
        return self.__coll_term_freq[field][term]

    def term_freqs(self, doc_id, field, tv=None):
        """Returns term frequencies for a given document and field.

        :param doc_id: document id.
        :param field: field name.
        :param tv: accepted but ignored; the term vector is always obtained
            through the cache.
        """
        tv = self.__get_termvector(doc_id, field)
        return super(ElasticCache, self).term_freqs(doc_id, field, tv)

    def term_freq(self, doc_id, field, term):
        """Returns frequency of a term in a given document and field.

        :param doc_id: document id.
        :param field: field name.
        :param term: term to look up.
        :return: the frequency, or 0 when the term is not among the term
            frequencies of the document field.
        """
        return self.term_freqs(doc_id, field).get(term, 0)

    def multi_termvector(self, doc_ids, field, batch=50):
        """Loads term vectors of several documents into the cache.

        The ids are requested with ``_get_multi_termvectors`` in slices of at
        most ``batch`` ids. Nothing is returned; the vectors are served by
        later calls that read term vectors through the cache.

        :param doc_ids: sequence of document ids (must support ``len`` and slicing).
        :param field: field name.
        :param batch: maximum number of ids per request; must be positive.
        :raises ValueError: if ``batch`` is not positive (the loop would
            otherwise never advance).
        """
        if batch <= 0:
            raise ValueError("batch must be a positive integer")
        for start in range(0, len(doc_ids), batch):
            tvs = self._get_multi_termvectors(doc_ids[start:start + batch], field)
            for doc_id, tv in tvs.items():
                # Empty vectors are skipped so that a later single lookup can
                # still fetch the document on its own.
                if tv != {}:
                    self.__tv[doc_id][field] = tv