"""
:py:mod:`pymco.utils`
---------------------
python-mcollective utils that don't fit elsewhere.
"""
import binascii
import importlib
import logging
[docs]def import_class(import_path):
"""Import a class based on given dotted import path string.
It just splits the import path in order to geth the module and class names,
then it just calls to :py:func:`__import__` with the module name and
:py:func:`getattr` with the module and the class name.
:arg import_path: dotted import path string.
:return: the class once imported.
:raise: :py:exc:`ImportError` if the class can't be imported.
"""
parts = import_path.split('.')
mod_str, klass_str = '.'.join(parts[:-1]), parts[-1]
try:
mod = importlib.import_module(mod_str)
return getattr(mod, klass_str)
except (AttributeError, ValueError):
raise ImportError('Unable to import {klass} from module {mod}'.format(
klass=klass_str,
mod=mod_str,
))
[docs]def import_object(import_path, *args, **kwargs):
"""Import a class and instantiate it.
Uses :py:func:`import_class` in order to import the given class by its
import path and instantiate it using given positional and keyword
arguments.
:arg import_path: Same argument as :py:func:`import_class`.
:arg \*args: extra pPositional arguments for object instantiation.
:arg \*\*kwargs: extra Keyword arguments for object instantiation.
:returns: an object the imported class initialized with given arguments.
"""
return import_class(import_path)(*args, **kwargs)
[docs]def pem_to_der(pem):
"""Convert an ascii-armored PEM certificate to a DER encoded certificate
See http://stackoverflow.com/a/12921889 for details. Python ``ssl`` module
has it own method for this, but it shouldn't work properly and this method
is required.
:arg str pem: The PEM certificate as string.
"""
# TODO(rafaduran): report and/or fix Python ssl method.
# Importing here since Crypto module is only require for the SSL security
# provider plugin.
from Crypto.Util.asn1 import DerSequence
lines = pem.replace(" ", '').split()
der = binascii.a2b_base64(''.join(lines[1:-1]))
# Extract subject_public_key_info field from X.509 certificate (see RFC3280)
cert = DerSequence()
cert.decode(der)
tbs_certificate = DerSequence()
tbs_certificate.decode(cert[0])
subject_public_key_info = tbs_certificate[6]
# this can be passed to RSA.importKey()
return subject_public_key_info
[docs]def load_rsa_key(filename):
"""Read filename and try to load its contents as an RSA key.
Wrapper over :py:meth:`Crypto.PublicKey.RSA.importKey`, just getting the
file content first and then just loading the key from it.
:param filename: RSA key file name.
:returns: loaded RSA key.
"""
# Importing here since Crypto module is only require for the SSL security
# provider plugin.
from Crypto.PublicKey import RSA
logger = logging.getLogger(__name__)
logger.debug("reading RSA key from {f}".format(f=filename))
with open(filename, 'rt') as key:
content = key.read()
if content.startswith('-----BEGIN CERTIFICATE-----'):
# TODO(rafadruan): this lacks testing.
logger.debug("found ASCII-armored PEM certificate; converting to DER")
content = pem_to_der(content)
logger.debug("Importing RSA key")
k = RSA.importKey(content)
logger.debug("returning key")
return k