"""
Mongo
=====
Tools for working with MongoDB.
:Authors: Krisztian Balog, Faegheh Hasibi
"""
import argparse
from nordlys.config import MONGO_DB, MONGO_HOST
from pymongo import MongoClient
from nordlys.config import PLOGGER
class Mongo(object):
    """Manages the MongoDB connection and operations.

    Document ids and field names are escaped before being sent to MongoDB
    (``.`` and ``$`` are replaced, see ``__escape``) and unescaped again when
    documents are read back through :meth:`find_by_id`.
    """

    ID_FIELD = "_id"

    def __init__(self, host, db, collection):
        """Opens a client and selects the given database and collection.

        :param host: host specification handed to ``MongoClient``
        :param db: name of the database
        :param collection: name of the collection
        """
        self.__client = MongoClient(host)
        self.__db = self.__client[db]
        self.__collection = self.__db[collection]
        self.__db_name = db
        self.__collection_name = collection

    @staticmethod
    def __escape(s):
        """Escapes string (to be used as key or fieldname).

        Replaces ``.`` with the literal text ``U+002E`` and ``$`` with the
        literal text ``U+0024``.

        :param s: string to be escaped
        :return: escaped string
        """
        return s.replace(".", "U+002E").replace("$", "U+0024")

    @staticmethod
    def unescape(s):
        """Unescapes string (inverse of the escaping applied on write).

        :param s: escaped string
        :return: string with ``U+002E`` and ``U+0024`` turned back into ``.`` and ``$``
        """
        return s.replace("U+002E", ".").replace("U+0024", "$")

    @staticmethod
    def unescape_doc(mdoc):
        """Unescapes document content.

        The id value and all top-level field names are unescaped. For fields
        that hold a dict (e.g., surface form collections) the keys of that dict
        are unescaped as well. All other values (lists, e.g., the DBpedia
        collection, but also numbers and strings) are copied unchanged.

        :param mdoc: document as returned by MongoDB, or None
        :return: new dict with unescaped keys, or None if mdoc is None
        """
        if mdoc is None:
            return None

        doc = {}
        for field, value in mdoc.items():
            if field == Mongo.ID_FIELD:
                doc[field] = Mongo.unescape(value)
            elif isinstance(value, dict):
                doc[Mongo.unescape(field)] = {Mongo.unescape(key): val for key, val in value.items()}
            else:
                # Lists and scalar values (e.g., counters written by inc()) have no keys to unescape.
                doc[Mongo.unescape(field)] = value
        return doc

    def __upsert(self, doc_id, update):
        """Applies an update document to the document with the given id, creating it if missing.

        :param doc_id: (unescaped) document id
        :param update: MongoDB update document, e.g. ``{'$set': {...}}``
        """
        self.__collection.update_one({Mongo.ID_FIELD: self.__escape(doc_id)}, update, upsert=True)

    def add(self, doc_id, contents):
        """Adds a document, or updates the fields of an existing document.

        Fields are written with ``$set``: fields of an already stored document
        that are not present in ``contents`` are left in place.
        Errors are logged and not re-raised.

        :param doc_id: document id
        :param contents: dict with field names as keys
        """
        # Only the top-level keys are escaped; values are stored as they are.
        escaped = {self.__escape(key): value for key, value in contents.items()}
        try:
            self.__upsert(doc_id, {'$set': escaped})
        except Exception as e:
            PLOGGER.error("\nError (doc_id: " + str(doc_id) + ")\n" + str(e))

    def set(self, doc_id, field, value):
        """Sets the value of a given document field (overwrites previously stored content).

        :param doc_id: document id
        :param field: field
        :param value: value to be stored
        """
        self.__upsert(doc_id, {'$set': {self.__escape(field): value}})

    def append_list(self, doc_id, field, value):
        """Appends the value to a given field that stores a list.

        If the field does not exist yet, it will be created.
        The value is appended as a single element; a list passed as value is
        stored as one (nested) item and is not flattened.

        :param doc_id: document id
        :param field: field
        :param value: the element to be appended to the current list
        """
        self.__upsert(doc_id, {'$push': {self.__escape(field): {'$each': [value]}}})

    def append_set(self, doc_id, field, value):
        """Adds a list of values to a set.

        If the field does not exist yet, it will be created.
        The value should be a list; each of its items is added individually.
        Errors are logged and not re-raised.

        :param doc_id: document id
        :param field: field
        :param value: list of values to be added to the current set
        """
        try:
            self.__upsert(doc_id, {'$addToSet': {self.__escape(field): {'$each': value}}})
        except Exception as e:
            PLOGGER.error("\nError (doc_id: " + str(doc_id) + "), field: " + field + "\n" + str(e))

    def append_dict(self, doc_id, field, dictkey, value):
        """Appends the value to a given field that stores a dict.

        If the dictkey is already in use, the value stored there will be overwritten.

        :param doc_id: document id
        :param field: field
        :param dictkey: key in the dictionary
        :param value: value to be stored under dictkey
        """
        # The unescaped dot separates field and key; the dots inside each part are escaped.
        key = self.__escape(field) + "." + self.__escape(dictkey)
        self.__upsert(doc_id, {'$set': {key: value}})

    def inc(self, doc_id, field, value):
        """Increments the value of a specified field.

        :param doc_id: document id
        :param field: field
        :param value: value to be increased by
        """
        self.__upsert(doc_id, {'$inc': {self.__escape(field): value}})

    def inc_in_dict(self, doc_id, field, dictkey, value=1):
        """Increments a value that is inside a dict.

        :param doc_id: document id
        :param field: field
        :param dictkey: key in the dictionary
        :param value: value to be increased by
        """
        key = self.__escape(field) + "." + self.__escape(dictkey)
        self.__upsert(doc_id, {'$inc': {key: value}})

    def find_by_id(self, doc_id):
        """Returns unescaped document content for a given document id.

        :param doc_id: document id
        :return: unescaped document, or None if no document has this id
        """
        return self.unescape_doc(self.__collection.find_one({Mongo.ID_FIELD: self.__escape(doc_id)}))

    def find_all(self, no_timeout=False):
        """Returns a Cursor instance that allows us to iterate over all documents.

        The documents yielded by the cursor are not unescaped.

        :param no_timeout: passed on as ``no_cursor_timeout`` to ``find``
        """
        return self.__collection.find(no_cursor_timeout=no_timeout)

    def drop(self):
        """Deletes the contents of the given collection (including indices)."""
        self.__collection.drop()
        PLOGGER.info(self.__collection_name + " dropped")

    def get_num_docs(self):
        """Returns total number of documents in the mongo collection."""
        try:
            return self.__collection.count_documents({})
        except (AttributeError, TypeError):
            # NOTE(review): fallback for PyMongo releases that predate count_documents();
            # there the missing method is expected to fail with TypeError -- confirm
            # against the pinned driver version.
            return self.find_all().count()

    @staticmethod
    def print_doc(doc):
        """Logs the contents of a document using PLOGGER.

        The id is logged first; list values are logged one item per line.

        :param doc: document (dict) that contains the id field
        """
        PLOGGER.info("_id: " + doc[Mongo.ID_FIELD])
        for key, value in doc.items():
            if key == Mongo.ID_FIELD:
                continue  # the id has already been logged
            if isinstance(value, list):
                PLOGGER.info(key + ":")
                for v in value:
                    PLOGGER.info("\t" + str(v))
            else:
                PLOGGER.info(key + ": " + str(value))
def main(argv=None):
    """Looks up a single document by id and logs its contents.

    :param argv: list of command line arguments (collection, doc_id);
        when None, argparse reads them from ``sys.argv``
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("collection", help="name of the collection")
    parser.add_argument("doc_id", help="doc_id to be looked up")
    args = parser.parse_args(argv)

    # Both positionals are required, but may still be given as empty strings.
    if not args.collection:
        parser.error("collection must not be empty")
    if not args.doc_id:
        parser.error("doc_id must not be empty")

    mongo = Mongo(MONGO_HOST, MONGO_DB, args.collection)

    # currently, a single operation (lookup) is supported
    res = mongo.find_by_id(args.doc_id)
    if res is None:
        PLOGGER.info("Document ID " + args.doc_id + " cannot be found")
    else:
        mongo.print_doc(res)
# Run the command line lookup only when the module is executed as a script.
if __name__ == "__main__":
    main()