mirror of
https://github.com/openvswitch/ovs
synced 2025-10-25 15:07:05 +00:00
Python tests: Ported UNIX sockets to Windows
Unix sockets (AF_UNIX) are not supported on Windows. The replacement of Unix sockets on Windows is implemented using named pipes, we are trying to mimic the behaviour of unix sockets. Instead of using Unix sockets to communicate between components Named Pipes are used. This makes the python sockets compatible with the Named Pipe used in Windows applications. Signed-off-by: Paul-Daniel Boca <pboca@cloudbasesolutions.com> Signed-off-by: Alin Balutoiu <abalutoiu@cloudbasesolutions.com> Acked-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions> Tested-by: Alin Gabriel Serdean <aserdean@cloudbasesolutions> Signed-off-by: Gurucharan Shetty <guru@ovn.org>
This commit is contained in:
committed by
Gurucharan Shetty
parent
f98c8a093f
commit
03947eb7ec
@@ -15,6 +15,7 @@
|
||||
import errno
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
|
||||
import six
|
||||
|
||||
@@ -27,6 +28,13 @@ try:
|
||||
except ImportError:
|
||||
SSL = None
|
||||
|
||||
if sys.platform == 'win32':
|
||||
import ovs.winutils as winutils
|
||||
import pywintypes
|
||||
import win32event
|
||||
import win32file
|
||||
import win32pipe
|
||||
|
||||
vlog = ovs.vlog.Vlog("stream")
|
||||
|
||||
|
||||
@@ -63,6 +71,13 @@ class Stream(object):
|
||||
_SSL_certificate_file = None
|
||||
_SSL_ca_cert_file = None
|
||||
|
||||
# Windows only
|
||||
_write = None # overlapped for write operation
|
||||
_read = None # overlapped for read operation
|
||||
_write_pending = False
|
||||
_read_pending = False
|
||||
_retry_connect = False
|
||||
|
||||
@staticmethod
|
||||
def register_method(method, cls):
|
||||
Stream._SOCKET_METHODS[method + ":"] = cls
|
||||
@@ -81,8 +96,23 @@ class Stream(object):
|
||||
otherwise False."""
|
||||
return bool(Stream._find_method(name))
|
||||
|
||||
def __init__(self, socket, name, status):
|
||||
def __init__(self, socket, name, status, pipe=None, is_server=False):
|
||||
self.socket = socket
|
||||
self.pipe = pipe
|
||||
if sys.platform == 'win32':
|
||||
self._read = pywintypes.OVERLAPPED()
|
||||
self._read.hEvent = winutils.get_new_event()
|
||||
self._write = pywintypes.OVERLAPPED()
|
||||
self._write.hEvent = winutils.get_new_event()
|
||||
if pipe is not None:
|
||||
# Flag to check if fd is a server HANDLE. In the case of a
|
||||
# server handle we have to issue a disconnect before closing
|
||||
# the actual handle.
|
||||
self._server = is_server
|
||||
suffix = name.split(":", 1)[1]
|
||||
suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
|
||||
self._pipename = winutils.get_pipe_name(suffix)
|
||||
|
||||
self.name = name
|
||||
if status == errno.EAGAIN:
|
||||
self.state = Stream.__S_CONNECTING
|
||||
@@ -120,6 +150,38 @@ class Stream(object):
|
||||
suffix = name.split(":", 1)[1]
|
||||
if name.startswith("unix:"):
|
||||
suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
|
||||
if sys.platform == 'win32':
|
||||
pipename = winutils.get_pipe_name(suffix)
|
||||
|
||||
if len(suffix) > 255:
|
||||
# Return invalid argument if the name is too long
|
||||
return errno.ENOENT, None
|
||||
|
||||
try:
|
||||
# In case of "unix:" argument, the assumption is that
|
||||
# there is a file created in the path (suffix).
|
||||
open(suffix, 'r').close()
|
||||
except:
|
||||
return errno.ENOENT, None
|
||||
|
||||
try:
|
||||
npipe = winutils.create_file(pipename)
|
||||
try:
|
||||
winutils.set_pipe_mode(npipe,
|
||||
win32pipe.PIPE_READMODE_BYTE)
|
||||
except pywintypes.error as e:
|
||||
return errno.ENOENT, None
|
||||
except pywintypes.error as e:
|
||||
if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
|
||||
# Pipe is busy, set the retry flag to true and retry
|
||||
# again during the connect function.
|
||||
Stream.retry_connect = True
|
||||
return 0, cls(None, name, errno.EAGAIN,
|
||||
pipe=win32file.INVALID_HANDLE_VALUE,
|
||||
is_server=False)
|
||||
return errno.ENOENT, None
|
||||
return 0, cls(None, name, 0, pipe=npipe, is_server=False)
|
||||
|
||||
error, sock = cls._open(suffix, dscp)
|
||||
if error:
|
||||
return error, None
|
||||
@@ -145,6 +207,10 @@ class Stream(object):
|
||||
if not error:
|
||||
while True:
|
||||
error = stream.connect()
|
||||
if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
|
||||
# WSAEWOULDBLOCK would be the equivalent on Windows
|
||||
# for EAGAIN on Unix.
|
||||
error = errno.EAGAIN
|
||||
if error != errno.EAGAIN:
|
||||
break
|
||||
stream.run()
|
||||
@@ -152,7 +218,8 @@ class Stream(object):
|
||||
stream.run_wait(poller)
|
||||
stream.connect_wait(poller)
|
||||
poller.block()
|
||||
assert error != errno.EINPROGRESS
|
||||
if stream.socket is not None:
|
||||
assert error != errno.EINPROGRESS
|
||||
|
||||
if error and stream:
|
||||
stream.close()
|
||||
@@ -160,11 +227,36 @@ class Stream(object):
|
||||
return error, stream
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
if self.socket is not None:
|
||||
self.socket.close()
|
||||
if self.pipe is not None:
|
||||
if self._server:
|
||||
win32pipe.DisconnectNamedPipe(self.pipe)
|
||||
winutils.close_handle(self.pipe, vlog.warn)
|
||||
winutils.close_handle(self._read.hEvent, vlog.warn)
|
||||
winutils.close_handle(self._write.hEvent, vlog.warn)
|
||||
|
||||
def __scs_connecting(self):
|
||||
retval = ovs.socket_util.check_connection_completion(self.socket)
|
||||
assert retval != errno.EINPROGRESS
|
||||
if self.socket is not None:
|
||||
retval = ovs.socket_util.check_connection_completion(self.socket)
|
||||
assert retval != errno.EINPROGRESS
|
||||
elif sys.platform == 'win32':
|
||||
if self.retry_connect:
|
||||
try:
|
||||
self.pipe = winutils.create_file(self._pipename)
|
||||
self._retry_connect = False
|
||||
retval = 0
|
||||
except pywintypes.error as e:
|
||||
if e.winerror == winutils.winerror.ERROR_PIPE_BUSY:
|
||||
retval = errno.EAGAIN
|
||||
else:
|
||||
self._retry_connect = False
|
||||
retval = errno.ENOENT
|
||||
else:
|
||||
# If retry_connect is false, it means it's already
|
||||
# connected so we can set the value of retval to 0
|
||||
retval = 0
|
||||
|
||||
if retval == 0:
|
||||
self.state = Stream.__S_CONNECTED
|
||||
elif retval != errno.EAGAIN:
|
||||
@@ -209,11 +301,63 @@ class Stream(object):
|
||||
elif n == 0:
|
||||
return (0, "")
|
||||
|
||||
if sys.platform == 'win32' and self.socket is None:
|
||||
return self.__recv_windows(n)
|
||||
|
||||
try:
|
||||
return (0, self.socket.recv(n))
|
||||
except socket.error as e:
|
||||
return (ovs.socket_util.get_exception_errno(e), "")
|
||||
|
||||
def __recv_windows(self, n):
|
||||
if self._read_pending:
|
||||
try:
|
||||
nBytesRead = winutils.get_overlapped_result(self.pipe,
|
||||
self._read,
|
||||
False)
|
||||
self._read_pending = False
|
||||
recvBuffer = self._read_buffer[:nBytesRead]
|
||||
|
||||
return (0, winutils.get_decoded_buffer(recvBuffer))
|
||||
except pywintypes.error as e:
|
||||
if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
|
||||
# The operation is still pending, try again
|
||||
self._read_pending = True
|
||||
return (errno.EAGAIN, "")
|
||||
elif e.winerror in winutils.pipe_disconnected_errors:
|
||||
# If the pipe was disconnected, return 0.
|
||||
return (0, "")
|
||||
else:
|
||||
return (errno.EINVAL, "")
|
||||
|
||||
(errCode, self._read_buffer) = winutils.read_file(self.pipe,
|
||||
n,
|
||||
self._read)
|
||||
if errCode:
|
||||
if errCode == winutils.winerror.ERROR_IO_PENDING:
|
||||
self._read_pending = True
|
||||
return (errno.EAGAIN, "")
|
||||
elif errCode in winutils.pipe_disconnected_errors:
|
||||
# If the pipe was disconnected, return 0.
|
||||
return (0, "")
|
||||
else:
|
||||
return (errCode, "")
|
||||
|
||||
try:
|
||||
nBytesRead = winutils.get_overlapped_result(self.pipe,
|
||||
self._read,
|
||||
False)
|
||||
winutils.win32event.SetEvent(self._read.hEvent)
|
||||
except pywintypes.error as e:
|
||||
if e.winerror in winutils.pipe_disconnected_errors:
|
||||
# If the pipe was disconnected, return 0.
|
||||
return (0, "")
|
||||
else:
|
||||
return (e.winerror, "")
|
||||
|
||||
recvBuffer = self._read_buffer[:nBytesRead]
|
||||
return (0, winutils.get_decoded_buffer(recvBuffer))
|
||||
|
||||
def send(self, buf):
|
||||
"""Tries to send 'buf' on this stream.
|
||||
|
||||
@@ -231,6 +375,9 @@ class Stream(object):
|
||||
elif len(buf) == 0:
|
||||
return 0
|
||||
|
||||
if sys.platform == 'win32' and self.socket is None:
|
||||
return self.__send_windows(buf)
|
||||
|
||||
try:
|
||||
# Python 3 has separate types for strings and bytes. We must have
|
||||
# bytes here.
|
||||
@@ -240,6 +387,40 @@ class Stream(object):
|
||||
except socket.error as e:
|
||||
return -ovs.socket_util.get_exception_errno(e)
|
||||
|
||||
def __send_windows(self, buf):
|
||||
if self._write_pending:
|
||||
try:
|
||||
nBytesWritten = winutils.get_overlapped_result(self.pipe,
|
||||
self._write,
|
||||
False)
|
||||
self._write_pending = False
|
||||
return nBytesWritten
|
||||
except pywintypes.error as e:
|
||||
if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
|
||||
# The operation is still pending, try again
|
||||
self._read_pending = True
|
||||
return -errno.EAGAIN
|
||||
elif e.winerror in winutils.pipe_disconnected_errors:
|
||||
# If the pipe was disconnected, return connection reset.
|
||||
return -errno.ECONNRESET
|
||||
else:
|
||||
return -errno.EINVAL
|
||||
|
||||
buf = winutils.get_encoded_buffer(buf)
|
||||
self._write_pending = False
|
||||
(errCode, nBytesWritten) = winutils.write_file(self.pipe,
|
||||
buf,
|
||||
self._write)
|
||||
if errCode:
|
||||
if errCode == winutils.winerror.ERROR_IO_PENDING:
|
||||
self._write_pending = True
|
||||
return -errno.EAGAIN
|
||||
if (not nBytesWritten and
|
||||
errCode in winutils.pipe_disconnected_errors):
|
||||
# If the pipe was disconnected, return connection reset.
|
||||
return -errno.ECONNRESET
|
||||
return nBytesWritten
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
@@ -255,11 +436,52 @@ class Stream(object):
|
||||
|
||||
if self.state == Stream.__S_CONNECTING:
|
||||
wait = Stream.W_CONNECT
|
||||
|
||||
if sys.platform == 'win32':
|
||||
self.__wait_windows(poller, wait)
|
||||
return
|
||||
|
||||
if wait == Stream.W_RECV:
|
||||
poller.fd_wait(self.socket, ovs.poller.POLLIN)
|
||||
else:
|
||||
poller.fd_wait(self.socket, ovs.poller.POLLOUT)
|
||||
|
||||
def __wait_windows(self, poller, wait):
|
||||
if self.socket is not None:
|
||||
if wait == Stream.W_RECV:
|
||||
read_flags = (win32file.FD_READ |
|
||||
win32file.FD_ACCEPT |
|
||||
win32file.FD_CLOSE)
|
||||
try:
|
||||
win32file.WSAEventSelect(self.socket,
|
||||
self._read.hEvent,
|
||||
read_flags)
|
||||
except pywintypes.error as e:
|
||||
vlog.err("failed to associate events with socket: %s"
|
||||
% e.strerror)
|
||||
poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
|
||||
else:
|
||||
write_flags = (win32file.FD_WRITE |
|
||||
win32file.FD_CONNECT |
|
||||
win32file.FD_CLOSE)
|
||||
try:
|
||||
win32file.WSAEventSelect(self.socket,
|
||||
self._write.hEvent,
|
||||
write_flags)
|
||||
except pywintypes.error as e:
|
||||
vlog.err("failed to associate events with socket: %s"
|
||||
% e.strerror)
|
||||
poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
|
||||
else:
|
||||
if wait == Stream.W_RECV:
|
||||
if self._read:
|
||||
poller.fd_wait(self._read.hEvent, ovs.poller.POLLIN)
|
||||
elif wait == Stream.W_SEND:
|
||||
if self._write:
|
||||
poller.fd_wait(self._write.hEvent, ovs.poller.POLLOUT)
|
||||
elif wait == Stream.W_CONNECT:
|
||||
return
|
||||
|
||||
def connect_wait(self, poller):
|
||||
self.wait(poller, Stream.W_CONNECT)
|
||||
|
||||
@@ -267,11 +489,22 @@ class Stream(object):
|
||||
self.wait(poller, Stream.W_RECV)
|
||||
|
||||
def send_wait(self, poller):
|
||||
if sys.platform == 'win32':
|
||||
poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
|
||||
self.wait(poller, Stream.W_SEND)
|
||||
|
||||
def __del__(self):
|
||||
# Don't delete the file: we might have forked.
|
||||
self.socket.close()
|
||||
if self.socket is not None:
|
||||
self.socket.close()
|
||||
if self.pipe is not None:
|
||||
# Check if there are any remaining valid handles and close them
|
||||
if self.pipe:
|
||||
winutils.close_handle(self.pipe)
|
||||
if self._read.hEvent:
|
||||
winutils.close_handle(self._read.hEvent)
|
||||
if self._write.hEvent:
|
||||
winutils.close_handle(self._write.hEvent)
|
||||
|
||||
@staticmethod
|
||||
def ssl_set_private_key_file(file_name):
|
||||
@@ -287,6 +520,10 @@ class Stream(object):
|
||||
|
||||
|
||||
class PassiveStream(object):
|
||||
# Windows only
|
||||
connect = None # overlapped for read operation
|
||||
connect_pending = False
|
||||
|
||||
@staticmethod
|
||||
def is_valid_name(name):
|
||||
"""Returns True if 'name' is a passive stream name in the form
|
||||
@@ -294,9 +531,18 @@ class PassiveStream(object):
|
||||
"punix:" or "ptcp"), otherwise False."""
|
||||
return name.startswith("punix:") | name.startswith("ptcp:")
|
||||
|
||||
def __init__(self, sock, name, bind_path):
|
||||
def __init__(self, sock, name, bind_path, pipe=None):
|
||||
self.name = name
|
||||
self.pipe = pipe
|
||||
self.socket = sock
|
||||
if pipe is not None:
|
||||
self.connect = pywintypes.OVERLAPPED()
|
||||
self.connect.hEvent = winutils.get_new_event(bManualReset=True)
|
||||
self.connect_pending = False
|
||||
suffix = name.split(":", 1)[1]
|
||||
suffix = ovs.util.abs_file_name(ovs.dirs.RUNDIR, suffix)
|
||||
self._pipename = winutils.get_pipe_name(suffix)
|
||||
|
||||
self.bind_path = bind_path
|
||||
|
||||
@staticmethod
|
||||
@@ -315,11 +561,27 @@ class PassiveStream(object):
|
||||
bind_path = name[6:]
|
||||
if name.startswith("punix:"):
|
||||
bind_path = ovs.util.abs_file_name(ovs.dirs.RUNDIR, bind_path)
|
||||
error, sock = ovs.socket_util.make_unix_socket(socket.SOCK_STREAM,
|
||||
True, bind_path,
|
||||
None)
|
||||
if error:
|
||||
return error, None
|
||||
if sys.platform != 'win32':
|
||||
error, sock = ovs.socket_util.make_unix_socket(
|
||||
socket.SOCK_STREAM, True, bind_path, None)
|
||||
if error:
|
||||
return error, None
|
||||
else:
|
||||
# Branch used only on Windows
|
||||
try:
|
||||
open(bind_path, 'w').close()
|
||||
except:
|
||||
return errno.ENOENT, None
|
||||
|
||||
pipename = winutils.get_pipe_name(bind_path)
|
||||
if len(pipename) > 255:
|
||||
# Return invalid argument if the name is too long
|
||||
return errno.ENOENT, None
|
||||
|
||||
npipe = winutils.create_named_pipe(pipename)
|
||||
if not npipe:
|
||||
return errno.ENOENT, None
|
||||
return 0, PassiveStream(None, name, bind_path, pipe=npipe)
|
||||
|
||||
elif name.startswith("ptcp:"):
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@@ -341,7 +603,11 @@ class PassiveStream(object):
|
||||
|
||||
def close(self):
|
||||
"""Closes this PassiveStream."""
|
||||
self.socket.close()
|
||||
if self.socket is not None:
|
||||
self.socket.close()
|
||||
if self.pipe is not None:
|
||||
winutils.close_handle(self.pipe, vlog.warn)
|
||||
winutils.close_handle(self.connect.hEvent, vlog.warn)
|
||||
if self.bind_path is not None:
|
||||
ovs.fatal_signal.unlink_file_now(self.bind_path)
|
||||
self.bind_path = None
|
||||
@@ -354,28 +620,80 @@ class PassiveStream(object):
|
||||
|
||||
Will not block waiting for a connection. If no connection is ready to
|
||||
be accepted, returns (errno.EAGAIN, None) immediately."""
|
||||
|
||||
if sys.platform == 'win32' and self.socket is None:
|
||||
return self.__accept_windows()
|
||||
while True:
|
||||
try:
|
||||
sock, addr = self.socket.accept()
|
||||
ovs.socket_util.set_nonblocking(sock)
|
||||
if (sock.family == socket.AF_UNIX):
|
||||
if (sys.platform != 'win32' and sock.family == socket.AF_UNIX):
|
||||
return 0, Stream(sock, "unix:%s" % addr, 0)
|
||||
return 0, Stream(sock, 'ptcp:%s:%s' % (addr[0],
|
||||
str(addr[1])), 0)
|
||||
except socket.error as e:
|
||||
error = ovs.socket_util.get_exception_errno(e)
|
||||
if sys.platform == 'win32' and error == errno.WSAEWOULDBLOCK:
|
||||
# WSAEWOULDBLOCK would be the equivalent on Windows
|
||||
# for EAGAIN on Unix.
|
||||
error = errno.EAGAIN
|
||||
if error != errno.EAGAIN:
|
||||
# XXX rate-limit
|
||||
vlog.dbg("accept: %s" % os.strerror(error))
|
||||
return error, None
|
||||
|
||||
def __accept_windows(self):
|
||||
if self.connect_pending:
|
||||
try:
|
||||
winutils.get_overlapped_result(self.pipe, self.connect, False)
|
||||
except pywintypes.error as e:
|
||||
if e.winerror == winutils.winerror.ERROR_IO_INCOMPLETE:
|
||||
# The operation is still pending, try again
|
||||
self.connect_pending = True
|
||||
return errno.EAGAIN, None
|
||||
else:
|
||||
if self.pipe:
|
||||
win32pipe.DisconnectNamedPipe(self.pipe)
|
||||
return errno.EINVAL, None
|
||||
self.connect_pending = False
|
||||
|
||||
error = winutils.connect_named_pipe(self.pipe, self.connect)
|
||||
if error:
|
||||
if error == winutils.winerror.ERROR_IO_PENDING:
|
||||
self.connect_pending = True
|
||||
return errno.EAGAIN, None
|
||||
elif error != winutils.winerror.ERROR_PIPE_CONNECTED:
|
||||
if self.pipe:
|
||||
win32pipe.DisconnectNamedPipe(self.pipe)
|
||||
self.connect_pending = False
|
||||
return errno.EINVAL, None
|
||||
else:
|
||||
win32event.SetEvent(self.connect.hEvent)
|
||||
|
||||
npipe = winutils.create_named_pipe(self._pipename)
|
||||
if not npipe:
|
||||
return errno.ENOENT, None
|
||||
|
||||
old_pipe = self.pipe
|
||||
self.pipe = npipe
|
||||
winutils.win32event.ResetEvent(self.connect.hEvent)
|
||||
return 0, Stream(None, self.name, 0, pipe=old_pipe)
|
||||
|
||||
def wait(self, poller):
|
||||
poller.fd_wait(self.socket, ovs.poller.POLLIN)
|
||||
if sys.platform != 'win32' or self.socket is not None:
|
||||
poller.fd_wait(self.socket, ovs.poller.POLLIN)
|
||||
else:
|
||||
poller.fd_wait(self.connect.hEvent, ovs.poller.POLLIN)
|
||||
|
||||
def __del__(self):
|
||||
# Don't delete the file: we might have forked.
|
||||
self.socket.close()
|
||||
if self.socket is not None:
|
||||
self.socket.close()
|
||||
if self.pipe is not None:
|
||||
# Check if there are any remaining valid handles and close them
|
||||
if self.pipe:
|
||||
winutils.close_handle(self.pipe)
|
||||
if self._connect.hEvent:
|
||||
winutils.close_handle(self._read.hEvent)
|
||||
|
||||
|
||||
def usage(name):
|
||||
|
||||
Reference in New Issue
Block a user