diff --git a/src/bin/msgq/msgq.py.in b/src/bin/msgq/msgq.py.in index 407c408c3c..03a2403bd6 100755 --- a/src/bin/msgq/msgq.py.in +++ b/src/bin/msgq/msgq.py.in @@ -465,6 +465,15 @@ class MsgQ: sock.setblocking(1) def send_prepared_msg(self, sock, msg): + ''' + Add a message to the queue. If there's nothing waiting, try + to send it right away. + + Return if the socket is still alive. It can return false if the + socket dies (for example due to EPIPE in the attempt to send). + Returning true does not guarantee the message will be delivered, + but returning false means it won't. + ''' # Try to send the data, but only if there's nothing waiting fileno = sock.fileno() if fileno in self.sendbuffs: @@ -473,7 +482,7 @@ class MsgQ: amount_sent = self.__send_data(sock, msg) if amount_sent is None: # Socket has been killed, drop the send - return + return False # Still something to send, add it to outgoing queue if amount_sent < len(msg): @@ -483,7 +492,7 @@ class MsgQ: (last_sent, buff) = self.sendbuffs[fileno] if now - last_sent > 0.1: self.kill_socket(fileno, sock) - return + return False buff += msg else: buff = msg[amount_sent:] @@ -494,6 +503,7 @@ class MsgQ: else: self.add_kqueue_socket(sock, True) self.sendbuffs[fileno] = (last_sent, buff) + return True def __process_write(self, fileno): # Try to send some data from the buffer @@ -549,10 +559,11 @@ class MsgQ: # Don't bounce to self sockets.remove(sock) - if sockets: - for socket in sockets: - self.send_prepared_msg(socket, msg) - elif routing.get(CC_HEADER_WANT_ANSWER) and \ + has_recipient = False + for socket in sockets: + if self.send_prepared_msg(socket, msg): + has_recipient = True + if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \ CC_HEADER_REPLY not in routing: # We have no recipients. But the sender insists on a reply # (and the message isn't a reply itself). We need to send diff --git a/src/bin/msgq/tests/msgq_test.py b/src/bin/msgq/tests/msgq_test.py index f3ffcf97e6..e33b9862e6 100644 --- a/src/bin/msgq/tests/msgq_test.py +++ b/src/bin/msgq/tests/msgq_test.py @@ -168,14 +168,16 @@ class MsgQTest(unittest.TestCase): undeliverable notifications under the correct circumstances. """ sent_messages = [] - def fake_end_prepared_msg(socket, msg): + def fake_send_prepared_msg(socket, msg): sent_messages.append((socket, msg)) - self.__msgq.send_prepared_msg = fake_end_prepared_msg + return True + self.__msgq.send_prepared_msg = fake_send_prepared_msg # These would be real sockets in the MsgQ, but we pass them as # parameters only, so we don't need them to be. We use simple # integers to tell one from another. sender = 1 recipient = 2 + another_recipiet = 3 # The routing headers and data to test with. routing = { 'to': '*', @@ -255,6 +257,39 @@ class MsgQTest(unittest.TestCase): self.assertEqual(2, sent_messages[0][0]) # The recipient self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1])) sent_messages = [] + # If an attempt to send fails, consider it no recipient. + def fail_send_prepared_msg(socket, msg): + ''' + Pretend sending a message failed. After one call, return to the + usual mock, so the errors or other messages can be sent. + ''' + self.__msgq.send_prepared_msg = fake_send_prepared_msg + self.__msgq.send_prepared_msg = fail_send_prepared_msg + self.__msgq.process_command_send(sender, routing, data) + self.assertEqual(1, len(sent_messages)) + self.assertEqual(1, sent_messages[0][0]) + self.assertEqual(({ + 'group': 'group', + 'instance': '*', + 'reply': 42, + 'seq': 42, + 'from': 'msgq', + 'to': 'sender', + 'want_answer': True + }, {'result': [-1, "No such recipient"]}), + self.parse_msg(sent_messages[0][1])) + sent_messages = [] + # But if there are more recipients and only one fails, it should + # be delivered to the other and not considered an error + self.__msgq.send_prepared_msg = fail_send_prepared_msg + routing["to"] = '*' + self.__msgq.subs.find = lambda group, instance: [recipient, + another_recipiet] + self.__msgq.process_command_send(sender, routing, data) + self.assertEqual(1, len(sent_messages)) + self.assertEqual(3, sent_messages[0][0]) # The recipient + self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1])) + sent_messages = [] class DummySocket: """ @@ -360,17 +395,27 @@ class SendNonblock(unittest.TestCase): self.assertEqual(0, status, "The task did not complete successfully in time") + def get_msgq_with_sockets(self): + ''' + Create a message queue and prepare it for use with a socket pair. + The write end is put into the message queue, so we can check it. + It returns (msgq, read_end, write_end). It is expected the sockets + are closed by the caller afterwards. + ''' + msgq = MsgQ() + # We do only partial setup, so we don't create the listening socket + msgq.setup_poller() + (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) + msgq.register_socket(write) + return (msgq, read, write) + def infinite_sender(self, sender): """ Sends data until an exception happens. socket.error is caught, as it means the socket got closed. Sender is called to actually send the data. """ - msgq = MsgQ() - # We do only partial setup, so we don't create the listening socket - msgq.setup_poller() - (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - msgq.register_socket(write) + (msgq, read, write) = self.get_msgq_with_sockets() # Keep sending while it is not closed by the msgq try: while True: @@ -406,6 +451,34 @@ class SendNonblock(unittest.TestCase): self.terminate_check(lambda: self.infinite_sender( lambda msgq, socket: msgq.send_prepared_msg(socket, data))) + def test_sendprepared_success(self): + ''' + Test the send_prepared_msg returns success when queueing messages. + It does so on the first attempt (when it actually tries to send + something to the socket) and on any attempt that follows and the + buffer is already full. + ''' + (msgq, read, write) = self.get_msgq_with_sockets() + # Now keep sending until we fill in something into the internal + # buffer. + while not write.fileno() in msgq.sendbuffs: + self.assertTrue(msgq.send_prepared_msg(write, b'data')) + read.close() + write.close() + + def test_sendprepared_epipe(self): + ''' + Test the send_prepared_msg returns false when we try to queue a + message and the other side is not there any more. It should be done + with EPIPE, so not a fatal error. + ''' + (msgq, read, write) = self.get_msgq_with_sockets() + # Close one end. It should make a EPIPE on the other. + read.close() + # Now it should soft-fail + self.assertFalse(msgq.send_prepared_msg(write, b'data')) + write.close() + def send_many(self, data): """ Tries that sending a command many times and getting an answer works. diff --git a/src/lib/util/common_defs.cc b/src/lib/util/common_defs.cc index f3b195a505..977e942626 100644 --- a/src/lib/util/common_defs.cc +++ b/src/lib/util/common_defs.cc @@ -38,7 +38,7 @@ const char* CC_COMMAND_SEND = "send"; const char* CC_TO_WILDCARD = "*"; const char* CC_INSTANCE_WILDCARD = "*"; // Reply codes -const int CC_REPLY_NO_RECPT = -1; // No recipient +const int CC_REPLY_NO_RECPT = -1; } }