"""
Scorer
======
Various retrieval models for scoring a individual document for a given query.
:Authors: Faegheh Hasibi, Krisztian Balog
"""
import math
import sys
from nordlys.core.retrieval.elastic import Elastic
from nordlys.core.retrieval.elastic_cache import ElasticCache
from nordlys.config import PLOGGER
[docs]class Scorer(object):
"""Base scorer class."""
SCORER_DEBUG = 0
def __init__(self, elastic, query, params):
self._elastic = elastic
self._query = query
self._params = params
# The analyser might return terms that are not in the collection.
# These terms are filtered out later in the score_doc functions.
if self._query:
self._query_terms = elastic.analyze_query(self._query).split()
else:
self._query_terms = []
# def score_doc(self, doc_id):
# """Scorer method to be implemented in each subclass."""
# # should use elastic scoring
# query = self._elastic.analyze_query(self._query)
# field = params["first_pass"]["field"]
# res = self._elastic.search(query, field, num=self.__first_pass_num_docs, start=start)
# return
[docs] @staticmethod
def get_scorer(elastic, query, config):
"""Returns Scorer object (Scorer factory).
:param elastic: Elastic object
:param query: raw query (to be analyzed)
:param config: dict with models parameters
"""
model = config.get("model", None)
if model == "lm":
PLOGGER.debug("\tLM scoring ... ")
return ScorerLM(elastic, query, config)
elif model == "mlm":
PLOGGER.debug("\tMLM scoring ...")
return ScorerMLM(elastic, query, config)
elif model == "prms":
PLOGGER.debug("\tPRMS scoring ...")
return ScorerPRMS(elastic, query, config)
elif model is None:
return None
else:
raise Exception("Unknown model " + model)
# =========================================
# ================== LM ==================
# =========================================
[docs]class ScorerLM(Scorer):
"""Language Model (LM) scorer."""
JM = "jm"
DIRICHLET = "dirichlet"
def __init__(self, elastic, query, params):
super(ScorerLM, self).__init__(elastic, query, params)
self._field = params.get("fields", Elastic.FIELD_CATCHALL)
self._smoothing_method = params.get("smoothing_method", self.DIRICHLET).lower()
if self._smoothing_method == self.DIRICHLET:
self._smoothing_param = params.get("smoothing_param", 2000)
elif self._smoothing_method == ScorerLM.JM:
self._smoothing_param = params.get("smoothing_param", 0.1)
# self._smoothing_param = params.get("smoothing_param", None)
else:
PLOGGER.error(self._smoothing_method + " smoothing method is not supported!")
sys.exit(0)
self._tf = {}
[docs] @staticmethod
def get_jm_prob(tf_t_d, len_d, tf_t_C, len_C, lambd):
"""Computes JM-smoothed probability.
p(t|theta_d) = [(1-lambda) tf(t, d)/|d|] + [lambda tf(t, C)/|C|]
:param tf_t_d: tf(t,d)
:param len_d: |d|
:param tf_t_C: tf(t,C)
:param len_C: |C| = \sum_{d \in C} |d|
:param lambd: \lambda
:return: JM-smoothed probability
"""
p_t_d = tf_t_d / len_d if len_d > 0 else 0
p_t_C = tf_t_C / len_C if len_C > 0 else 0
if Scorer.SCORER_DEBUG:
print("\t\t\tp(t|d) = {}\tp(t|C) = {}".format(p_t_d, p_t_C))
return (1 - lambd) * p_t_d + lambd * p_t_C
[docs] @staticmethod
def get_dirichlet_prob(tf_t_d, len_d, tf_t_C, len_C, mu):
"""Computes Dirichlet-smoothed probability.
P(t|theta_d) = [tf(t, d) + mu P(t|C)] / [|d| + mu]
:param tf_t_d: tf(t,d)
:param len_d: |d|
:param tf_t_C: tf(t,C)
:param len_C: |C| = \sum_{d \in C} |d|
:param mu: \mu
:return: Dirichlet-smoothed probability
"""
if mu == 0: # i.e. field does not have any content in the collection
return 0
else:
p_t_C = tf_t_C / len_C if len_C > 0 else 0
return (tf_t_d + mu * p_t_C) / (len_d + mu)
def __get_term_freq(self, doc_id, field, term):
"""Returns the (cached) term frequency."""
if doc_id not in self._tf:
self._tf[doc_id] = {}
if field not in self._tf[doc_id]:
self._tf[doc_id][field] = self._elastic.term_freqs(doc_id, field)
return self._tf[doc_id][field].get(term, 0)
[docs] def get_lm_term_prob(self, doc_id, field, t, tf_t_d_f=None, tf_t_C_f=None):
"""Returns term probability for a document and field.
:param doc_id: document ID
:param field: field name
:param t: term
:return: P(t|d_f)
"""
len_d_f = self._elastic.doc_length(doc_id, field)
len_C_f = self._elastic.coll_length(field)
tf_t_C_f = self._elastic.coll_term_freq(t, field) if tf_t_C_f is None else tf_t_C_f
tf_t_d_f = self.__get_term_freq(doc_id, field, t) if tf_t_d_f is None else tf_t_d_f
if self.SCORER_DEBUG:
print("\t\tt = {}\t f = {}".format(t, field))
print("\t\t\tDoc: tf(t,f) = {}\t|f| = {}".format(tf_t_d_f, len_d_f))
print("\t\t\tColl: tf(t,f) = {}\t|f| = ".format(tf_t_C_f, len_C_f))
p_t_d_f = 0
# JM smoothing: p(t|theta_d_f) = [(1-lambda) tf(t, d_f)/|d_f|] + [lambda tf(t, C_f)/|C_f|]
if self._smoothing_method == self.JM:
lambd = self._smoothing_param
p_t_d_f = self.get_jm_prob(tf_t_d_f, len_d_f, tf_t_C_f, len_C_f, lambd)
if self.SCORER_DEBUG:
print("\t\t\tJM smoothing:")
print("\t\t\tDoc: p(t|theta_d_f)= ", p_t_d_f)
# Dirichlet smoothing
elif self._smoothing_method == self.DIRICHLET:
mu = self._smoothing_param if self._smoothing_param != "avg_len" else self._elastic.avg_len(field)
p_t_d_f = self.get_dirichlet_prob(tf_t_d_f, len_d_f, tf_t_C_f, len_C_f, mu)
if self.SCORER_DEBUG:
print("\t\t\tDirichlet smoothing:")
print("\t\t\tmu: ", mu)
print("\t\t\tDoc: p(t|theta_d_f)= ", p_t_d_f)
return p_t_d_f
[docs] def get_lm_term_probs(self, doc_id, field):
"""Returns probability of all query terms for a document and field; i.e. p(t|theta_d)
:param doc_id: document ID
:param field: field name
:return: dictionary of terms with their probabilities
"""
p_t_theta_d_f = {}
for t in set(self._query_terms):
p_t_theta_d_f[t] = self.get_lm_term_prob(doc_id, field, t)
return p_t_theta_d_f
[docs] def score_doc(self, doc_id):
"""Scores the given document using LM.
p(q|theta_d) = \sum log(p(t|theta_d))
:param doc_id: document id
:return: LM score
"""
if self.SCORER_DEBUG:
print("Scoring doc ID=" + doc_id)
p_t_theta_d = self.get_lm_term_probs(doc_id, self._field)
if sum(p_t_theta_d.values()) == 0: # none of query terms are in the field collection
if self.SCORER_DEBUG:
print("\t\tP(q|{}) = None".format(self._field))
return None
# p(q|theta_d) = sum log(p(t|theta_d)); we return log-scale values
p_q_theta_d = 0
for t in self._query_terms:
# Skips the term if it is not in the field collection
if p_t_theta_d[t] == 0:
continue
if self.SCORER_DEBUG:
print("\t\tP({}|{}) = {}".format(t, self._field, p_t_theta_d[t]))
p_q_theta_d += math.log(p_t_theta_d[t])
if self.SCORER_DEBUG:
print("P(d|q) = {}".format(p_q_theta_d))
return p_q_theta_d
# =========================================
# ================== MLM =================
# =========================================
[docs]class ScorerMLM(ScorerLM):
"""Mixture of Language Model (MLM) scorer.
Implemented based on:
Ogilvie, Callan. Combining document representations for known-item search. SIGIR 2003.
"""
def __init__(self, elastic, query, params):
super(ScorerMLM, self).__init__(elastic, query, params)
self._field_weights = params.get("fields", {})
if "fields" not in params:
raise Exception("Field weights are not defined for MLM scoring!")
[docs] def get_mlm_term_prob(self, doc_id, t):
"""Returns MLM probability for the given term and field-weights.
p(t|theta_d) = sum(mu_f * p(t|theta_d_f))
:param lucene_doc_id: internal Lucene document ID
:param t: term
:return: P(t|theta_d)
"""
p_t_theta_d = 0
for f, mu_f in self._field_weights.items():
p_t_theta_d_f = self.get_lm_term_prob(doc_id, f, t)
p_t_theta_d += mu_f * p_t_theta_d_f
if self.SCORER_DEBUG:
print("\t\tP(t|theta_d)=" + str(p_t_theta_d))
return p_t_theta_d
[docs] def get_mlm_term_probs(self, doc_id):
""" Returns probability of all query terms for a document; i.e. p(t|theta_d)
:param doc_id: internal Lucene document ID
:return: dictionary of terms with their probabilities
"""
p_t_theta_d = {}
for t in set(self._query_terms):
p_t_theta_d[t] = self.get_mlm_term_prob(doc_id, t)
return p_t_theta_d
[docs] def score_doc(self, doc_id):
"""Scores the given document using MLM model.
p(q|theta_d) = \sum log(p(t|theta_d))
:param doc_id: document ID
:return: MLM score of document and query
"""
if self.SCORER_DEBUG:
print("Scoring doc ID=" + doc_id)
p_t_theta_d = self.get_mlm_term_probs(doc_id)
if sum(p_t_theta_d.values()) == 0: # none of query terms are in the field collection
if self.SCORER_DEBUG:
print("\t\tP_mlm(q|theta_d) = None")
return None
# p(q|theta_d) = sum(log(p(t|theta_d))) ; we return log-scale values
p_q_theta_d = 0
for t in self._query_terms:
if p_t_theta_d[t] == 0: # Skips the term if it is not in the field collection
continue
if self.SCORER_DEBUG:
print("\tP_mlm({}|theta_d) = {}".format(t, p_t_theta_d[t]))
p_q_theta_d += math.log(p_t_theta_d[t])
if self.SCORER_DEBUG:
print("P(d|q) = {}".format(p_q_theta_d))
return p_q_theta_d
# =========================================
# ================= PRMS ==================
# =========================================
[docs]class ScorerPRMS(ScorerLM):
"""PRMS scorer."""
# @todo: make this class similar to MLM scorer, add get_term_prob(s) functions
def __init__(self, elastic, query, params):
super(ScorerPRMS, self).__init__(elastic, query, params)
self._fields = params["fields"]
self.total_field_freq = None
self.mapping_probs = None
[docs] def score_doc(self, doc_id):
"""
Scores the given document using PRMS model.
:param doc_id: document id
:param lucene_doc_id: internal Lucene document ID
:return: float, PRMS score of document and query
"""
if self.SCORER_DEBUG:
print("Scoring doc ID=" + doc_id)
# gets mapping probs: p(f|t)
p_f_t = self.get_mapping_probs()
# gets term probs: p(t|theta_d_f)
p_t_theta_d_f = {}
for field in self._fields:
p_t_theta_d_f[field] = self.get_lm_term_probs(doc_id, field)
# none of query terms are in the field collection
if sum([sum(p_t_theta_d_f[field].values()) for field in p_t_theta_d_f]) == 0:
return None
# p(q|theta_d) = prod(p(t|theta_d)) ; we return log(p(q|theta_d))
p_q_theta_d = 0
for t in self._query_terms:
if self.SCORER_DEBUG:
print("\tt=" + t)
# p(t|theta_d) = sum(p(f|t) * p(t|theta_d_f))
p_t_theta_d = 0
for f in self._fields:
if f in p_f_t[t]:
p_t_theta_d += p_f_t[t][f] * p_t_theta_d_f[f][t]
if self.SCORER_DEBUG:
print("\t\t\tf = {}\tp(t|f) = {}\tP(t|theta_d,f) = {}".format(
f, p_f_t[t][f], p_t_theta_d_f[f][t]))
if p_t_theta_d == 0:
continue
p_q_theta_d += math.log(p_t_theta_d)
if self.SCORER_DEBUG:
print("\t\tP(t|theta_d)= {}".format(p_t_theta_d))
return p_q_theta_d
[docs] def get_mapping_probs(self):
"""Gets (cached) mapping probabilities for all query terms."""
if self.mapping_probs is None:
self.mapping_probs = {}
for t in set(self._query_terms):
self.mapping_probs[t] = self.get_mapping_prob(t)
return self.mapping_probs
[docs] def get_mapping_prob(self, t, coll_termfreq_fields=None):
"""
Computes PRMS field mapping probability.
p(f|t) = P(t|f)P(f) / sum_f'(P(t|C_{f'_c})P(f'))
:param t: str
:param coll_termfreq_fields: {field: freq, ...}
:return: a dictionary {field: prms_prob, ...}
"""
if coll_termfreq_fields is None:
coll_termfreq_fields = {}
for f in self._fields:
coll_termfreq_fields[f] = self._elastic.coll_term_freq(t, f)
# calculates numerators for all fields: P(t|f)P(f)
numerators = {}
for f in self._fields:
p_t_f = coll_termfreq_fields[f] / self._elastic.coll_length(f)
p_f = self._elastic.doc_count(f) / self.get_total_field_freq()
numerator = p_t_f * p_f
if numerator > 0:
numerators[f] = numerator
if self.SCORER_DEBUG:
print("\tf= {}\tt= {}\tP(t|f)= {}\tP(f)= {}".format(f, t, p_t_f, p_f))
# calculates denominator: sum_f'(P(t|C_{f'_c})P(f'))
denominator = sum(numerators.values())
mapping_probs = {}
if denominator > 0: # if the term is present in the collection
for f in numerators:
mapping_probs[f] = numerators[f] / denominator
if self.SCORER_DEBUG:
print("\t\tf= {}\tt= {}\tp(f|t)= {}/{} = {}".format(
f, t, numerators[f], sum(numerators.values()), mapping_probs[f]))
return mapping_probs
[docs] def get_total_field_freq(self):
"""Returns total occurrences of all fields"""
if self.total_field_freq is None:
total_field_freq = 0
for f in self._fields:
total_field_freq += self._elastic.doc_count(f)
self.total_field_freq = total_field_freq
return self.total_field_freq
if __name__ == "__main__":
query = "gonna friends"
doc_id = "4"
es = ElasticCache("toy_index")
params = {"fields": "content",
"__fields": {"title": 0.2, "content": 0.8},
"__fields": ["content", "title"]
}
score = ScorerPRMS(es, query, params).score_doc(doc_id)
print(score)