#!/usr/bin/env python3
"""
MongoDBShell
===============
Author : joe@joedrumgoole.com
Follow me on twitter like `@jdrumgoole <https://twitter.com/jdrumgoole>`_. for
updates on this package.
`MongoDBShell <https://pypi.org/project/mongodbshell/>`_ is a module that
provides more natural interaction with MongoDB via the Python shell.
Install using `pip3` (`MongoDBShell` only supports Python 3).
``$pip3 install mongodbshell``
To use::
>>> import mongodbshell
>>> client = mongodbshell.MongoDB()
>>> client.collection="test.test"
>>> client
mongodbshell.MongoDB('test', 'test', 'mongodb://localhost:27017')
>>> client.insert_one({"msg" : "MongoDBShell is great"})
ObjectId('5cb30cfa72a4ae3b105afa1c')
>>> client.find_one()
1 {'_id': ObjectId('5cb30cfa72a4ae3b105afa1c'), 'msg': 'MongoDBShell is great'}
>>> client.line_numbers = 0
>>> client.find_one()
{'_id': ObjectId('5cb30cfa72a4ae3b105afa1c'), 'msg': 'MongoDBShell is great'}
>>> # note the line number is no longer present
>>> client.output_file="output.txt" # send all output to this file
>>> client.find_one()
Output is also going to 'output.txt'
{'_id': ObjectId('5cb30cfa72a4ae3b105afa1c'), 'msg': 'MongoDBShell is great'}
>>> print(open("output.txt").read(), end="")
{'_id': ObjectId('5cb30cfa72a4ae3b105afa1c'), 'msg': 'MongoDBShell is great'}
>>>
This will give you a prebuilt :py:class:`~MongoDBShell.MongoDB` object.
"""
import pymongo
import pprint
import shutil
import sys
[docs]class ShellError(ValueError):
pass
if sys.platform == "Windows":
db_name_excluded_chars = r'/\. "$*<>:|?'
else:
db_name_excluded_chars = r'/\. "$'
[docs]class MongoDBShellError(ValueError):
pass
[docs]class MongoDB:
"""
Simple command line MongoDB proxy for use in the Python shell.
"""
def __init__(self,
database_name="test",
collection_name="test",
host="mongodb://localhost:27017",
*args,
**kwargs):
"""
Creat a new client object with a default database and
collection.
:param database_name: The name of the database to be opened
:param collection_name: The collection name to be opened
:param mongodb_uri: A properly formatted MongoDB URI
:param *args, *kwargs : Passed through to MongoClient
>>> from mongodbshell import MongoDB
>>> client = MongoDB()
>>> client.database = "demo"
>>> client.collection = "zipcodes"
>>> client.collection = "demo.zipcodes"
>>> client.collection = "db$.test"
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/jdrumgoole/GIT/mongodbshell/mongodbshell/__init__.py", line 152, in collection
else:
mongodbshell.ShellError: 'db$' is not a valid database name
>>>
"""
self._mongodb_uri = host
self._client = pymongo.MongoClient(host=self._mongodb_uri, *args, **kwargs)
self._database_name = database_name
self._collection_name = collection_name
self._database = self._client[self._database_name]
self._set_collection(collection_name)
#
# self._collection = self._database[self._collection_name]
self._output_filename = None
self._output_file = None
self._line_numbers = True
self._pretty_print = True
self._paginate = True
self._overlap = 0
[docs] @staticmethod
def valid_mongodb_name(name):
"""
Check that the name for a database has no illegal
characters
:param name: the name of the database
:return: True if the name is valid
"""
for char in db_name_excluded_chars:
if char in name:
return None
return name
@property
def client(self):
"""
:return: the MongoDBClient object
"""
return self._client
@property
def uri(self):
"""
:return: The URI used to create the Proxy object
"""
return self._mongodb_uri
@property
def database(self):
"""
Assign to this property to set the current default database.
:return: Return the default database object associated with the Proxy
"""
return self._database
@database.setter
def database(self, database_name):
"""
Set the default database for this Proxy object.
:param database_name: A string naming the database
"""
if database_name and MongoDB.valid_mongodb_name(database_name):
self._database = self.client[database_name]
else:
raise ShellError(f"'{database_name}' is not a valid database name")
@property
def database_name(self):
"""
:return: The name of the default database
"""
return self._database_name
def _set_collection(self, name):
if "." in name:
database_name, dot, collection_name = name.partition(".")
if self.valid_mongodb_name(database_name):
if self.valid_mongodb_name(collection_name):
self._database = self._client[database_name]
self._database_name = database_name
self._collection = self._database[collection_name]
self._collection_name = collection_name
else:
raise ShellError(f"'{collection_name}' is not a valid collection name")
else:
raise ShellError(f"'{database_name}' is not a valid database name")
else:
if self.valid_mongodb_name(name):
self._collection = self._database[name]
self._collection_name = name
else:
raise ShellError(f"'{name}' is not a valid collection name")
@property
def collection(self):
"""
Assign to `collection` to reset the current default collection.
Return the default collection object associated with the `MongoDB` object.
"""
return self._collection
@property
def collection_name(self):
"""
:return: The name of the default collection
"""
return self._collection_name
@collection.setter
def collection(self, db_collection_name):
"""
Set the default collection for the database associated with the `MongoDB`
object. The user can specify a database and a collection by using a dot
notation <database_name.collection_name>.
:param db_collection_name: the name of the database and collection
"""
self._set_collection(db_collection_name)
# if "." in db_collection_name:
# database_name, dot, collection_name = db_collection_name.partition(".")
# if self.valid_database_name(database_name):
# if self.valid_database_name(collection_name):
# self._database = self._client[database_name]
# self._collection = self._database[collection_name]
# else:
# raise ShellError(f"'{collection_name}' is not a valid collection name")
# else:
# raise ShellError(f"'{database_name}' is not a valid database name")
# else:
# if self.valid_database_name(db_collection_name):
# self._collection = self._database[db_collection_name]
# else:
# raise ShellError(f"'{db_collection_name}' is not a valid collection name")
[docs] def is_master(self):
"""
Run the pymongo is_master command for the current server.
:return: the is_master result doc.
"""
result = self.database.command("ismaster")
pprint.pprint(result)
def _cursor_to_line(self, cursor):
for i in cursor:
yield from self.doc_to_lines(i)
[docs] def find(self, *args, **kwargs):
"""
Run the pymongo find command against the default database and collection
and paginate the output to the screen.
"""
# print(f"database.collection: '{self.database.name}.{self.collection.name}'")
self.print_cursor(self.collection.find(*args, **kwargs))
[docs] def find_one(self, *args, **kwargs):
"""
Run the pymongo find_one command against the default database and collection
and paginate the output to the screen.
"""
# print(f"database.collection: '{self.database.name}.{self.collection.name}'")
self.print_doc(self.collection.find_one(*args, **kwargs))
[docs] def insert_one(self, *args, **kwargs):
"""
Run the pymongo insert_one command against the default database and collection
and returne the inserted ID.
"""
# print(f"database.collection: '{self.database.name}.{self.collection.name}'")
result = self.collection.insert_one(*args, **kwargs)
return result.inserted_id
[docs] def insert_many(self, *args, **kwargs):
"""
Run the pymongo insert_many command against the default database and collection
and return the list of inserted IDs.
"""
# print(f"database.collection: '{self.database.name}.{self.collection.name}'")
result = self.collection.insert_many(*args, **kwargs)
return result.inserted_ids
[docs] def delete_one(self, *args, **kwargs):
"""
Run the pymongo delete_one command against the default database and collection
and return the deleted IDs.
"""
result = self.collection.delete_one(*args, **kwargs)
return result.raw_result
[docs] def delete_many(self, *args, **kwargs):
"""
Run the pymongo delete_many command against the default database and collection
and return the deleted IDs.
"""
result = self.collection.delete_many(*args, **kwargs)
return result.raw_result
[docs] def count_documents(self, filter={}, *args, **kwargs):
"""
Count all the documents in a collection accurately
"""
result = self.collection.count_documents(filter, *args, **kwargs)
return result
[docs] def aggregate(self,pipeline, session=None, **kwargs):
"""
Run the aggregation pipeline
"""
self.print_cursor(self.collection.aggregate(pipeline, session, **kwargs))
[docs] def list_database_names(self):
"""
List all the databases on the default server.
"""
self.pager(self.client.list_database_names())
[docs] def dbstats(self):
"""
Run dbstats command for database
See https://docs.mongodb.com/manual/reference/method/db.stats/
"""
pprint.pprint(self.database.command("dbstats"))
[docs] def collstats(self, scale=1024, verbose=False):
"""
Run collection stats for collection.
see https://docs.mongodb.com/manual/reference/command/collStats/
:param scale: Scale at which to report sizes
:param verbose: used for extended report on legacy MMAPV1 storage engine
:return: JSON doc with stats
"""
self.print_doc(self.database.command(
{"collStats": self._collection_name,
"scale": scale,
"verbose": verbose}))
# def __getattr__(self, item):
# if hasattr(self._collection, item):
# return getattr(self.collection, item)
# else:
# raise MongoDBShellError(f"No such item {item} in PyMongo collection object")
def _get_collections(self, db_names=None):
"""
Internal function to return all the collections for every database.
include a list of db_names to filter the list of collections.
"""
if db_names:
db_list = db_names
else:
db_list = self.client.list_database_names()
for db_name in db_list:
db = self.client.get_database(db_name)
for col_name in db.list_collection_names():
size = db[col_name].g
yield f"{db_name}.{col_name}"
def list_collection_names(self, database_name):
self.pager(self._get_collections())
[docs] @staticmethod
def confirm_yes(message):
"""
Return true if user confirms yes. A correct response
is 'y' or 'Y'. All other chars will return false.
:param message: A string
:return: bool.
"""
response = input(f"{message}[ y/Y]:")
response.upper()
return response == "Y"
def drop_collection(self, confirm=True):
if confirm and self.confirm_yes(f'Drop collection:{self._database_name}.{self._collection_name}'):
return self._collection.drop()
else:
return self._collection.drop()
def drop_database(self, confirm=True):
if confirm and self.confirm_yes(f'Drop database:{self._database_name}'):
return self._client.drop_database(self.database)
else:
return self._client.drop_database(self.database)
@property
def overlap(self):
"""
Get and set the line_numbers boolean
:return: `line_numbers` (True|False)
"""
return self._overlap
@overlap.setter
def overlap(self, value):
self._overlap = value
@property
def line_numbers(self):
"""
Get and set the line_numbers boolean
:return: `line_numbers` (True|False)
"""
return self._line_numbers
@line_numbers.setter
def line_numbers(self, state):
self._line_numbers = state
@property
def pretty_print(self):
"""
Get and set the pretty print boolean
:return: `pretty_print` (True|False)
"""
return self._pretty_print
@pretty_print.setter
def pretty_print(self, state):
self._pretty_print = state
@property
def paginate(self):
return self._paginate
@paginate.setter
def paginate(self, state):
"""
:param state: True, turn on pagination
:return:
"""
self._paginate = state
@property
def output_file(self):
"""
:return: The name of the output file
"""
return self._output_filename
@output_file.setter
def output_file(self, filename):
"""
:param filename: file to output `pager` output to.
:return:
"""
if not filename or filename == "":
self._output_filename = None
self._output_file = None
else:
self._output_filename = filename
[docs] def paginate_doc(self, doc):
"""
:param doc: a dictionary of data
:return:
"""
[docs] def doc_to_lines(self, doc, format_func=None):
"""
Generator that converts a doc to a sequence of lines.
:param doc: A dictionary
:param format_func: customisable formatter defaults to pformat
:return: a generator yielding a line at a time
"""
if format_func:
for l in format_func(doc).splitlines():
yield l
elif self.pretty_print:
for l in pprint.pformat(doc).splitlines():
yield l
else:
for l in str(doc).splitlines():
yield l
[docs] def cursor_to_lines(self, cursor, format_func=None):
"""
Take a cursor that returns a list of docs and returns a
generator yield each line of each doc a line at a time.
:param cursor: A mongod cursor yielding docs (dictonaries)
:param format_func: A customisable format function
:return: a generator yielding a line at a time
"""
for doc in cursor:
yield from self.doc_to_lines(doc, format_func)
def print_cursor(self, cursor, format_func=None):
return self.pager(self.cursor_to_lines(cursor, format_func))
def print_doc(self, doc, format_func=None):
return self.pager(self.doc_to_lines(doc, format_func))
def __str__(self):
return f"client : '{self.uri}'\n" +\
f"db : '{self.database.name}'\n" +\
f"collection : '{self.collection.name}'"
def __repr__(self):
return f"mongodbshell.MongoDB('{self.database.name}', '{self.collection.name}', '{self.uri}')"
client = MongoDB()