Source code for nordlys.core.retrieval.elastic

"""
Elastic
=======

Utility class for working with Elasticsearch.
This class is to be instantiated for each index.


Indexing usage
--------------

To create an index, you first define the field mappings and then build the index.
The sample code for creating an index is provided at :py:mod:`nordlys.core.retrieval.toy_indexer`.
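
Below is a minimal sketch of that process, modeled on the toy indexer (the index name, fields, and documents are illustrative):

.. code-block:: python

    from nordlys.core.retrieval.elastic import Elastic

    # "title" is analyzed with stopping only; "content" with stopping and stemming
    mappings = {
        "title": Elastic.analyzed_field(Elastic.ANALYZER_STOP),
        "content": Elastic.analyzed_field(Elastic.ANALYZER_STOP_STEM)
    }
    es = Elastic("toy_index")
    es.create_index(mappings, force=True)
    es.add_docs_bulk({
        "d1": {"title": "hello world", "content": "it is a beautiful morning"},
        "d2": {"title": "good morning", "content": "hello from the other side"}
    })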


Retrieval usage
---------------

The following statistics can be obtained from this class:

  - Number of documents: :func:`Elastic.num_docs`
  - Number of fields: :func:`Elastic.num_fields`
  - Document count: :func:`Elastic.doc_count`
  - Collection length: :func:`Elastic.coll_length`
  - Average length: :func:`Elastic.avg_len`
  - Document length: :func:`Elastic.doc_length`
  - Document frequency: :func:`Elastic.doc_freq`
  - Collection frequency: :func:`Elastic.coll_term_freq`
  - Term frequencies: :func:`Elastic.term_freqs`
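
For example (the index, field, and term below are illustrative; the ``__main__`` block at the bottom of this module exercises all of these):

.. code-block:: python

    es = Elastic("toy_index")
    print(es.num_docs())                        # documents in the index
    print(es.doc_count("title"))                # documents with a non-empty "title"
    print(es.avg_len("title"))                  # average "title" length
    print(es.doc_freq("hello", "title"))        # document frequency of "hello"
    print(es.coll_term_freq("hello", "title"))  # collection frequency of "hello"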



Efficiency considerations
~~~~~~~~~~~~~~~~~~~~~~~~~

  - For efficiency reasons, we do not store term positions during indexing. To store them, change the ``term_vector`` setting in the corresponding mapping functions :func:`Elastic.analyzed_field` and :func:`Elastic.notanalyzed_searchable_field`.
  - Use :py:mod:`ElasticCache <nordlys.core.retrieval.elastic_cache>` for getting index statistics; it caches the statistics in memory, which boosts efficiency (see the sketch below this list).
  - Mind that :py:mod:`ElasticCache <nordlys.core.retrieval.elastic_cache>` never empties its cache!
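
A sketch of the cached variant, assuming ``ElasticCache`` is a drop-in subclass of ``Elastic`` that is instantiated with the same index name:

.. code-block:: python

    from nordlys.core.retrieval.elastic_cache import ElasticCache

    es = ElasticCache("toy_index")
    # repeated statistics calls are served from the in-memory cache
    # instead of hitting Elasticsearch again
    tf = es.term_freqs("d1", "title")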


:Authors: Faegheh Hasibi, Krisztian Balog
"""

from pprint import pprint

from elasticsearch import Elasticsearch
from elasticsearch import helpers

from nordlys.config import ELASTIC_HOSTS, ELASTIC_SETTINGS


class Elastic(object):
    FIELD_CATCHALL = "catchall"
    FIELD_ELASTIC_CATCHALL = "_all"
    DOC_TYPE = "doc"  # we don't make use of types
    ANALYZER_STOP_STEM = "english"
    ANALYZER_STOP = "stop_en"
    BM25 = "BM25"
    SIMILARITY = "sim"  # used when other similarities are used

    def __init__(self, index_name):
        self.__es = Elasticsearch(hosts=ELASTIC_HOSTS)
        self.__index_name = index_name
    @staticmethod
    def analyzed_field(analyzer=ANALYZER_STOP):
        """Returns the mapping for analyzed fields.

        For efficiency considerations, term positions are not stored.
        To store term positions, change to ``"term_vector": "with_positions_offsets"``.

        :param analyzer: name of the analyzer; valid options: [ANALYZER_STOP, ANALYZER_STOP_STEM]
        """
        if analyzer not in {Elastic.ANALYZER_STOP, Elastic.ANALYZER_STOP_STEM}:
            raise ValueError("Analyzer " + analyzer + " is not valid.")
        return {"type": "text",
                "term_vector": "yes",
                "analyzer": analyzer}
    @staticmethod
    def notanalyzed_field():
        """Returns the mapping for not-analyzed fields."""
        return {"type": "text",
                "index": "not_analyzed"}
    @staticmethod
    def notanalyzed_searchable_field():
        """Returns the mapping for not-analyzed but searchable fields (indexed with the keyword analyzer)."""
        return {"type": "text",
                "term_vector": "yes",
                "analyzer": "keyword"}
    def __gen_similarity(self, model, params=None):
        """Gets the custom similarity function."""
        similarity = params if params else {}
        similarity["type"] = model
        return {Elastic.SIMILARITY: similarity}

    def __gen_analyzers(self):
        """Gets custom analyzers.

        We include the customized analyzers in the index settings; a field may or may not use them.
        """
        analyzer = {"type": "standard", "stopwords": "_english_"}
        analyzers = {"analyzer": {Elastic.ANALYZER_STOP: analyzer}}
        return analyzers
    def analyze_query(self, query, analyzer=ANALYZER_STOP):
        """Analyzes the query.

        :param query: raw query
        :param analyzer: name of analyzer
        """
        if query.strip() == "":
            return ""
        body = {"analyzer": analyzer, "text": query}
        tokens = self.__es.indices.analyze(index=self.__index_name, body=body)["tokens"]
        query_terms = []
        for t in sorted(tokens, key=lambda x: x["position"]):
            query_terms.append(t["token"])
        return " ".join(query_terms)
    def get_mapping(self):
        """Returns the mapping definition for the index."""
        mapping = self.__es.indices.get_mapping(index=self.__index_name, doc_type=self.DOC_TYPE)
        return mapping[self.__index_name]["mappings"][self.DOC_TYPE]["properties"]
    def get_settings(self):
        """Returns the index settings."""
        return self.__es.indices.get_settings(index=self.__index_name)[self.__index_name]["settings"]["index"]
    def __update_settings(self, settings):
        """Updates the index settings."""
        self.__es.indices.close(index=self.__index_name)
        self.__es.indices.put_settings(index=self.__index_name, body=settings)
        self.__es.indices.open(index=self.__index_name)
        self.__es.indices.refresh(index=self.__index_name)
    def update_similarity(self, model=BM25, params=None):
        """Updates the similarity function "sim", which is fixed for all index fields.

        The model and params should match the Elasticsearch similarity settings:
        https://www.elastic.co/guide/en/elasticsearch/reference/2.3/index-modules-similarity.html

        :param model: name of the Elasticsearch similarity model
        :param params: dictionary of params, as defined by Elasticsearch
        """
        old_similarity = self.get_settings()["similarity"]
        new_similarity = self.__gen_similarity(model, params)
        # we only update the similarity if it differs from the old one;
        # this avoids unnecessarily closing the index
        if old_similarity != new_similarity:
            self.__update_settings({"similarity": new_similarity})
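
    # Usage sketch (the model and params below are illustrative; any
    # Elasticsearch similarity module can be plugged in the same way):
    #
    #   es = Elastic("toy_index")
    #   es.update_similarity(model="LMDirichlet", params={"mu": 2000})
    #   es.update_similarity()  # reverts to the default BM25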
    def delete_index(self):
        """Deletes an index."""
        self.__es.indices.delete(index=self.__index_name)
        print("Index <" + self.__index_name + "> has been deleted.")
    def create_index(self, mappings, model=BM25, model_params=None, force=False):
        """Creates the index (if it doesn't exist).

        :param mappings: field mappings
        :param model: name of the Elasticsearch similarity
        :param model_params: parameters of the similarity model
        :param force: forces index creation (overwrites if it already exists)
        """
        if self.__es.indices.exists(self.__index_name):
            if force:
                self.delete_index()
            else:
                # index already exists; no changes are made
                return

        # sets general elastic settings
        body = ELASTIC_SETTINGS

        # sets the global index settings
        # the number of shards should always be set to 1; otherwise the stats would not be correct
        body["settings"] = {"analysis": self.__gen_analyzers(),
                            "index": {"number_of_shards": 1,
                                      "number_of_replicas": 0}}

        # sets the similarity function
        # if the model is not BM25, a similarity module with the given model and params is defined
        if model != Elastic.BM25:
            body["settings"]["similarity"] = self.__gen_similarity(model, model_params)
        sim = model if model == Elastic.BM25 else Elastic.SIMILARITY
        for mapping in mappings.values():
            mapping["similarity"] = sim

        # sets the field mappings
        body["mappings"] = {self.DOC_TYPE: {"properties": mappings}}

        # creates the index
        self.__es.indices.create(index=self.__index_name, body=body)
        print("New index <" + self.__index_name + "> is created.")
    def add_docs_bulk(self, docs):
        """Adds a set of documents to the index in a bulk.

        :param docs: dictionary {doc_id: doc}
        """
        actions = []
        for doc_id, doc in docs.items():
            action = {"_index": self.__index_name,
                      "_type": self.DOC_TYPE,
                      "_id": doc_id,
                      "_source": doc}
            actions.append(action)
        if len(actions) > 0:
            helpers.bulk(self.__es, actions)
    def add_doc(self, doc_id, contents):
        """Adds a document with the specified contents to the index.

        :param doc_id: document ID
        :param contents: content of the document
        """
        self.__es.index(index=self.__index_name, doc_type=self.DOC_TYPE, id=doc_id, body=contents)
    def get_doc(self, doc_id, fields=None, source=True):
        """Gets a document from the index based on its ID.

        :param doc_id: document ID
        :param fields: list of fields to return (default: all)
        :param source: return document source as well (default: yes)
        """
        return self.__es.get(index=self.__index_name, doc_type=self.DOC_TYPE, id=doc_id, fields=fields,
                             _source=source)
    def search(self, query, field, num=100, fields_return="", start=0):
        """Searches in a given field, using the similarity method configured in the index for that field.

        :param query: query string
        :param field: field to search in
        :param num: number of hits to return (default: 100)
        :param fields_return: additional document fields to be returned
        :param start: starting offset (default: 0)
        :return: dictionary of document IDs with scores
        """
        hits = self.__es.search(index=self.__index_name, q=query, df=field, _source=False, size=num,
                                fielddata_fields=fields_return, from_=start)["hits"]["hits"]
        results = {}
        for hit in hits:
            results[hit["_id"]] = {"score": hit["_score"], "fields": hit.get("fields", {})}
        return results
    def search_complex(self, body, num=10, fields_return="", start=0):
        """Supports complex structured queries, which are sent as a ``body`` field in Elasticsearch.

        For detailed information on formulating structured queries, see the
        `official instructions <https://www.elastic.co/guide/en/elasticsearch/guide/current/search-in-depth.html>`_.

        Below is an example to search in two particular fields that each must contain a specific term.

        :Example:

        .. code-block:: python

            # matches documents whose title contains term_1
            # and whose content contains the phrase term_2
            term_1 = "hello"
            term_2 = "world"
            body = {
                "query": {
                    "bool": {
                        "must": [
                            {"match": {"title": term_1}},
                            {"match_phrase": {"content": term_2}}
                        ]
                    }
                }
            }

        :param body: query body
        :param num: number of hits to return (default: 10)
        :param fields_return: additional document fields to be returned
        :param start: starting offset (default: 0)
        :return: dictionary of document IDs with scores
        """
        hits = self.__es.search(index=self.__index_name, body=body, _source=False, size=num,
                                fielddata_fields=fields_return, from_=start)["hits"]["hits"]
        results = {}
        for hit in hits:
            results[hit["_id"]] = {"score": hit["_score"], "fields": hit.get("fields", {})}
        return results
    def get_field_stats(self, field):
        """Returns the statistics of the given field."""
        return self.__es.field_stats(index=self.__index_name, fields=[field])["indices"]["_all"]["fields"][field]
    def get_fields(self):
        """Returns the names of the fields in the index."""
        return list(self.get_mapping().keys())
    # =========================================
    # ================= Stats =================
    # =========================================
    def _get_termvector(self, doc_id, field, term_stats=False):
        """Returns a term vector for a given document field, including global field and term statistics.

        Term stats can have a serious performance impact; set to True only if really needed!

        :param doc_id: document ID
        :param field: field name
        :param term_stats: if True, returns term statistics
        :return: term vector dictionary
        """
        tv = self.__es.termvectors(index=self.__index_name, doc_type=self.DOC_TYPE, id=doc_id, fields=field,
                                   term_statistics=term_stats)
        return tv.get("term_vectors", {}).get(field, {}).get("terms", {})

    def _get_multi_termvectors(self, doc_ids, field, term_stats=False):
        """Returns term vectors of multiple documents for a given field (analogous to a single term vector).

        :param doc_ids: list of document IDs
        :param field: field name
        :param term_stats: if True, returns term statistics
        :return: {doc_id: {tv}, ...}
        """
        tv_all = self.__es.mtermvectors(index=self.__index_name, doc_type=self.DOC_TYPE, ids=",".join(doc_ids),
                                        fields=field, term_statistics=term_stats)
        result = {}
        for tv in tv_all["docs"]:
            result[tv["_id"]] = tv.get("term_vectors", {}).get(field, {}).get("terms", {})
        return result

    def _get_coll_termvector(self, term, field):
        """Returns a term vector containing collection-level statistics of a term."""
        body = {"query": {"bool": {"must": {"term": {field: term}}}}}
        hits = self.search_complex(body, num=1)
        doc_id = next(iter(hits.keys())) if len(hits) > 0 else None
        return self._get_termvector(doc_id, field, term_stats=True) if doc_id else {}
    def num_docs(self):
        """Returns the number of documents in the index."""
        return self.__es.count(index=self.__index_name, doc_type=self.DOC_TYPE)["count"]
    def num_fields(self):
        """Returns the number of fields in the index."""
        return len(self.get_mapping())
    def doc_count(self, field):
        """Returns the number of documents with at least one term for the given field."""
        return self.get_field_stats(field)["doc_count"]
    def coll_length(self, field):
        """Returns the total length of the field in the collection."""
        return self.get_field_stats(field)["sum_total_term_freq"]
    def avg_len(self, field):
        """Returns the average length of the field in the collection."""
        return self.coll_length(field) / self.doc_count(field)
    def doc_length(self, doc_id, field):
        """Returns the length of a field in a document."""
        return sum(self.term_freqs(doc_id, field).values())
    def doc_freq(self, term, field, tv=None):
        """Returns the document frequency of the given term in the given field."""
        coll_tv = tv if tv else self._get_coll_termvector(term, field)
        return coll_tv.get(term, {}).get("doc_freq", 0)
    def coll_term_freq(self, term, field, tv=None):
        """Returns the collection frequency of the given term in the given field."""
        coll_tv = tv if tv else self._get_coll_termvector(term, field)
        return coll_tv.get(term, {}).get("ttf", 0)
    def term_freqs(self, doc_id, field, tv=None):
        """Returns the term frequencies of all terms for a given document and field."""
        doc_tv = tv if tv else self._get_termvector(doc_id, field)
        term_freqs = {}
        for term, val in doc_tv.items():
            term_freqs[term] = val["term_freq"]
        return term_freqs
    def term_freq(self, doc_id, field, term):
        """Returns the frequency of a term in a given document and field."""
        return self.term_freqs(doc_id, field).get(term, 0)
if __name__ == "__main__":
    # example usage of the index statistics
    doc_id = 1
    field = "title"
    term = "gonna"

    es = Elastic("toy_index")
    pprint(es._get_termvector(doc_id, field=field, term_stats=True))
    pprint(es.search(term, field))

    print("================= Stats =================")
    print("[FIELD]: %s [TERM]: %s" % (field, term))
    print("- Number of documents: %d" % es.num_docs())
    print("- Number of fields: %d" % es.num_fields())
    print("- Document count: %d" % es.doc_count(field))
    print("- Collection length: %d" % es.coll_length(field))
    print("- Average length: %.2f" % es.avg_len(field))
    print("- Document length: %d" % es.doc_length(doc_id, field))
    print("- Document frequency: %d" % es.doc_freq(term, field))
    print("- Collection frequency: %d" % es.coll_term_freq(term, field))
    print("- Term frequencies:")
    pprint(es.term_freqs(doc_id, field))