| 
									
										
										
										
											2012-05-02 15:21:36 -07:00
										 |  |  | # Copyright (c) 2010, 2011, 2012 Nicira, Inc. | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | # | 
					
						
							|  |  |  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
					
						
							|  |  |  | # you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | # You may obtain a copy of the License at: | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | #     http://www.apache.org/licenses/LICENSE-2.0 | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | # distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | # See the License for the specific language governing permissions and | 
					
						
							|  |  |  | # limitations under the License. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import errno | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import select | 
					
						
							|  |  |  | import socket | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ovs.poller | 
					
						
							|  |  |  | import ovs.socket_util | 
					
						
							| 
									
										
										
										
											2011-09-24 17:53:30 -07:00
										 |  |  | import ovs.vlog | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | vlog = ovs.vlog.Vlog("stream") | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-04-11 20:18:34 -07:00
										 |  |  | def stream_or_pstream_needs_probes(name): | 
					
						
							|  |  |  |     """ 1 if the stream or pstream specified by 'name' needs periodic probes to
 | 
					
						
							|  |  |  |     verify connectivty.  For [p]streams which need probes, it can take a long | 
					
						
							|  |  |  |     time to notice the connection was dropped.  Returns 0 if probes aren't | 
					
						
							|  |  |  |     needed, and -1 if 'name' is invalid"""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if PassiveStream.is_valid_name(name) or Stream.is_valid_name(name): | 
					
						
							|  |  |  |         # Only unix and punix are supported currently. | 
					
						
							|  |  |  |         return 0 | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         return -1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | class Stream(object): | 
					
						
							|  |  |  |     """Bidirectional byte stream.  Currently only Unix domain sockets
 | 
					
						
							|  |  |  |     are implemented."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # States. | 
					
						
							|  |  |  |     __S_CONNECTING = 0 | 
					
						
							|  |  |  |     __S_CONNECTED = 1 | 
					
						
							|  |  |  |     __S_DISCONNECTED = 2 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Kinds of events that one might wait for. | 
					
						
							|  |  |  |     W_CONNECT = 0               # Connect complete (success or failure). | 
					
						
							|  |  |  |     W_RECV = 1                  # Data received. | 
					
						
							|  |  |  |     W_SEND = 2                  # Send buffer room available. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def is_valid_name(name): | 
					
						
							|  |  |  |         """Returns True if 'name' is a stream name in the form "TYPE:ARGS" and
 | 
					
						
							|  |  |  |         TYPE is a supported stream type (currently only "unix:"), otherwise | 
					
						
							|  |  |  |         False."""
 | 
					
						
							|  |  |  |         return name.startswith("unix:") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-02-27 11:13:00 -08:00
										 |  |  |     def __init__(self, socket, name, status): | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |         self.socket = socket | 
					
						
							|  |  |  |         self.name = name | 
					
						
							|  |  |  |         if status == errno.EAGAIN: | 
					
						
							|  |  |  |             self.state = Stream.__S_CONNECTING | 
					
						
							|  |  |  |         elif status == 0: | 
					
						
							|  |  |  |             self.state = Stream.__S_CONNECTED | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self.state = Stream.__S_DISCONNECTED | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.error = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def open(name): | 
					
						
							|  |  |  |         """Attempts to connect a stream to a remote peer.  'name' is a
 | 
					
						
							|  |  |  |         connection name in the form "TYPE:ARGS", where TYPE is an active stream | 
					
						
							|  |  |  |         class's name and ARGS are stream class-specific.  Currently the only | 
					
						
							|  |  |  |         supported TYPE is "unix". | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns (error, stream): on success 'error' is 0 and 'stream' is the | 
					
						
							|  |  |  |         new Stream, on failure 'error' is a positive errno value and 'stream' | 
					
						
							|  |  |  |         is None. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Never returns errno.EAGAIN or errno.EINPROGRESS.  Instead, returns 0 | 
					
						
							|  |  |  |         and a new Stream.  The connect() method can be used to check for | 
					
						
							|  |  |  |         successful connection completion."""
 | 
					
						
							|  |  |  |         if not Stream.is_valid_name(name): | 
					
						
							|  |  |  |             return errno.EAFNOSUPPORT, None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         connect_path = name[5:] | 
					
						
							|  |  |  |         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, | 
					
						
							| 
									
										
										
										
											2012-02-27 11:13:00 -08:00
										 |  |  |                                                        True, None, | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |                                                        connect_path) | 
					
						
							|  |  |  |         if error: | 
					
						
							|  |  |  |             return error, None | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             status = ovs.socket_util.check_connection_completion(sock) | 
					
						
							| 
									
										
										
										
											2012-02-27 11:13:00 -08:00
										 |  |  |             return 0, Stream(sock, name, status) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							| 
									
										
										
										
											2011-08-23 10:50:47 -07:00
										 |  |  |     def open_block((error, stream)): | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |         """Blocks until a Stream completes its connection attempt, either
 | 
					
						
							| 
									
										
										
										
											2011-08-23 10:50:47 -07:00
										 |  |  |         succeeding or failing.  (error, stream) should be the tuple returned by | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |         Stream.open().  Returns a tuple of the same form. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Typical usage: | 
					
						
							| 
									
										
										
										
											2011-08-23 10:50:47 -07:00
										 |  |  |         error, stream = Stream.open_block(Stream.open("unix:/tmp/socket"))"""
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if not error: | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 error = stream.connect() | 
					
						
							|  |  |  |                 if error != errno.EAGAIN: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 stream.run() | 
					
						
							|  |  |  |                 poller = ovs.poller.Poller() | 
					
						
							| 
									
										
										
										
											2012-07-02 10:34:32 -07:00
										 |  |  |                 stream.run_wait(poller) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |                 stream.connect_wait(poller) | 
					
						
							|  |  |  |                 poller.block() | 
					
						
							|  |  |  |             assert error != errno.EINPROGRESS | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |         if error and stream: | 
					
						
							|  |  |  |             stream.close() | 
					
						
							|  |  |  |             stream = None | 
					
						
							|  |  |  |         return error, stream | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def close(self): | 
					
						
							|  |  |  |         self.socket.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __scs_connecting(self): | 
					
						
							|  |  |  |         retval = ovs.socket_util.check_connection_completion(self.socket) | 
					
						
							|  |  |  |         assert retval != errno.EINPROGRESS | 
					
						
							|  |  |  |         if retval == 0: | 
					
						
							|  |  |  |             self.state = Stream.__S_CONNECTED | 
					
						
							|  |  |  |         elif retval != errno.EAGAIN: | 
					
						
							|  |  |  |             self.state = Stream.__S_DISCONNECTED | 
					
						
							|  |  |  |             self.error = retval | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def connect(self): | 
					
						
							|  |  |  |         """Tries to complete the connection on this stream.  If the connection
 | 
					
						
							|  |  |  |         is complete, returns 0 if the connection was successful or a positive | 
					
						
							|  |  |  |         errno value if it failed.  If the connection is still in progress, | 
					
						
							|  |  |  |         returns errno.EAGAIN."""
 | 
					
						
							|  |  |  |         last_state = -1         # Always differs from initial self.state | 
					
						
							|  |  |  |         while self.state != last_state: | 
					
						
							| 
									
										
										
										
											2011-08-23 11:09:46 -07:00
										 |  |  |             last_state = self.state | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |             if self.state == Stream.__S_CONNECTING: | 
					
						
							|  |  |  |                 self.__scs_connecting() | 
					
						
							|  |  |  |             elif self.state == Stream.__S_CONNECTED: | 
					
						
							|  |  |  |                 return 0 | 
					
						
							|  |  |  |             elif self.state == Stream.__S_DISCONNECTED: | 
					
						
							|  |  |  |                 return self.error | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def recv(self, n): | 
					
						
							|  |  |  |         """Tries to receive up to 'n' bytes from this stream.  Returns a
 | 
					
						
							|  |  |  |         (error, string) tuple: | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |             - If successful, 'error' is zero and 'string' contains between 1 | 
					
						
							|  |  |  |               and 'n' bytes of data. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             - On error, 'error' is a positive errno value. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             - If the connection has been closed in the normal fashion or if 'n' | 
					
						
							|  |  |  |               is 0, the tuple is (0, ""). | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |         The recv function will not block waiting for data to arrive.  If no | 
					
						
							|  |  |  |         data have been received, it returns (errno.EAGAIN, "") immediately."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         retval = self.connect() | 
					
						
							|  |  |  |         if retval != 0: | 
					
						
							|  |  |  |             return (retval, "") | 
					
						
							|  |  |  |         elif n == 0: | 
					
						
							|  |  |  |             return (0, "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             return (0, self.socket.recv(n)) | 
					
						
							|  |  |  |         except socket.error, e: | 
					
						
							|  |  |  |             return (ovs.socket_util.get_exception_errno(e), "") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def send(self, buf): | 
					
						
							|  |  |  |         """Tries to send 'buf' on this stream.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         If successful, returns the number of bytes sent, between 1 and | 
					
						
							|  |  |  |         len(buf).  0 is only a valid return value if len(buf) is 0. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         On error, returns a negative errno value. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Will not block.  If no bytes can be immediately accepted for | 
					
						
							|  |  |  |         transmission, returns -errno.EAGAIN immediately."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         retval = self.connect() | 
					
						
							|  |  |  |         if retval != 0: | 
					
						
							|  |  |  |             return -retval | 
					
						
							|  |  |  |         elif len(buf) == 0: | 
					
						
							|  |  |  |             return 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             return self.socket.send(buf) | 
					
						
							|  |  |  |         except socket.error, e: | 
					
						
							|  |  |  |             return -ovs.socket_util.get_exception_errno(e) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run(self): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_wait(self, poller): | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def wait(self, poller, wait): | 
					
						
							|  |  |  |         assert wait in (Stream.W_CONNECT, Stream.W_RECV, Stream.W_SEND) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.state == Stream.__S_DISCONNECTED: | 
					
						
							|  |  |  |             poller.immediate_wake() | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if self.state == Stream.__S_CONNECTING: | 
					
						
							|  |  |  |             wait = Stream.W_CONNECT | 
					
						
							| 
									
										
										
										
											2011-08-23 11:16:57 -07:00
										 |  |  |         if wait == Stream.W_RECV: | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |             poller.fd_wait(self.socket, select.POLLIN) | 
					
						
							| 
									
										
										
										
											2011-08-23 11:16:57 -07:00
										 |  |  |         else: | 
					
						
							|  |  |  |             poller.fd_wait(self.socket, select.POLLOUT) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def connect_wait(self, poller): | 
					
						
							|  |  |  |         self.wait(poller, Stream.W_CONNECT) | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |     def recv_wait(self, poller): | 
					
						
							|  |  |  |         self.wait(poller, Stream.W_RECV) | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |     def send_wait(self, poller): | 
					
						
							|  |  |  |         self.wait(poller, Stream.W_SEND) | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |     def __del__(self): | 
					
						
							|  |  |  |         # Don't delete the file: we might have forked. | 
					
						
							|  |  |  |         self.socket.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  | class PassiveStream(object): | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def is_valid_name(name): | 
					
						
							|  |  |  |         """Returns True if 'name' is a passive stream name in the form
 | 
					
						
							|  |  |  |         "TYPE:ARGS" and TYPE is a supported passive stream type (currently only | 
					
						
							|  |  |  |         "punix:"), otherwise False."""
 | 
					
						
							|  |  |  |         return name.startswith("punix:") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, sock, name, bind_path): | 
					
						
							|  |  |  |         self.name = name | 
					
						
							|  |  |  |         self.socket = sock | 
					
						
							|  |  |  |         self.bind_path = bind_path | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @staticmethod | 
					
						
							|  |  |  |     def open(name): | 
					
						
							|  |  |  |         """Attempts to start listening for remote stream connections.  'name'
 | 
					
						
							|  |  |  |         is a connection name in the form "TYPE:ARGS", where TYPE is an passive | 
					
						
							|  |  |  |         stream class's name and ARGS are stream class-specific.  Currently the | 
					
						
							|  |  |  |         only supported TYPE is "punix". | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Returns (error, pstream): on success 'error' is 0 and 'pstream' is the | 
					
						
							|  |  |  |         new PassiveStream, on failure 'error' is a positive errno value and | 
					
						
							|  |  |  |         'pstream' is None."""
 | 
					
						
							|  |  |  |         if not PassiveStream.is_valid_name(name): | 
					
						
							|  |  |  |             return errno.EAFNOSUPPORT, None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         bind_path = name[6:] | 
					
						
							|  |  |  |         error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM, | 
					
						
							|  |  |  |                                                        True, bind_path, None) | 
					
						
							|  |  |  |         if error: | 
					
						
							|  |  |  |             return error, None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             sock.listen(10) | 
					
						
							|  |  |  |         except socket.error, e: | 
					
						
							| 
									
										
										
										
											2011-09-24 17:53:30 -07:00
										 |  |  |             vlog.err("%s: listen: %s" % (name, os.strerror(e.error))) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |             sock.close() | 
					
						
							|  |  |  |             return e.error, None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         return 0, PassiveStream(sock, name, bind_path) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def close(self): | 
					
						
							|  |  |  |         """Closes this PassiveStream.""" | 
					
						
							|  |  |  |         self.socket.close() | 
					
						
							|  |  |  |         if self.bind_path is not None: | 
					
						
							|  |  |  |             ovs.fatal_signal.unlink_file_now(self.bind_path) | 
					
						
							|  |  |  |             self.bind_path = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def accept(self): | 
					
						
							|  |  |  |         """Tries to accept a new connection on this passive stream.  Returns
 | 
					
						
							|  |  |  |         (error, stream): if successful, 'error' is 0 and 'stream' is the new | 
					
						
							|  |  |  |         Stream object, and on failure 'error' is a positive errno value and | 
					
						
							|  |  |  |         'stream' is None. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Will not block waiting for a connection.  If no connection is ready to | 
					
						
							|  |  |  |         be accepted, returns (errno.EAGAIN, None) immediately."""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         while True: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 sock, addr = self.socket.accept() | 
					
						
							|  |  |  |                 ovs.socket_util.set_nonblocking(sock) | 
					
						
							| 
									
										
										
										
											2012-02-27 11:13:00 -08:00
										 |  |  |                 return 0, Stream(sock, "unix:%s" % addr, 0) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |             except socket.error, e: | 
					
						
							|  |  |  |                 error = ovs.socket_util.get_exception_errno(e) | 
					
						
							|  |  |  |                 if error != errno.EAGAIN: | 
					
						
							|  |  |  |                     # XXX rate-limit | 
					
						
							| 
									
										
										
										
											2011-09-24 17:53:30 -07:00
										 |  |  |                     vlog.dbg("accept: %s" % os.strerror(error)) | 
					
						
							| 
									
										
										
										
											2010-08-25 10:26:40 -07:00
										 |  |  |                 return error, None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def wait(self, poller): | 
					
						
							|  |  |  |         poller.fd_wait(self.socket, select.POLLIN) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __del__(self): | 
					
						
							|  |  |  |         # Don't delete the file: we might have forked. | 
					
						
							|  |  |  |         self.socket.close() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-09-23 23:43:12 -07:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-09-26 16:02:14 -07:00
										 |  |  | def usage(name): | 
					
						
							|  |  |  |     return """
 | 
					
						
							|  |  |  | Active %s connection methods: | 
					
						
							|  |  |  |   unix:FILE               Unix domain socket named FILE | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Passive %s connection methods: | 
					
						
							|  |  |  |   punix:FILE              Listen on Unix domain socket FILE""" % (name, name)
 |