Source code for nordlys.core.retrieval.elastic


Utility class for working with Elasticsearch.
This class is to be instantiated for each index.

Indexing usage

To create an index, first you need to define field mappings and then build the index.
The sample code for creating an index is provided at :py:mod:`nordlys.core.retrieval.toy_indexer`.

Retrieval usage

The following statistics can be obtained from this class:

  - Number of documents: :func:`Elastic.num_docs`
  - Number of fields: :func:`Elastic.num_fields`
  - Document count: :func:`Elastic.doc_count`
  - Collection length: :func:`Elastic.coll_length`
  - Average length: :func:`Elastic.avg_len`
  - Document length: :func:`Elastic.doc_length`
  - Document frequency: :func:`Elastic.doc_freq`
  - Collection frequency: :func:`Elastic.coll_term_freq`
  - Term frequencies: :func:`Elastic.term_freqs`

Efficiency considerations

  - For efficiency reasons, we do not store term positions during indexing. To store them, see the corresponding mapping functions :func:`Elastic.analyzed_field`, :func:`Elastic.notanalyzed_searchable_field`.
  - Use :py:mod:`ElasticCache <nordlys.core.retrieval.elastic_cache>` for getting index statistics. This module caches the statistics in memory and improves efficiency.
  - Mind that :py:mod:`ElasticCache <nordlys.core.retrieval.elastic_cache>` does not empty the cache!

:Authors: Faegheh Hasibi, Krisztian Balog

from pprint import pprint, pformat

from elasticsearch import Elasticsearch
from elasticsearch import helpers

from nordlys.config import ELASTIC_HOSTS, ELASTIC_SETTINGS

