Source code for netconf_client.session

from threading import Thread
from concurrent.futures import Future

from six.moves.queue import Queue, Empty
from lxml import etree

from netconf_client.parser import parse_messages
from netconf_client.log import logger
from netconf_client.constants import DEFAULT_HELLO, NAMESPACES, CAP_NETCONF_11
from netconf_client.error import SessionClosedException, RpcError


[docs]class Session: """A session with a NETCONF server This class is a context manager, and should always be either used with a ``with`` statement or the :meth:`close` method should be called manually when the object is no longer required. :ivar server_capabilities: The list of capabilities parsed from the server's ``<hello>`` :ivar client_capabilities: The list of capabilities parsed from the client's ``<hello>`` """ def __init__(self, sock): self.sock = sock self.mode = "1.0" self.send_msg(DEFAULT_HELLO) self.client_hello = DEFAULT_HELLO self.parser = parse_messages(sock, self.mode) # First message will be the server hello self.server_hello = next(self.parser) server_ele = etree.fromstring(self.server_hello) self.session_id = int( server_ele.xpath("/nc:hello/nc:session-id", namespaces=NAMESPACES)[0].text ) self.server_capabilities = capabilities_from_hello(server_ele) client_ele = etree.fromstring(self.client_hello) self.client_capabilities = capabilities_from_hello(client_ele) if ( CAP_NETCONF_11 in self.client_capabilities and CAP_NETCONF_11 in self.server_capabilities ): self.mode = "1.1" self.unknown_recvq = Queue() self.notifications = Queue() self.rpc_reply_futures = Queue() self.thread = Thread(target=self._recv_loop) self.thread.daemon = True self.thread.start() def __enter__(self): return self def __exit__(self, _, __, ___): self.close()
[docs] def close(self): """Closes any associated sockets and frees any other associated resources""" try: self.sock.close() except Exception: pass try: while True: f = self.rpc_reply_futures.get(block=False) f.set_exception(SessionClosedException()) self.rpc_reply_futures.task_done() except Empty: pass
[docs] def send_msg(self, msg): """Sends a raw byte string to the server :param bytes msg: The byte string to send """ logger.debug("Sending message on session %s", msg) if self.mode == "1.0": self.sock.sendall(msg + b"]]>]]>") elif self.mode == "1.1": self.sock.sendall(frame_message_11(msg))
[docs] def send_rpc(self, rpc): """Sends a raw RPC to the server :param bytes rpc: The RPC to send :rtype: :class:`concurrent.futures.Future` with a result type of tuple(:class:`bytes`, :class:`lxml.Element`) """ f = Future() self.rpc_reply_futures.put(f) self.send_msg(rpc) return f
def _recv_loop(self): while True: try: msg = self.parser.send(self.mode) except Exception as e: logger.info("Stopping recv thread due to exception %s", str(e)) return ele = etree.fromstring(msg) if ele.xpath("/nc:rpc-reply", namespaces=NAMESPACES): try: f = self.rpc_reply_futures.get(block=False) if ele.xpath("/nc:rpc-reply/nc:rpc-error", namespaces=NAMESPACES): f.set_exception(RpcError(msg, ele)) else: f.set_result((msg, ele)) self.rpc_reply_futures.task_done() msg = None except Empty: logger.warning( "An <rpc-reply> was received " "with no corresponding handler: %s", msg, ) elif ele.xpath("/notif:notification", namespaces=NAMESPACES): self.notifications.put((msg, ele)) msg = None if msg is not None: self.unknown_recvq.put((msg, ele))
def capabilities_from_hello(hello): return [ x.text for x in hello.xpath( "/nc:hello/nc:capabilities/nc:capability", namespaces=NAMESPACES ) ] def frame_message_11(msg): header = "\n#{}\n".format(len(msg)).encode("ascii") footer = b"\n##\n" return header + msg + footer