# Source code for pymco.rpc

"""
:py:mod:`pymco.rpc`
-------------------
MCollective RPC calls support.
"""
import logging

# Module-level logger; used as the default ``logger`` of SimpleAction.
LOG = logging.getLogger(__name__)


class SimpleAction(object):
    r"""Single RPC call to MCollective.

    :arg config: :py:class:`pymco.config.Config` instance.
    :arg msg: A dictionary like object, usually a
        :py:class:`pymco.message.Message` instance.
    :arg agent: agent identifier, forwarded to the connector when building
        the request and reply targets.
    :arg logger: logger used for debug tracing. Defaults to this module's
        logger.
    :arg \*\*kwargs: extra keyword arguments. Set the collective here if you
        aren't targeting the main collective.
    """

    def __init__(self, config, msg, agent, logger=LOG, **kwargs):
        self.logger = logger
        self.logger.debug("init rpc.SimpleAction")
        self.config = config
        self.msg = msg
        self.agent = agent
        # Created lazily by the ``connector`` property.
        self._connector = None
        # A missing or falsy ``collective`` falls back to the configured
        # main collective; the config is only consulted in that case.
        self.collective = (kwargs.get('collective', None) or
                           self.config['main_collective'])

    @property
    def connector(self):
        """Connector from ``config.get_connector()``, built on first access.

        The instance is cached, so every later access returns the same
        object.
        """
        # Compare against None explicitly so a connector object that happens
        # to evaluate as false is not rebuilt on every access.
        if self._connector is None:
            self._connector = self.config.get_connector()
        return self._connector

    def get_target(self):
        """MCollective RPC call target.

        :return: middleware target for the request.
        """
        return self.connector.get_target(collective=self.collective,
                                         agent=self.agent)

    def get_reply_target(self):
        """MCollective RPC call reply target.

        This should build the subscription target required for listening
        replies to this RPC call.

        :return: middleware target for the response.
        """
        return self.connector.get_reply_target(collective=self.collective,
                                               agent=self.agent)

    def call(self, timeout=5):
        """Make the RPC call.

        It should subscribe to the reply target, execute the RPC call and
        wait for the result. Once the connection has been established it is
        always closed again, whether the call succeeds or raises.

        :arg timeout: RPC call timeout.
        :return: a dictionary like object with response.
        :raise: :py:exc:`pymco.exc.TimeoutError` if expected messages don't
            arrive in ``timeout`` seconds.
        """
        connector = self.connector
        self.logger.debug("connecting, wait=True")
        connector.connect(wait=True)
        # From here on the connection is open: make sure it is released even
        # if subscribing, sending or receiving fails (e.g. on timeout).
        try:
            reply_target = self.get_reply_target()
            self.logger.debug(
                "subscribing to destination={r}".format(r=reply_target))
            connector.subscribe(destination=reply_target)
            self.logger.debug("sending")
            # 'reply-to' is not a valid identifier, hence the dict expansion.
            connector.send(self.msg,
                           self.get_target(),
                           **{'reply-to': reply_target})
            self.logger.debug(
                "receiving replies, timeout={t}".format(t=timeout))
            return connector.receive(timeout=timeout)
        finally:
            self.logger.debug("disconnecting")
            connector.disconnect()