"""
Target Type Identification
==========================
The command-line application for target type identification.
Usage
-----
::
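python -m nordlys.services.tti -c <config_file> -q <query>
If `-q <query>` is passed, it returns the results for the specified query and prints them to the terminal; otherwise, the queries in **query_file** are processed in batch and the results are written to **output_file**.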
Config parameters
------------------
- **method**: name of TTI method; accepted values: ["tc", "ec", "ltr"]
- **num_docs**: number of documents to return
- **start**: starting offset for ranked documents
- **model**: retrieval model, if method is "tc" or "ec"; accepted values: ["lm", "bm25"]
- **ec_cutoff**: if method is "ec", rank cut-off of top-*K* entities for EC TTI
- **field**: field name, if method is "tc" or "ec"
- **smoothing_method**: accepted values: ["jm", "dirichlet"]
- **smoothing_param**: value of lambda or mu; accepted values: [float or "avg_len"]
- **query_file**: path to query file (JSON)
- **output_file**: path to output file (JSON)
- **trec_output_file**: path to output file (trec_eval-formatted)
Example config
---------------
.. code:: python
{ "method": "ec",
"num_docs": 10,
"model": "lm",
"first_pass": {
"num_docs": 50
},
"smoothing_method": "dirichlet",
"smoothing_param": 2000,
"ec_cutoff": 20,
"query_file": "path/to/queries.json",
"output_file": "path/to/output.txt",
}
------------------------
:Author: Dario Garigliotti
"""
# -------
# Standard imports
from os.path import expanduser
import argparse
import json
from pprint import pprint
# Cross-ref imports
from nordlys.config import ELASTIC_INDICES, ELASTIC_TTI_INDICES
from nordlys.core.retrieval.elastic_cache import ElasticCache
from nordlys.core.utils.file_utils import FileUtils
from nordlys.core.retrieval.retrieval import Retrieval # for TC TTI
from nordlys.core.retrieval.scorer import Scorer # for TC TTI
from nordlys.logic.fusion.late_fusion_scorer import LateFusionScorer # for EC TTI
from nordlys.logic.entity.entity import Entity # for defining the higher-order entity-centric late-fusion assoc func
from nordlys.core.utils.file_utils import FileUtils # for outputting
from nordlys.core.retrieval.retrieval_results import RetrievalResults # for TREC-formatted outputting
from nordlys.config import PLOGGER # for logging
# -------
# DBpedia distinguished metadata
# Property key under which an entity's types are looked up (entity-centric mapper)
RDF_TYPE_PROP = "<rdf:type>"
# Type that the entity-centric mapper never returns as a final type
OWL_THING_TYPE = "<owl:Thing>"
# Only types whose short-prefixed URI starts with this prefix are kept by the entity-centric mapper
DBO_TYPE_PREFIX = "<dbo:"
# Methods and models
# Values of the "method" config parameter handled in this module
TTI_METHOD_TC = "tc"
TTI_METHOD_EC = "ec"
# Values of the "model" config parameter handled in this module
TTI_MODEL_BM25 = "bm25"
TTI_MODEL_LM = "lm"
# Default values for several parameters
DEFAULT_1ST_PASS_NUM_DOCS = 50 # Efficiency-related setting; it should be enough for types
DEFAULT_1ST_PASS_FIELD = "content"
DEFAULT_TTI_METHOD = TTI_METHOD_TC
DEFAULT_TTI_NUM_DOCS = 10 # enough for displaying top types
DEFAULT_TTI_START = 0
# First entry of the index lists exported by nordlys.config
DEFAULT_TTI_TC_INDEX = ELASTIC_TTI_INDICES[0]
DEFAULT_TTI_EC_INDEX = ELASTIC_INDICES[0]
DEFAULT_TTI_EC_K_CUTOFF = 20 # Known to be a sufficient cut-off
# -------
class TTI(object):
    """Target type identification (TTI) for queries.

    Supports a type-centric (TC) method, which retrieves directly from a type
    index, and an entity-centric (EC) method, which scores types through the
    entities retrieved for the query (late fusion).

    Config keys read by this class: ``method``, ``num_docs``, ``start``,
    ``run_id``, ``model``, ``ec_cutoff``, ``smoothing_method``,
    ``smoothing_param``, ``query_file``, ``output_file`` and
    ``trec_output_file``.
    """

    def __init__(self, config):
        """Validates the config and prepares the type-centric retrieval settings.

        :param config: configuration dictionary; it is completed with default values in place.
        :type config: dict
        """
        self.__check_config(config)
        self.__config = config
        self.__method = config["method"]
        self.__num_docs = config["num_docs"]
        self.__start = config["start"]
        self.__tc_config = {  # only for TC TTI
            "index_name": self.__config["index"],
            "first_pass": {
                "1st_num_docs": DEFAULT_1ST_PASS_NUM_DOCS,
                "field": DEFAULT_1ST_PASS_FIELD
            },
        }
        self.__query_file = config.get("query_file", None)
        self.__output_file = config.get("output_file", None)

    @staticmethod
    def __check_config(config):
        """Checks config parameters and sets default values (in place).

        :param config: configuration dictionary
        :type config: dict
        :return: the same dictionary, with ``method``, ``index``, ``num_docs``,
            ``start`` and ``run_id`` always set.
        """
        config["method"] = config.get("method", TTI_METHOD_TC)  # TODO decide
        # identify() runs the type-centric method for every method other than
        # EC, so only EC gets the entity index; everything else uses the type index.
        if config["method"] == TTI_METHOD_EC:
            config["index"] = DEFAULT_TTI_EC_INDEX
        else:
            config["index"] = DEFAULT_TTI_TC_INDEX
        config["num_docs"] = int(config.get("num_docs", DEFAULT_TTI_NUM_DOCS))
        config["start"] = int(config.get("start", DEFAULT_TTI_START))
        config["run_id"] = config.get("run_id", "tti")
        return config

    def __valid_final_ec_type(self, t):
        """Assesses whether a DBpedia type t is valid to be returned for the entity-centric mapper.

        :param t: a DBpedia type shortly-prefixed URI, e.g., "<dbo:City>".
        :type t: str
        :return: a Boolean value assessing whether t is valid.
        """
        # Compare by value: identity (`is`) of equal strings is not guaranteed.
        return t != OWL_THING_TYPE and t.startswith(DBO_TYPE_PREFIX)

    def __entity_centric_mapper(self, entity_id):
        """Gets the list of DBpedia types for a given entityID.

        :param entity_id: entity identifier passed in by the late-fusion scorer
        :return: list of the entity's types that pass the validity filter
        """
        en = Entity()
        all_types = en.lookup_en(entity_id).get(RDF_TYPE_PROP, [])
        return [t for t in all_types if self.__valid_final_ec_type(t)]  # filtering

    def __entity_centric(self, query):
        """Entity-centric TTI.

        :param query: query string
        :type query: str
        :return: dictionary mapping each type to ``{"score": score}``
        """
        # Set the configurations
        model = self.__config.get("model", TTI_MODEL_BM25)
        ec_cutoff = self.__config.get("ec_cutoff", DEFAULT_TTI_EC_K_CUTOFF)
        self.__ec_retr_config = dict()
        # Smoothing settings are forwarded only when explicitly configured
        for param in ("smoothing_method", "smoothing_param"):
            value = self.__config.get(param, None)
            if value is not None:
                self.__ec_retr_config[param] = value

        # Perform EC TTI using late fusion support
        late_fusion_scorer = LateFusionScorer(self.__config["index"], model, self.__ec_retr_config,
                                              num_docs=ec_cutoff, field="catchall",
                                              run_id=self.__config["run_id"],
                                              num_objs=self.__config["num_docs"])
        ret_res = late_fusion_scorer.score_query(query, assoc_fun=self.__entity_centric_mapper)

        types = {doc_id: {"score": score} for doc_id, score in ret_res.get_scores_sorted()}
        PLOGGER.info("done")
        return types

    def __type_centric(self, query):
        """Type-centric TTI.

        :param query: query string
        :type query: str
        :return: retrieval results for the query; an empty dictionary if the
            configured model is neither "bm25" nor "lm".
        """
        types = dict()
        model = self.__config.get("model", TTI_MODEL_BM25)

        if model == TTI_MODEL_BM25:
            PLOGGER.info("TTI, TC, BM25")
            self.__tc_config["model"] = "bm25"
            # No explicit scorer is passed for BM25
            types = Retrieval(self.__tc_config).retrieve(query)

        elif model == TTI_MODEL_LM:
            PLOGGER.debug("TTI, TC, LM")
            self.__tc_config["model"] = "lm"  # Needed for 2nd-pass
            self.__tc_config["field"] = "content"  # Needed for 2nd-pass
            self.__tc_config["second_pass"] = {
                "field": "content"
            }
            for param in ("smoothing_method", "smoothing_param"):
                value = self.__config.get(param, None)
                if value is not None:
                    self.__tc_config["second_pass"][param] = value
            # The scorer is bound to the same index that Retrieval queries
            # (stored under "index_name" in the TC config).
            elastic = ElasticCache(self.__tc_config.get("index_name", DEFAULT_TTI_TC_INDEX))
            scorer = Scorer.get_scorer(elastic, query, self.__tc_config)
            types = Retrieval(self.__tc_config).retrieve(query, scorer)

        PLOGGER.info(types)
        return types

    @staticmethod
    def __rank_types(types, start, num_docs):
        """Orders types by descending score and keeps the top ones.

        :param types: dictionary mapping each type to a dictionary holding its "score"
        :type types: dict
        :param start: value added to every 0-based position to obtain the rank key
        :type start: int
        :param num_docs: number of types to keep
        :type num_docs: int
        :return: dictionary mapping rank to ``{"type": type_id, "score": score}``
        """
        ranked = dict()
        ordered = sorted(types.items(), key=lambda item: item[1]["score"], reverse=True)
        for position, (type_id, entry) in enumerate(ordered):
            ranked[position + start] = {"type": type_id, "score": entry["score"]}
            # The cut-off triggers only on an exact match, so a non-positive
            # num_docs keeps every type.
            if position + 1 == num_docs:
                break
        return ranked

    def identify(self, query):
        """Performs target type identification for the query.

        :param query: query string
        :type query: str
        :return: annotated query: ``{"query": query, "results": {rank: {"type": ..., "score": ...}}}``
        """
        # obtains types according to requested method
        method = self.__config.get("method", None)
        if method == TTI_METHOD_EC:  # Entity-centric TTI
            types = self.__entity_centric(query)
        else:  # default Type-centric TTI
            types = self.__type_centric(query)

        # sorts types and converts to output format
        return {"query": query,
                "results": self.__rank_types(types, self.__start, self.__num_docs)}

    def batch_identification(self):
        """Annotates, in a batch, queries with identified target types, and outputs results.

        Reads the queries from the configured query file (JSON object mapping
        query ID to query string), writes all results as JSON to the configured
        output file and, if ``trec_output_file`` is configured, also writes
        them in TREC format. Every opened file is closed, even on failure.
        """
        query_in = FileUtils.open_file_by_type(self.__query_file)
        try:
            queries = json.load(query_in)
        finally:
            query_in.close()

        f_trec_out = None
        if "trec_output_file" in self.__config:  # for TREC-formatted outputting
            f_trec_out = FileUtils.open_file_by_type(self.__config["trec_output_file"], mode="w")

        results = dict()
        try:
            for query_id in sorted(queries):
                PLOGGER.info("Identifying target types for [{}] {}".format(query_id, queries[query_id]))
                results[query_id] = self.identify(queries[query_id])

                # Output resulting scores in TREC format if required
                if f_trec_out is not None:
                    type_to_score = {d["type"]: d["score"]
                                     for d in results[query_id]["results"].values()}
                    ret_res = RetrievalResults(type_to_score)
                    ret_res.write_trec_format(query_id,
                                              self.__config["run_id"],
                                              f_trec_out,
                                              max_rank=self.__config["num_docs"])
        finally:
            if f_trec_out is not None:
                f_trec_out.close()

        json_out = FileUtils.open_file_by_type(self.__output_file, mode="w")
        try:
            json.dump(results, json_out, indent=4, sort_keys=True)
        finally:
            json_out.close()
        PLOGGER.info("Output file: {}".format(self.__output_file))
def arg_parser(argv=None):
    """Parses the command-line arguments of the TTI application.

    :param argv: optional list of argument strings to parse; when None,
        argparse reads the arguments from ``sys.argv[1:]``.
    :type argv: list or None
    :return: namespace with the attributes ``query`` (str or None) and
        ``config`` (path given with -c, or an empty dict when omitted).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-q", "--query", help="query string", type=str, default=None)
    # argparse applies `type` only to string defaults, so the dict default is
    # handed over unchanged when -c is not given.
    parser.add_argument("-c", "--config", help="config file", type=str, default=dict())
    return parser.parse_args(argv)
def main(args):
    """Runs target type identification as requested on the command line.

    :param args: parsed arguments exposing ``config`` and ``query``
    """
    tti = TTI(FileUtils.load_config(args.config))

    # Without a query on the command line, fall back to batch processing
    if not args.query:
        tti.batch_identification()
        return

    # Single-query mode: identify and print the result
    pprint(tti.identify(args.query))
# Script entry point: parse the command line, then run the application
if __name__ == '__main__':
    cli_args = arg_parser()
    main(cli_args)