diff --git a/postfix/HISTORY b/postfix/HISTORY index b4785a095..3cbfb9c95 100644 --- a/postfix/HISTORY +++ b/postfix/HISTORY @@ -13076,6 +13076,24 @@ Apologies for any names omitted. that have only minor differences in dictionary open/access options. +20070105 + + Performance: pipeline of pending delivery agent connections, + to improve Linux/Solaris mail delivery performance by another + 10% while going down-hill with the wind from behind. Design + and implementation Victor and Wietse. Files: *qmgr/qmgr.c, + *qmgr/qmgr.h, *qmgr/qmgr_transport.c. + +20070106 + + Cleanup: eliminate the Linux/Solaris "wait for accept()" + stage from the queue manager to delivery agent protocol. + This alone achieves 99.99% of the Linux/Solaris speed up + from the preceding change. The pending connection pipeline + takes care of the rest. Tested on Linux kernels dating + back to 2.0.27 (that's more than 10 years ago). Files: + *qmgr/qmgr_transport.c. + Wish list: Update BACKSCATTER_README to use PCRE because that's what I diff --git a/postfix/conf/post-install b/postfix/conf/post-install index 755bb53ee..27d6ac2bd 100644 --- a/postfix/conf/post-install +++ b/postfix/conf/post-install @@ -694,8 +694,9 @@ test -n "$first_install_reminder" && { Warning: you still need to edit myorigin/mydestination/mynetworks parameter settings in $config_directory/main.cf. - See also http://www.postfix.org/faq.html for information about - dialup sites or about sites inside a firewalled network. + See also http://www.postfix.org/STANDARD_CONFIGURATION_README.html + for information about dialup sites or about sites inside a + firewalled network. 
BTW: Check your $ALIASES file and be sure to set up aliases that send mail for root and postmaster to a real person, then diff --git a/postfix/src/global/mail_version.h b/postfix/src/global/mail_version.h index 36f3128c6..3c967c28c 100644 --- a/postfix/src/global/mail_version.h +++ b/postfix/src/global/mail_version.h @@ -20,7 +20,7 @@ * Patches change both the patchlevel and the release date. Snapshots have no * patchlevel; they change the release date only. */ -#define MAIL_RELEASE_DATE "20070104" +#define MAIL_RELEASE_DATE "20070107" #define MAIL_VERSION_NUMBER "2.4" #ifdef SNAPSHOT diff --git a/postfix/src/oqmgr/qmgr.c b/postfix/src/oqmgr/qmgr.c index 927f1be2f..a61ad4064 100644 --- a/postfix/src/oqmgr/qmgr.c +++ b/postfix/src/oqmgr/qmgr.c @@ -330,8 +330,11 @@ int var_proc_limit; bool var_verp_bounce_off; int var_qmgr_clog_warn_time; -static QMGR_SCAN *qmgr_incoming; -static QMGR_SCAN *qmgr_deferred; +static QMGR_SCAN *qmgr_scans[2]; + +#define QMGR_SCAN_IDX_INCOMING 0 +#define QMGR_SCAN_IDX_DEFERRED 1 +#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0])) /* qmgr_deferred_run_event - queue manager heartbeat */ @@ -342,7 +345,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy) * This routine runs when it is time for another deferred queue scan. * Make sure this routine gets called again in the future. */ - qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START); event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay); } @@ -402,19 +405,22 @@ static void qmgr_trigger_event(char *buf, int len, * requested, the request takes effect immediately. 
*/ if (incoming_flag != 0) - qmgr_scan_request(qmgr_incoming, incoming_flag); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag); if (deferred_flag != 0) - qmgr_scan_request(qmgr_deferred, deferred_flag); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag); } /* qmgr_loop - queue manager main loop */ static int qmgr_loop(char *unused_name, char **unused_argv) { - char *in_path = 0; - char *df_path = 0; + char *path; int token_count; - int in_feed = 0; + int feed = 0; + int scan_idx; /* Priority order scan index */ + static int first_scan_idx = QMGR_SCAN_IDX_INCOMING; + int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; + int delay; /* * This routine runs as part of the event handling loop, after the event @@ -436,17 +442,34 @@ static int qmgr_loop(char *unused_name, char **unused_argv) /* * Let some new blood into the active queue when the queue size is * smaller than some configurable limit, and when the number of in-core - * recipients does not exceed some configurable limit. When the system is - * under heavy load, favor new mail over old mail. + * recipients does not exceed some configurable limit. + * + * We import one message per interrupt, to optimally tune the input count + * for the number of delivery agent protocol wait states, as explained in + * qmgr_transport.c. + */ + delay = WAIT_FOR_EVENT; + for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit + && qmgr_recipient_count < var_qmgr_rcpt_limit + && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) { + last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT; + if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) { + delay = DONT_WAIT; + if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0) + break; + } + } + + /* + * Round-robin the queue scans. When the active queue becomes full, + * prefer new mail over deferred mail. 
*/ if (qmgr_message_count < var_qmgr_active_limit - && qmgr_recipient_count < var_qmgr_rcpt_limit) - if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0) - in_feed = qmgr_active_feed(qmgr_incoming, in_path); - if (qmgr_message_count < var_qmgr_active_limit - && qmgr_recipient_count < var_qmgr_rcpt_limit) - if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0) - qmgr_active_feed(qmgr_deferred, df_path); + && qmgr_recipient_count < var_qmgr_rcpt_limit) { + first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; + } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) { + first_scan_idx = QMGR_SCAN_IDX_INCOMING; + } /* * Global flow control. If enabled, slow down receiving processes that @@ -455,17 +478,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv) if (var_in_flow_delay > 0) { token_count = mail_flow_count(); if (token_count < var_proc_limit) { - if (in_feed != 0) + if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING) mail_flow_put(1); - else if (qmgr_incoming->handle == 0) + else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0) mail_flow_put(var_proc_limit - token_count); } else if (token_count > var_proc_limit) { mail_flow_get(token_count - var_proc_limit); } } - if (in_path || df_path) - return (DONT_WAIT); - return (WAIT_FOR_EVENT); + return (delay); } /* pre_accept - see if tables have changed */ @@ -521,9 +542,9 @@ static void qmgr_post_init(char *unused_name, char **unused_argv) var_use_limit = 0; var_idle_limit = 0; qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time()); - qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING); - qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED); - qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START); + qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING); + qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START); qmgr_deferred_run_event(0, (char *) 0); } diff --git 
a/postfix/src/oqmgr/qmgr.h b/postfix/src/oqmgr/qmgr.h index c28430c00..73744c5d8 100644 --- a/postfix/src/oqmgr/qmgr.h +++ b/postfix/src/oqmgr/qmgr.h @@ -114,6 +114,7 @@ struct QMGR_QUEUE_LIST { struct QMGR_TRANSPORT { int flags; /* blocked, etc. */ + int pending; /* incomplete DA connections */ char *name; /* transport name */ int dest_concurrency_limit; /* concurrency per domain */ int init_dest_concurrency; /* init. per-domain concurrency */ @@ -125,7 +126,6 @@ struct QMGR_TRANSPORT { }; #define QMGR_TRANSPORT_STAT_DEAD (1<<1) -#define QMGR_TRANSPORT_STAT_BUSY (1<<2) typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *); extern QMGR_TRANSPORT *qmgr_transport_select(void); @@ -135,7 +135,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *); extern QMGR_TRANSPORT *qmgr_transport_create(const char *); extern QMGR_TRANSPORT *qmgr_transport_find(const char *); -#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) +#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) /* * Each next hop (e.g., a domain name) has its own queue of pending message diff --git a/postfix/src/oqmgr/qmgr_transport.c b/postfix/src/oqmgr/qmgr_transport.c index 65ae935b3..55a192e09 100644 --- a/postfix/src/oqmgr/qmgr_transport.c +++ b/postfix/src/oqmgr/qmgr_transport.c @@ -105,6 +105,49 @@ struct QMGR_TRANSPORT_ALLOC { QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ }; + /* + * Connections to delivery agents are managed asynchronously. Each delivery + * agent connection goes through multiple wait states: + * + * - With Linux/Solaris and old queue manager implementations only, wait for + * the server to invoke accept(). + * + * - Wait for the delivery agent's announcement that it is ready to receive a + * delivery request. + * + * - Wait for the delivery request completion status. + * + * Older queue manager implementations had only one pending delivery agent + * connection per transport. 
With low-latency destinations, the output rates + * were reduced on Linux/Solaris systems that had the extra wait state. + * + * To maximize delivery agent output rates with low-latency destinations, the + * following changes were made to the queue manager by the end of the 2.4 + * development cycle: + * + * - The Linux/Solaris accept() wait state was eliminated. + * + * - A pipeline was implemented for pending delivery agent connections. The + * number of pending delivery agent connections was increased from one to + * two: the number of before-delivery wait states, plus one extra pipeline + * slot to prevent the pipeline from stalling easily. Increasing the + * pipeline much further actually hurt performance. + * + * - To reduce queue manager disk competition with delivery agents, the queue + * scanning algorithm was modified to import only one message per interrupt. + * The incoming and deferred queue scans now happen on alternate interrupts. + * + * Simplistically reasoned, a non-zero (incoming + active) queue length is + * equivalent to a time shift for mail deliveries; this is undesirable when + * delivery agents are not fully utilized. + * + * On the other hand a non-empty active queue is what allows us to do clever + * things such as queue file prefetch, concurrency windows, and connection + * caching; the idea is that such "thinking time" is affordable only after + * the output channels are maxed out. + */ +#define QMGR_TRANSPORT_MAX_PEND 2 + /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context) @@ -191,11 +234,15 @@ static void qmgr_transport_event(int unused_event, char *context) event_cancel_timer(qmgr_transport_abort, context); /* - * Disable further read events that end up calling this function. 
+ * Disable further read events that end up calling this function, turn + * off the Linux connect() workaround, and free up this pending + * connection pipeline slot. */ - if (alloc->stream) + if (alloc->stream) { event_disable_readwrite(vstream_fileno(alloc->stream)); - alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY; + non_blocking(vstream_fileno(alloc->stream), BLOCKING); + } + alloc->transport->pending -= 1; /* * Notify the requestor. @@ -204,46 +251,34 @@ static void qmgr_transport_event(int unused_event, char *context) myfree((char *) alloc); } -#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT - -/* qmgr_transport_connect - handle connection request completion */ - -static void qmgr_transport_connect(int unused_event, char *context) -{ - QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; - - /* - * This code is necessary for some versions of LINUX, where connect(2) - * blocks until the application performs an accept(2). Reportedly, the - * same can happen on Solaris 2.5.1. - */ - event_disable_readwrite(vstream_fileno(alloc->stream)); - non_blocking(vstream_fileno(alloc->stream), BLOCKING); - event_enable_read(vstream_fileno(alloc->stream), - qmgr_transport_event, (char *) alloc); -} - -#endif - /* qmgr_transport_select - select transport for allocation */ QMGR_TRANSPORT *qmgr_transport_select(void) { QMGR_TRANSPORT *xport; QMGR_QUEUE *queue; + int need; /* * If we find a suitable transport, rotate the list of transports to * effectuate round-robin selection. See similar selection code in * qmgr_queue_select(). + * + * This function is called repeatedly until all transports have maxed out + * the number of pending delivery agent connections, until all delivery + * agent concurrency windows are maxed out, or until we run out of "todo" + * queue entries. */ -#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD) +#define MIN5af51743e4eef(x, y) ((x) < (y) ? 
(x) : (y)) for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { - if (xport->flags & STAY_AWAY) + if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 + || xport->pending >= QMGR_TRANSPORT_MAX_PEND) continue; + need = xport->pending + 1; for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { - if (queue->window > queue->busy_refcount && queue->todo.next != 0) { + if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, + queue->todo_refcount)) <= 0) { QMGR_LIST_ROTATE(qmgr_transport_list, xport); if (msg_verbose) msg_info("qmgr_transport_select: %s", xport->name); @@ -259,56 +294,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void) void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) { QMGR_TRANSPORT_ALLOC *alloc; - VSTREAM *stream; /* * Sanity checks. */ if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) msg_panic("qmgr_transport: dead transport: %s", transport->name); - if (transport->flags & QMGR_TRANSPORT_STAT_BUSY) - msg_panic("qmgr_transport: nested allocation: %s", transport->name); + if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) + msg_panic("qmgr_transport: excess allocation: %s", transport->name); /* * Connect to the well-known port for this delivery service, and wake up - * when a process announces its availability. In the mean time, block out - * other delivery process allocation attempts for this transport. In case - * of problems, back off. Do not hose the system when it is in trouble + * when a process announces its availability. Allow only a limited number + * of delivery process allocation attempts for this transport. In case of + * problems, back off. Do not hose the system when it is in trouble * already. 
- */ -#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT -#define BLOCK_MODE NON_BLOCKING -#define ENABLE_EVENTS event_enable_write -#define EVENT_HANDLER qmgr_transport_connect -#else -#define BLOCK_MODE BLOCKING -#define ENABLE_EVENTS event_enable_read -#define EVENT_HANDLER qmgr_transport_event -#endif - - /* - * When the connection to the delivery agent cannot be completed, notify - * the event handler so that it can throttle the transport and defer the - * todo queues, just like it does when communication fails *after* - * connection completion. * - * Before Postfix 2.4, the event handler was not invoked, and mail was not - * deferred. Because of this, mail would be stuck in the active queue - * after triggering a "connection refused" condition. + * Use non-blocking connect(), so that Linux won't block the queue manager + * until the delivery agent calls accept(). + * + * When the connection to the delivery agent cannot be completed, notify the + * event handler so that it can throttle the transport and defer the todo + * queues, just like it does when communication fails *after* connection + * completion. + * + * Before Postfix 2.4, the event handler was not invoked after connect() + * error, and mail was not deferred. Because of this, mail would be stuck + * in the active queue after triggering a "connection refused" condition.
*/ - if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) { - msg_warn("connect to transport %s: %m", transport->name); - } alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); - alloc->stream = stream; alloc->transport = transport; alloc->notify = notify; - transport->flags |= QMGR_TRANSPORT_STAT_BUSY; - if (alloc->stream == 0) { + transport->pending += 1; + if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, + NON_BLOCKING)) == 0) { + msg_warn("connect to transport %s: %m", transport->name); event_request_timer(qmgr_transport_event, (char *) alloc, 0); return; } - ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc); + event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, + (char *) alloc); /* * Guard against broken systems. @@ -327,6 +352,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name) msg_panic("qmgr_transport_create: transport exists: %s", name); transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); transport->flags = 0; + transport->pending = 0; transport->name = mystrdup(name); /* diff --git a/postfix/src/qmgr/qmgr.c b/postfix/src/qmgr/qmgr.c index 4641f4cb8..8a2349784 100644 --- a/postfix/src/qmgr/qmgr.c +++ b/postfix/src/qmgr/qmgr.c @@ -390,8 +390,11 @@ int var_proc_limit; bool var_verp_bounce_off; int var_qmgr_clog_warn_time; -static QMGR_SCAN *qmgr_incoming; -static QMGR_SCAN *qmgr_deferred; +static QMGR_SCAN *qmgr_scans[2]; + +#define QMGR_SCAN_IDX_INCOMING 0 +#define QMGR_SCAN_IDX_DEFERRED 1 +#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0])) /* qmgr_deferred_run_event - queue manager heartbeat */ @@ -402,7 +405,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy) * This routine runs when it is time for another deferred queue scan. * Make sure this routine gets called again in the future. 
*/ - qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START); event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay); } @@ -462,19 +465,22 @@ static void qmgr_trigger_event(char *buf, int len, * requested, the request takes effect immediately. */ if (incoming_flag != 0) - qmgr_scan_request(qmgr_incoming, incoming_flag); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag); if (deferred_flag != 0) - qmgr_scan_request(qmgr_deferred, deferred_flag); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag); } /* qmgr_loop - queue manager main loop */ static int qmgr_loop(char *unused_name, char **unused_argv) { - char *in_path = 0; - char *df_path = 0; + char *path; int token_count; - int in_feed = 0; + int feed = 0; + int scan_idx; /* Priority order scan index */ + static int first_scan_idx = QMGR_SCAN_IDX_INCOMING; + int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1; + int delay; /* * This routine runs as part of the event handling loop, after the event @@ -495,15 +501,32 @@ static int qmgr_loop(char *unused_name, char **unused_argv) /* * Let some new blood into the active queue when the queue size is - * smaller than some configurable limit. When the system is under heavy - * load, favor new mail over old mail. + * smaller than some configurable limit. + * + * We import one message per interrupt, to optimally tune the input count + * for the number of delivery agent protocol wait states, as explained in + * qmgr_transport.c. 
*/ - if (qmgr_message_count < var_qmgr_active_limit) - if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0) - in_feed = qmgr_active_feed(qmgr_incoming, in_path); - if (qmgr_message_count < var_qmgr_active_limit) - if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0) - qmgr_active_feed(qmgr_deferred, df_path); + delay = WAIT_FOR_EVENT; + for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit + && scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) { + last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT; + if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) { + delay = DONT_WAIT; + if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0) + break; + } + } + + /* + * Round-robin the queue scans. When the active queue becomes full, + * prefer new mail over deferred mail. + */ + if (qmgr_message_count < var_qmgr_active_limit) { + first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT; + } else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) { + first_scan_idx = QMGR_SCAN_IDX_INCOMING; + } /* * Global flow control. 
If enabled, slow down receiving processes that @@ -512,17 +535,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv) if (var_in_flow_delay > 0) { token_count = mail_flow_count(); if (token_count < var_proc_limit) { - if (in_feed != 0) + if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING) mail_flow_put(1); - else if (qmgr_incoming->handle == 0) + else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0) mail_flow_put(var_proc_limit - token_count); } else if (token_count > var_proc_limit) { mail_flow_get(token_count - var_proc_limit); } } - if (in_path || df_path) - return (DONT_WAIT); - return (WAIT_FOR_EVENT); + return (delay); } /* pre_accept - see if tables have changed */ @@ -588,9 +609,9 @@ static void qmgr_post_init(char *name, char **unused_argv) var_use_limit = 0; var_idle_limit = 0; qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time()); - qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING); - qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED); - qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START); + qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING); + qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED); + qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START); qmgr_deferred_run_event(0, (char *) 0); } diff --git a/postfix/src/qmgr/qmgr.h b/postfix/src/qmgr/qmgr.h index d791a9ef0..5582ef81a 100644 --- a/postfix/src/qmgr/qmgr.h +++ b/postfix/src/qmgr/qmgr.h @@ -131,6 +131,7 @@ struct QMGR_JOB_LIST { struct QMGR_TRANSPORT { int flags; /* blocked, etc. */ + int pending; /* incomplete DA connections */ char *name; /* transport name */ int dest_concurrency_limit; /* concurrency per domain */ int init_dest_concurrency; /* init. 
per-domain concurrency */ @@ -165,7 +166,6 @@ struct QMGR_TRANSPORT { }; #define QMGR_TRANSPORT_STAT_DEAD (1<<1) -#define QMGR_TRANSPORT_STAT_BUSY (1<<2) typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *); extern QMGR_TRANSPORT *qmgr_transport_select(void); @@ -175,7 +175,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *); extern QMGR_TRANSPORT *qmgr_transport_create(const char *); extern QMGR_TRANSPORT *qmgr_transport_find(const char *); -#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) +#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD) /* * Each next hop (e.g., a domain name) has its own queue of pending message diff --git a/postfix/src/qmgr/qmgr_transport.c b/postfix/src/qmgr/qmgr_transport.c index d7e54f116..aa67a1c87 100644 --- a/postfix/src/qmgr/qmgr_transport.c +++ b/postfix/src/qmgr/qmgr_transport.c @@ -110,6 +110,49 @@ struct QMGR_TRANSPORT_ALLOC { QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */ }; + /* + * Connections to delivery agents are managed asynchronously. Each delivery + * agent connection goes through multiple wait states: + * + * - With Linux/Solaris and old queue manager implementations only, wait for + * the server to invoke accept(). + * + * - Wait for the delivery agent's announcement that it is ready to receive a + * delivery request. + * + * - Wait for the delivery request completion status. + * + * Older queue manager implementations had only one pending delivery agent + * connection per transport. With low-latency destinations, the output rates + * were reduced on Linux/Solaris systems that had the extra wait state. + * + * To maximize delivery agent output rates with low-latency destinations, the + * following changes were made to the queue manager by the end of the 2.4 + * development cycle: + * + * - The Linux/Solaris accept() wait state was eliminated. 
+ * + * - A pipeline was implemented for pending delivery agent connections. The + * number of pending delivery agent connections was increased from one to + * two: the number of before-delivery wait states, plus one extra pipeline + * slot to prevent the pipeline from stalling easily. Increasing the + * pipeline much further actually hurt performance. + * + * - To reduce queue manager disk competition with delivery agents, the queue + * scanning algorithm was modified to import only one message per interrupt. + * The incoming and deferred queue scans now happen on alternate interrupts. + * + * Simplistically reasoned, a non-zero (incoming + active) queue length is + * equivalent to a time shift for mail deliveries; this is undesirable when + * delivery agents are not fully utilized. + * + * On the other hand a non-empty active queue is what allows us to do clever + * things such as queue file prefetch, concurrency windows, and connection + * caching; the idea is that such "thinking time" is affordable only after + * the output channels are maxed out. + */ +#define QMGR_TRANSPORT_MAX_PEND 2 + /* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */ static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context) @@ -196,11 +239,15 @@ static void qmgr_transport_event(int unused_event, char *context) event_cancel_timer(qmgr_transport_abort, context); /* - * Disable further read events that end up calling this function. + * Disable further read events that end up calling this function, turn + * off the Linux connect() workaround, and free up this pending + * connection pipeline slot. */ - if (alloc->stream) + if (alloc->stream) { event_disable_readwrite(vstream_fileno(alloc->stream)); - alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY; + non_blocking(vstream_fileno(alloc->stream), BLOCKING); + } + alloc->transport->pending -= 1; /* * Notify the requestor. 
@@ -209,46 +256,34 @@ static void qmgr_transport_event(int unused_event, char *context) myfree((char *) alloc); } -#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT - -/* qmgr_transport_connect - handle connection request completion */ - -static void qmgr_transport_connect(int unused_event, char *context) -{ - QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context; - - /* - * This code is necessary for some versions of LINUX, where connect(2) - * blocks until the application performs an accept(2). Reportedly, the - * same can happen on Solaris 2.5.1. - */ - event_disable_readwrite(vstream_fileno(alloc->stream)); - non_blocking(vstream_fileno(alloc->stream), BLOCKING); - event_enable_read(vstream_fileno(alloc->stream), - qmgr_transport_event, (char *) alloc); -} - -#endif - /* qmgr_transport_select - select transport for allocation */ QMGR_TRANSPORT *qmgr_transport_select(void) { QMGR_TRANSPORT *xport; QMGR_QUEUE *queue; + int need; /* * If we find a suitable transport, rotate the list of transports to * effectuate round-robin selection. See similar selection code in * qmgr_peer_select(). + * + * This function is called repeatedly until all transports have maxed out + * the number of pending delivery agent connections, until all delivery + * agent concurrency windows are maxed out, or until we run out of "todo" + * queue entries. */ -#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD) +#define MIN5af51743e4eef(x, y) ((x) < (y) ? 
(x) : (y)) for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) { - if (xport->flags & STAY_AWAY) + if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0 + || xport->pending >= QMGR_TRANSPORT_MAX_PEND) continue; + need = xport->pending + 1; for (queue = xport->queue_list.next; queue; queue = queue->peers.next) { - if (queue->window > queue->busy_refcount && queue->todo.next != 0) { + if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount, + queue->todo_refcount)) <= 0) { QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers); if (msg_verbose) msg_info("qmgr_transport_select: %s", xport->name); @@ -264,56 +299,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void) void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify) { QMGR_TRANSPORT_ALLOC *alloc; - VSTREAM *stream; /* * Sanity checks. */ if (transport->flags & QMGR_TRANSPORT_STAT_DEAD) msg_panic("qmgr_transport: dead transport: %s", transport->name); - if (transport->flags & QMGR_TRANSPORT_STAT_BUSY) - msg_panic("qmgr_transport: nested allocation: %s", transport->name); + if (transport->pending >= QMGR_TRANSPORT_MAX_PEND) + msg_panic("qmgr_transport: excess allocation: %s", transport->name); /* * Connect to the well-known port for this delivery service, and wake up - * when a process announces its availability. In the mean time, block out - * other delivery process allocation attempts for this transport. In case - * of problems, back off. Do not hose the system when it is in trouble + * when a process announces its availability. Allow only a limited number + * of delivery process allocation attempts for this transport. In case of + * problems, back off. Do not hose the system when it is in trouble * already. 
- */ -#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT -#define BLOCK_MODE NON_BLOCKING -#define ENABLE_EVENTS event_enable_write -#define EVENT_HANDLER qmgr_transport_connect -#else -#define BLOCK_MODE BLOCKING -#define ENABLE_EVENTS event_enable_read -#define EVENT_HANDLER qmgr_transport_event -#endif - - /* - * When the connection to the delivery agent cannot be completed, notify - * the event handler so that it can throttle the transport and defer the - * todo queues, just like it does when communication fails *after* - * connection completion. * - * Before Postfix 2.4, the event handler was not invoked, and mail was not - * deferred. Because of this, mail would be stuck in the active queue - * after triggering a "connection refused" condition. + * Use non-blocking connect(), so that Linux won't block the queue manager + * until the delivery agent calls accept(). + * + * When the connection to the delivery agent cannot be completed, notify the + * event handler so that it can throttle the transport and defer the todo + * queues, just like it does when communication fails *after* connection + * completion. + * + * Before Postfix 2.4, the event handler was not invoked after connect() + * error, and mail was not deferred. Because of this, mail would be stuck + * in the active queue after triggering a "connection refused" condition.
*/ - if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) { - msg_warn("connect to transport %s: %m", transport->name); - } alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc)); - alloc->stream = stream; alloc->transport = transport; alloc->notify = notify; - transport->flags |= QMGR_TRANSPORT_STAT_BUSY; - if (alloc->stream == 0) { + transport->pending += 1; + if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, + NON_BLOCKING)) == 0) { + msg_warn("connect to transport %s: %m", transport->name); event_request_timer(qmgr_transport_event, (char *) alloc, 0); return; } - ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc); + event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event, + (char *) alloc); /* * Guard against broken systems. @@ -332,6 +357,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name) msg_panic("qmgr_transport_create: transport exists: %s", name); transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT)); transport->flags = 0; + transport->pending = 0; transport->name = mystrdup(name); /* @@ -364,9 +390,9 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name) transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT, var_stack_rcpt_limit, 0, 0); transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT, - var_xport_refill_limit, 1, 0); + var_xport_refill_limit, 1, 0); transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY, - var_xport_refill_delay, 's', 1, 0); + var_xport_refill_delay, 's', 1, 0); transport->queue_byname = htable_create(0); QMGR_LIST_INIT(transport->queue_list); diff --git a/postfix/src/util/sys_defs.h b/postfix/src/util/sys_defs.h index 25c0b40fb..c8ca62672 100644 --- a/postfix/src/util/sys_defs.h +++ b/postfix/src/util/sys_defs.h @@ -432,7 +432,6 @@ extern int opterr; #define DBM_NO_TRAILING_NULL #define USE_STATVFS #define STATVFS_IN_SYS_STATVFS_H -#define 
UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT #define STRCASECMP_IN_STRINGS_H #define SET_H_ERRNO(err) (set_h_errno(err)) #endif @@ -463,7 +462,6 @@ extern int opterr; #define DBM_NO_TRAILING_NULL #define USE_STATVFS #define STATVFS_IN_SYS_STATVFS_H -#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT #endif /* @@ -679,7 +677,6 @@ extern int initgroups(const char *, int); #define FIONREAD_IN_TERMIOS_H #define USE_STATFS #define STATFS_IN_SYS_VFS_H -#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT #define PREPEND_PLUS_TO_OPTSTRING #define HAS_POSIX_REGEXP #define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail" @@ -697,7 +694,10 @@ extern int initgroups(const char *, int); # define _PATH_PROCNET_IFINET6 "/proc/net/if_inet6" #endif #include <linux/version.h> -#if !defined(KERNEL_VERSION) || (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \ +#if !defined(KERNEL_VERSION) +# define KERNEL_VERSION(a,b,c) (LINUX_VERSION_CODE + 1) +#endif +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \ || (__GLIBC__ < 2) # define CANT_USE_SEND_RECV_MSG # define DEF_SMTP_CACHE_DEMAND 0 @@ -727,7 +727,6 @@ extern int initgroups(const char *, int); #define FIONREAD_IN_TERMIOS_H /* maybe unnecessary */ #define USE_STATFS #define STATFS_IN_SYS_VFS_H -#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT /* unverified */ #define PREPEND_PLUS_TO_OPTSTRING #define HAS_POSIX_REGEXP #define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail" @@ -1018,7 +1017,6 @@ extern int opterr; /* XXX use */ #define DBM_NO_TRAILING_NULL #define USE_STATVFS #define STATVFS_IN_SYS_STATVFS_H -#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT #ifndef S_ISSOCK #define S_ISSOCK(mode) ((mode&0xF000) == 0xC000) #endif @@ -1049,7 +1047,6 @@ extern int h_errno; #define ROOT_PATH "/bin:/etc:/usr/bin:/tcb/bin" #define USE_STATVFS #define STATVFS_IN_SYS_STATVFS_H -#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT #define MISSING_SETENV #define STRCASECMP_IN_STRINGS_H /* SCO5 misses just S_ISSOCK, the others are there