class Elastic(object):
    """Utility wrapper around a single Elasticsearch index.

    One instance is created per index. The class offers helpers for defining
    field mappings, creating/deleting the index, adding and retrieving
    documents, searching, and reading index statistics from term vectors and
    field stats.
    """

    FIELD_CATCHALL = "catchall"
    FIELD_ELASTIC_CATCHALL = "_all"
    DOC_TYPE = "doc"  # we don't make use of types
    ANALYZER_STOP_STEM = "english"
    ANALYZER_STOP = "stop_en"
    BM25 = "BM25"
    SIMILARITY = "sim"  # Used when other similarities are used

    def __init__(self, index_name):
        """Connects to the configured Elasticsearch hosts.

        :param index_name: name of the index this instance operates on
        """
        self.__es = Elasticsearch(hosts=ELASTIC_HOSTS)
        self.__index_name = index_name

    # =========================================
    # ================ Mappings ===============
    # =========================================

    @staticmethod
    def analyzed_field(analyzer=ANALYZER_STOP):
        """Returns the mapping for analyzed fields.

        For efficiency considerations, term positions are not stored. To store
        term positions, change ``"term_vector": "with_positions_offsets"``

        :param analyzer: name of the analyzer; valid options:
            [ANALYZER_STOP, ANALYZER_STOP_STEM]
        :raises ValueError: if the analyzer is not one of the valid options
        :return: field mapping dictionary
        """
        if analyzer not in {Elastic.ANALYZER_STOP, Elastic.ANALYZER_STOP_STEM}:
            # Fail loudly: silently exiting with status 0 would hide the error.
            raise ValueError("Analyzer {!r} is not valid.".format(analyzer))
        return {"type": "text", "term_vector": "yes", "analyzer": analyzer}

    @staticmethod
    def notanalyzed_field():
        """Returns the mapping for not-analyzed fields."""
        return {"type": "text", "index": "not_analyzed"}

    @staticmethod
    def notanalyzed_searchable_field():
        """Returns the mapping for fields that use the ``keyword`` analyzer
        and store term vectors."""
        return {"type": "text", "term_vector": "yes", "analyzer": "keyword"}

    def __gen_similarity(self, model, params=None):
        """Gets the custom similarity function.

        :param model: name of the similarity model
        :param params: optional dictionary of model parameters (not modified)
        :return: ``{SIMILARITY: {<params>, "type": model}}``
        """
        # Copy so that the caller's dictionary is not altered.
        similarity = dict(params) if params else {}
        similarity["type"] = model
        return {Elastic.SIMILARITY: similarity}

    def __gen_analyzers(self):
        """Gets custom analyzers.

        We include customized analyzers in the index setting, a field may or
        may not use it.
        """
        stop_analyzer = {"type": "standard", "stopwords": "_english_"}
        return {"analyzer": {Elastic.ANALYZER_STOP: stop_analyzer}}

    # =========================================
    # ============ Index management ===========
    # =========================================

    def analyze_query(self, query, analyzer=ANALYZER_STOP):
        """Analyzes the query.

        :param query: raw query
        :param analyzer: name of analyzer
        :return: analyzed tokens in position order, joined by single spaces;
            an empty string for a blank query
        """
        if query.strip() == "":
            return ""
        body = {"analyzer": analyzer, "text": query}
        tokens = self.__es.indices.analyze(index=self.__index_name, body=body)["tokens"]
        ordered = sorted(tokens, key=lambda x: x["position"])
        return " ".join(t["token"] for t in ordered)

    def get_mapping(self):
        """Returns mapping definition for the index."""
        mapping = self.__es.indices.get_mapping(index=self.__index_name, doc_type=self.DOC_TYPE)
        return mapping[self.__index_name]["mappings"][self.DOC_TYPE]["properties"]

    def get_settings(self):
        """Returns index settings."""
        settings = self.__es.indices.get_settings(index=self.__index_name)
        return settings[self.__index_name]["settings"]["index"]

    def __update_settings(self, settings):
        """Updates the index settings.

        The index is closed while the settings are applied and reopened
        afterwards, even if applying them fails.

        :param settings: settings body passed to ``put_settings``
        """
        self.__es.indices.close(index=self.__index_name)
        try:
            self.__es.indices.put_settings(index=self.__index_name, body=settings)
        finally:
            # Never leave the index closed; the refresh below needs it open.
            self.__es.indices.open(index=self.__index_name)
        self.__es.indices.refresh(index=self.__index_name)

    def update_similarity(self, model=BM25, params=None):
        """Updates the similarity function "sim", which is fixed for all index fields.

        The method and param should match elastic settings.

        :param model: name of the elastic model
        :param params: dictionary of params based on elastic
        """
        # The index has no "similarity" entry when no custom similarity was
        # configured (e.g. created with BM25), so do not index directly.
        old_similarity = self.get_settings().get("similarity")
        new_similarity = self.__gen_similarity(model, params)
        # We only update the similarity if it is different from the old one;
        # this avoids unnecessary closing of the index.
        if old_similarity != new_similarity:
            self.__update_settings({"similarity": new_similarity})

    def delete_index(self):
        """Deletes an index."""
        self.__es.indices.delete(index=self.__index_name)
        print("Index <" + self.__index_name + "> has been deleted.")

    def create_index(self, mappings, model=BM25, model_params=None, force=False):
        """Creates index (if it doesn't exist).

        :param mappings: field mappings, ``{field_name: mapping}`` (not modified)
        :param model: name of elastic search similarity
        :param model_params: parameters of the elastic search similarity
        :param force: forces index creation (overwrites if already exists)
        """
        if self.__es.indices.exists(self.__index_name):
            if not force:
                # Index already exists. No changes are made.
                return
            self.delete_index()

        # General elastic settings; copied so the shared config is not mutated.
        body = dict(ELASTIC_SETTINGS)

        # Global index settings. The number of shards should always be 1;
        # otherwise the stats would not be correct.
        body["settings"] = {
            "analysis": self.__gen_analyzers(),
            "index": {"number_of_shards": 1, "number_of_replicas": 0},
        }

        # If model is not BM25, a similarity module with the given model and
        # params is defined and referenced by name from every field.
        if model != Elastic.BM25:
            body["settings"]["similarity"] = self.__gen_similarity(model, model_params)
        sim = model if model == Elastic.BM25 else Elastic.SIMILARITY

        # Field mappings, each tagged with the similarity to use.
        field_mappings = {
            name: dict(mapping, similarity=sim) for name, mapping in mappings.items()
        }
        body["mappings"] = {self.DOC_TYPE: {"properties": field_mappings}}

        self.__es.indices.create(index=self.__index_name, body=body)

    # =========================================
    # =============== Documents ===============
    # =========================================

    def add_docs_bulk(self, docs):
        """Adds a set of documents to the index in a bulk.

        :param docs: dictionary {doc_id: doc}
        """
        actions = [
            {
                "_index": self.__index_name,
                "_type": self.DOC_TYPE,
                "_id": doc_id,
                "_source": doc,
            }
            for doc_id, doc in docs.items()
        ]
        if actions:
            helpers.bulk(self.__es, actions)

    def add_doc(self, doc_id, contents):
        """Adds a document with the specified contents to the index.

        :param doc_id: document ID
        :param contents: content of document
        """
        self.__es.index(index=self.__index_name, doc_type=self.DOC_TYPE, id=doc_id,
                        body=contents)

    def get_doc(self, doc_id, fields=None, source=True):
        """Gets a document from the index based on its ID.

        :param doc_id: document ID
        :param fields: list of fields to return (default: all)
        :param source: return document source as well (default: yes)
        """
        return self.__es.get(index=self.__index_name, doc_type=self.DOC_TYPE, id=doc_id,
                             fields=fields, _source=source)

    # =========================================
    # ================= Search ================
    # =========================================

    @staticmethod
    def _format_hits(hits):
        """Converts a list of raw hits into ``{doc_id: {"score": .., "fields": ..}}``."""
        return {
            hit["_id"]: {"score": hit["_score"], "fields": hit.get("fields", {})}
            for hit in hits
        }

    def search(self, query, field, num=100, fields_return="", start=0):
        """Searches in a given field using the similarity method configured in
        the index for that field.

        :param query: query string
        :param field: field to search in
        :param num: number of hits to return (default: 100)
        :param fields_return: additional document fields to be returned
        :param start: starting offset (default: 0)
        :return: dictionary of document IDs with scores
        """
        response = self.__es.search(index=self.__index_name, q=query, df=field,
                                    _source=False, size=num,
                                    fielddata_fields=fields_return, from_=start)
        return self._format_hits(response["hits"]["hits"])

    def search_complex(self, body, num=10, fields_return="", start=0):
        """Supports complex structured queries, which are sent as a ``body``
        field in Elastic search.

        For detailed information on formulating structured queries, see the
        official Elasticsearch query documentation. Below is an example to
        search in two particular fields that each must contain a specific term.

        :Example:

        .. code-block:: python

            term_1 = "hello"
            term_2 = "world"
            body = {
                "query": {
                    "bool": {
                        "must": [
                            {"match": {"title": term_1}},
                            {"match_phrase": {"content": term_2}}
                        ]
                    }
                }
            }

        :param body: query body
        :param num: number of hits to return (default: 10)
        :param fields_return: additional document fields to be returned
        :param start: starting offset (default: 0)
        :return: dictionary of document IDs with scores
        """
        response = self.__es.search(index=self.__index_name, body=body,
                                    _source=False, size=num,
                                    fielddata_fields=fields_return, from_=start)
        return self._format_hits(response["hits"]["hits"])

    def get_field_stats(self, field):
        """Returns stats of the given field."""
        stats = self.__es.field_stats(index=self.__index_name, fields=[field])
        return stats["indices"]["_all"]["fields"][field]

    def get_fields(self):
        """Returns name of fields in the index."""
        return list(self.get_mapping())

    # =========================================
    # ================= Stats =================
    # =========================================

    def _get_termvector(self, doc_id, field, term_stats=False):
        """Returns a term vector for a given document field, including global
        field and term statistics.

        Term stats can have a serious performance impact; should be set to
        true only if it is needed!

        :param doc_id: document ID
        :param field: field name
        :param term_stats: if True, returns term statistics
        :return: Term vector dictionary (empty if the field has no terms)
        """
        tv = self.__es.termvectors(index=self.__index_name, doc_type=self.DOC_TYPE,
                                   id=doc_id, fields=field, term_statistics=term_stats)
        return tv.get("term_vectors", {}).get(field, {}).get("terms", {})

    def _get_multi_termvectors(self, doc_ids, field, term_stats=False):
        """Returns multiple term vectors for a given document field (similar
        to a single term vector).

        :param doc_ids: iterable of document IDs
        :param field: field name
        :param term_stats: if True, returns term statistics
        :return: {'doc_id': {tv}, ..}
        """
        # IDs are stringified so that numeric IDs can be joined as well.
        ids = ",".join(str(doc_id) for doc_id in doc_ids)
        tv_all = self.__es.mtermvectors(index=self.__index_name, doc_type=self.DOC_TYPE,
                                        ids=ids, fields=field, term_statistics=term_stats)
        return {
            tv["_id"]: tv.get("term_vectors", {}).get(field, {}).get("terms", {})
            for tv in tv_all["docs"]
        }

    def _get_coll_termvector(self, term, field):
        """Returns a term vector containing collection stats of a term.

        One document containing the term is looked up, and its term vector
        (with term statistics) is returned; an empty dictionary is returned
        when no document matches.
        """
        body = {"query": {"bool": {"must": {"term": {field: term}}}}}
        hits = self.search_complex(body, num=1)
        if not hits:
            return {}
        doc_id = next(iter(hits))
        return self._get_termvector(doc_id, field, term_stats=True)

    def num_docs(self):
        """Returns the number of documents in the index."""
        return self.__es.count(index=self.__index_name, doc_type=self.DOC_TYPE)["count"]

    def num_fields(self):
        """Returns number of fields in the index."""
        return len(self.get_mapping())

    def doc_count(self, field):
        """Returns number of documents with at least one term for the given field."""
        return self.get_field_stats(field)["doc_count"]

    def coll_length(self, field):
        """Returns length of field in the collection."""
        return self.get_field_stats(field)["sum_total_term_freq"]

    def avg_len(self, field):
        """Returns average length of a field in the collection."""
        return self.coll_length(field) / self.doc_count(field)

    def doc_length(self, doc_id, field):
        """Returns length of a field in a document."""
        return sum(self.term_freqs(doc_id, field).values())

    def doc_freq(self, term, field, tv=None):
        """Returns document frequency for the given term and field.

        :param term: term
        :param field: field name
        :param tv: optional pre-fetched collection term vector; fetched if None
        """
        coll_tv = tv if tv is not None else self._get_coll_termvector(term, field)
        return coll_tv.get(term, {}).get("doc_freq", 0)

    def coll_term_freq(self, term, field, tv=None):
        """Returns collection term frequency for the given field.

        :param term: term
        :param field: field name
        :param tv: optional pre-fetched collection term vector; fetched if None
        """
        coll_tv = tv if tv is not None else self._get_coll_termvector(term, field)
        return coll_tv.get(term, {}).get("ttf", 0)

    def term_freqs(self, doc_id, field, tv=None):
        """Returns term frequencies of all terms for a given document and field.

        :param doc_id: document ID
        :param field: field name
        :param tv: optional pre-fetched document term vector; fetched if None
        :return: dictionary {term: frequency}
        """
        doc_tv = tv if tv is not None else self._get_termvector(doc_id, field)
        return {term: val["term_freq"] for term, val in doc_tv.items()}

    def term_freq(self, doc_id, field, term):
        """Returns frequency of a term in a given document and field."""
        return self.term_freqs(doc_id, field).get(term, 0)
if __name__ == "__main__":
    # Example usage of index statistics against the toy index.
    doc_id = 1
    field = "title"
    term = "gonna"
    es = Elastic("toy_index")

    # Raw term vector (with term statistics) and a plain search for the term.
    pprint(es._get_termvector(doc_id, field=field, term_stats=True))
    pprint(es.search(term, field))

    print("================= Stats =================")
    print("[FIELD]: %s [TERM]: %s" % (field, term))
    print("- Number of documents: %d" % es.num_docs())
    print("- Number of fields: %d" % es.num_fields())
    print("- Document count: %d" % es.doc_count(field))
    print("- Collection length: %d" % es.coll_length(field))
    print("- Average length: %.2f" % es.avg_len(field))
    print("- Document length: %d" % es.doc_length(doc_id, field))
    print("- Document frequency: %d" % es.doc_freq(term, field))
    print("- Collection frequency: %d" % es.coll_term_freq(term, field))
    print("- Term frequencies:")
    pprint(es.term_freqs(doc_id, field))