mirror of
https://github.com/vdukhovni/postfix
synced 2025-08-31 22:25:24 +00:00
postfix-2.4-20070107
This commit is contained in:
committed by
Viktor Dukhovni
parent
5c2eb772dd
commit
2adc086307
@@ -13076,6 +13076,24 @@ Apologies for any names omitted.
|
||||
that have only minor differences in dictionary open/access
|
||||
options.
|
||||
|
||||
20070105
|
||||
|
||||
Performance: pipeline of pending delivery agent connections,
|
||||
to improve Linux/Solaris mail delivery performance by another
|
||||
10% while going down-hill with the wind from behind. Design
|
||||
and implementation Victor and Wietse. Files: *qmgr/qmgr.c,
|
||||
*qmgr/qmgr.h, *qmgr/qmgr_transport.c.
|
||||
|
||||
20070106
|
||||
|
||||
Cleanup: eliminate the Linux/Solaris "wait for accept()"
|
||||
stage from the queue manager to delivery agent protocol.
|
||||
This alone achieves 99.99% of the Linux/Solaris speed up
|
||||
from the preceding change. The pending connection pipeline
|
||||
takes care of the rest. Tested on Linux kernels dating
|
||||
back to 2.0.27 (that's more than 10 years ago). Files:
|
||||
*qmgr/qmgr_transport.c.
|
||||
|
||||
Wish list:
|
||||
|
||||
Update BACKSCATTER_README to use PCRE because that's what I
|
||||
|
@@ -694,8 +694,9 @@ test -n "$first_install_reminder" && {
|
||||
Warning: you still need to edit myorigin/mydestination/mynetworks
|
||||
parameter settings in $config_directory/main.cf.
|
||||
|
||||
See also http://www.postfix.org/faq.html for information about
|
||||
dialup sites or about sites inside a firewalled network.
|
||||
See also http://www.postfix.org/STANDARD_CONFIGURATION_README.html
|
||||
for information about dialup sites or about sites inside a
|
||||
firewalled network.
|
||||
|
||||
BTW: Check your $ALIASES file and be sure to set up aliases
|
||||
that send mail for root and postmaster to a real person, then
|
||||
|
@@ -20,7 +20,7 @@
|
||||
* Patches change both the patchlevel and the release date. Snapshots have no
|
||||
* patchlevel; they change the release date only.
|
||||
*/
|
||||
#define MAIL_RELEASE_DATE "20070104"
|
||||
#define MAIL_RELEASE_DATE "20070107"
|
||||
#define MAIL_VERSION_NUMBER "2.4"
|
||||
|
||||
#ifdef SNAPSHOT
|
||||
|
@@ -330,8 +330,11 @@ int var_proc_limit;
|
||||
bool var_verp_bounce_off;
|
||||
int var_qmgr_clog_warn_time;
|
||||
|
||||
static QMGR_SCAN *qmgr_incoming;
|
||||
static QMGR_SCAN *qmgr_deferred;
|
||||
static QMGR_SCAN *qmgr_scans[2];
|
||||
|
||||
#define QMGR_SCAN_IDX_INCOMING 0
|
||||
#define QMGR_SCAN_IDX_DEFERRED 1
|
||||
#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
|
||||
|
||||
/* qmgr_deferred_run_event - queue manager heartbeat */
|
||||
|
||||
@@ -342,7 +345,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy)
|
||||
* This routine runs when it is time for another deferred queue scan.
|
||||
* Make sure this routine gets called again in the future.
|
||||
*/
|
||||
qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
|
||||
event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
|
||||
}
|
||||
|
||||
@@ -402,19 +405,22 @@ static void qmgr_trigger_event(char *buf, int len,
|
||||
* requested, the request takes effect immediately.
|
||||
*/
|
||||
if (incoming_flag != 0)
|
||||
qmgr_scan_request(qmgr_incoming, incoming_flag);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
|
||||
if (deferred_flag != 0)
|
||||
qmgr_scan_request(qmgr_deferred, deferred_flag);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
|
||||
}
|
||||
|
||||
/* qmgr_loop - queue manager main loop */
|
||||
|
||||
static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
{
|
||||
char *in_path = 0;
|
||||
char *df_path = 0;
|
||||
char *path;
|
||||
int token_count;
|
||||
int in_feed = 0;
|
||||
int feed = 0;
|
||||
int scan_idx; /* Priority order scan index */
|
||||
static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
|
||||
int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
|
||||
int delay;
|
||||
|
||||
/*
|
||||
* This routine runs as part of the event handling loop, after the event
|
||||
@@ -436,17 +442,34 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
/*
|
||||
* Let some new blood into the active queue when the queue size is
|
||||
* smaller than some configurable limit, and when the number of in-core
|
||||
* recipients does not exceed some configurable limit. When the system is
|
||||
* under heavy load, favor new mail over old mail.
|
||||
* recipients does not exceed some configurable limit.
|
||||
*
|
||||
* We import one message per interrupt, to optimally tune the input count
|
||||
* for the number of delivery agent protocol wait states, as explained in
|
||||
* qmgr_transport.c.
|
||||
*/
|
||||
delay = WAIT_FOR_EVENT;
|
||||
for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
|
||||
&& qmgr_recipient_count < var_qmgr_rcpt_limit
|
||||
&& scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
|
||||
last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
|
||||
if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
|
||||
delay = DONT_WAIT;
|
||||
if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Round-robin the queue scans. When the active queue becomes full,
|
||||
* prefer new mail over deferred mail.
|
||||
*/
|
||||
if (qmgr_message_count < var_qmgr_active_limit
|
||||
&& qmgr_recipient_count < var_qmgr_rcpt_limit)
|
||||
if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
|
||||
in_feed = qmgr_active_feed(qmgr_incoming, in_path);
|
||||
if (qmgr_message_count < var_qmgr_active_limit
|
||||
&& qmgr_recipient_count < var_qmgr_rcpt_limit)
|
||||
if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
|
||||
qmgr_active_feed(qmgr_deferred, df_path);
|
||||
&& qmgr_recipient_count < var_qmgr_rcpt_limit) {
|
||||
first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
|
||||
} else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
|
||||
first_scan_idx = QMGR_SCAN_IDX_INCOMING;
|
||||
}
|
||||
|
||||
/*
|
||||
* Global flow control. If enabled, slow down receiving processes that
|
||||
@@ -455,17 +478,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
if (var_in_flow_delay > 0) {
|
||||
token_count = mail_flow_count();
|
||||
if (token_count < var_proc_limit) {
|
||||
if (in_feed != 0)
|
||||
if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
|
||||
mail_flow_put(1);
|
||||
else if (qmgr_incoming->handle == 0)
|
||||
else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
|
||||
mail_flow_put(var_proc_limit - token_count);
|
||||
} else if (token_count > var_proc_limit) {
|
||||
mail_flow_get(token_count - var_proc_limit);
|
||||
}
|
||||
}
|
||||
if (in_path || df_path)
|
||||
return (DONT_WAIT);
|
||||
return (WAIT_FOR_EVENT);
|
||||
return (delay);
|
||||
}
|
||||
|
||||
/* pre_accept - see if tables have changed */
|
||||
@@ -521,9 +542,9 @@ static void qmgr_post_init(char *unused_name, char **unused_argv)
|
||||
var_use_limit = 0;
|
||||
var_idle_limit = 0;
|
||||
qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
|
||||
qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
|
||||
qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
|
||||
qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
|
||||
qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
|
||||
qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
|
||||
qmgr_deferred_run_event(0, (char *) 0);
|
||||
}
|
||||
|
||||
|
@@ -114,6 +114,7 @@ struct QMGR_QUEUE_LIST {
|
||||
|
||||
struct QMGR_TRANSPORT {
|
||||
int flags; /* blocked, etc. */
|
||||
int pending; /* incomplete DA connections */
|
||||
char *name; /* transport name */
|
||||
int dest_concurrency_limit; /* concurrency per domain */
|
||||
int init_dest_concurrency; /* init. per-domain concurrency */
|
||||
@@ -125,7 +126,6 @@ struct QMGR_TRANSPORT {
|
||||
};
|
||||
|
||||
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
|
||||
#define QMGR_TRANSPORT_STAT_BUSY (1<<2)
|
||||
|
||||
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_select(void);
|
||||
@@ -135,7 +135,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
|
||||
|
||||
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
|
||||
/*
|
||||
* Each next hop (e.g., a domain name) has its own queue of pending message
|
||||
|
@@ -105,6 +105,49 @@ struct QMGR_TRANSPORT_ALLOC {
|
||||
QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
|
||||
};
|
||||
|
||||
/*
|
||||
* Connections to delivery agents are managed asynchronously. Each delivery
|
||||
* agent connection goes through multiple wait states:
|
||||
*
|
||||
* - With Linux/Solaris and old queue manager implementations only, wait for
|
||||
* the server to invoke accept().
|
||||
*
|
||||
* - Wait for the delivery agent's announcement that it is ready to receive a
|
||||
* delivery request.
|
||||
*
|
||||
* - Wait for the delivery request completion status.
|
||||
*
|
||||
* Older queue manager implementations had only one pending delivery agent
|
||||
* connection per transport. With low-latency destinations, the output rates
|
||||
* were reduced on Linux/Solaris systems that had the extra wait state.
|
||||
*
|
||||
* To maximize delivery agent output rates with low-latency destinations, the
|
||||
* following changes were made to the queue manager by the end of the 2.4
|
||||
* development cycle:
|
||||
*
|
||||
* - The Linux/Solaris accept() wait state was eliminated.
|
||||
*
|
||||
* - A pipeline was implemented for pending delivery agent connections. The
|
||||
* number of pending delivery agent connections was increased from one to
|
||||
* two: the number of before-delivery wait states, plus one extra pipeline
|
||||
* slot to prevent the pipeline from stalling easily. Increasing the
|
||||
* pipeline much further actually hurt performance.
|
||||
*
|
||||
* - To reduce queue manager disk competition with delivery agents, the queue
|
||||
* scanning algorithm was modified to import only one message per interrupt.
|
||||
* The incoming and deferred queue scans now happen on alternate interrupts.
|
||||
*
|
||||
* Simplistically reasoned, a non-zero (incoming + active) queue length is
|
||||
* equivalent to a time shift for mail deliveries; this is undesirable when
|
||||
* delivery agents are not fully utilized.
|
||||
*
|
||||
* On the other hand a non-empty active queue is what allows us to do clever
|
||||
* things such as queue file prefetch, concurrency windows, and connection
|
||||
* caching; the idea is that such "thinking time" is affordable only after
|
||||
* the output channels are maxed out.
|
||||
*/
|
||||
#define QMGR_TRANSPORT_MAX_PEND 2
|
||||
|
||||
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
|
||||
|
||||
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
|
||||
@@ -191,11 +234,15 @@ static void qmgr_transport_event(int unused_event, char *context)
|
||||
event_cancel_timer(qmgr_transport_abort, context);
|
||||
|
||||
/*
|
||||
* Disable further read events that end up calling this function.
|
||||
* Disable further read events that end up calling this function, turn
|
||||
* off the Linux connect() workaround, and free up this pending
|
||||
* connection pipeline slot.
|
||||
*/
|
||||
if (alloc->stream)
|
||||
if (alloc->stream) {
|
||||
event_disable_readwrite(vstream_fileno(alloc->stream));
|
||||
alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
|
||||
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
|
||||
}
|
||||
alloc->transport->pending -= 1;
|
||||
|
||||
/*
|
||||
* Notify the requestor.
|
||||
@@ -204,46 +251,34 @@ static void qmgr_transport_event(int unused_event, char *context)
|
||||
myfree((char *) alloc);
|
||||
}
|
||||
|
||||
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
|
||||
/* qmgr_transport_connect - handle connection request completion */
|
||||
|
||||
static void qmgr_transport_connect(int unused_event, char *context)
|
||||
{
|
||||
QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
|
||||
|
||||
/*
|
||||
* This code is necessary for some versions of LINUX, where connect(2)
|
||||
* blocks until the application performs an accept(2). Reportedly, the
|
||||
* same can happen on Solaris 2.5.1.
|
||||
*/
|
||||
event_disable_readwrite(vstream_fileno(alloc->stream));
|
||||
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
|
||||
event_enable_read(vstream_fileno(alloc->stream),
|
||||
qmgr_transport_event, (char *) alloc);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* qmgr_transport_select - select transport for allocation */
|
||||
|
||||
QMGR_TRANSPORT *qmgr_transport_select(void)
|
||||
{
|
||||
QMGR_TRANSPORT *xport;
|
||||
QMGR_QUEUE *queue;
|
||||
int need;
|
||||
|
||||
/*
|
||||
* If we find a suitable transport, rotate the list of transports to
|
||||
* effectuate round-robin selection. See similar selection code in
|
||||
* qmgr_queue_select().
|
||||
*
|
||||
* This function is called repeatedly until all transports have maxed out
|
||||
* the number of pending delivery agent connections, until all delivery
|
||||
* agent concurrency windows are maxed out, or until we run out of "todo"
|
||||
* queue entries.
|
||||
*/
|
||||
#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
|
||||
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
|
||||
|
||||
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
|
||||
if (xport->flags & STAY_AWAY)
|
||||
if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
|
||||
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
|
||||
continue;
|
||||
need = xport->pending + 1;
|
||||
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
|
||||
if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
|
||||
if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
|
||||
queue->todo_refcount)) <= 0) {
|
||||
QMGR_LIST_ROTATE(qmgr_transport_list, xport);
|
||||
if (msg_verbose)
|
||||
msg_info("qmgr_transport_select: %s", xport->name);
|
||||
@@ -259,56 +294,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
|
||||
void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
|
||||
{
|
||||
QMGR_TRANSPORT_ALLOC *alloc;
|
||||
VSTREAM *stream;
|
||||
|
||||
/*
|
||||
* Sanity checks.
|
||||
*/
|
||||
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
msg_panic("qmgr_transport: dead transport: %s", transport->name);
|
||||
if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
|
||||
msg_panic("qmgr_transport: nested allocation: %s", transport->name);
|
||||
if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
|
||||
msg_panic("qmgr_transport: excess allocation: %s", transport->name);
|
||||
|
||||
/*
|
||||
* Connect to the well-known port for this delivery service, and wake up
|
||||
* when a process announces its availability. In the mean time, block out
|
||||
* other delivery process allocation attempts for this transport. In case
|
||||
* of problems, back off. Do not hose the system when it is in trouble
|
||||
* when a process announces its availability. Allow only a limited number
|
||||
* of delivery process allocation attempts for this transport. In case of
|
||||
* problems, back off. Do not hose the system when it is in trouble
|
||||
* already.
|
||||
*/
|
||||
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#define BLOCK_MODE NON_BLOCKING
|
||||
#define ENABLE_EVENTS event_enable_write
|
||||
#define EVENT_HANDLER qmgr_transport_connect
|
||||
#else
|
||||
#define BLOCK_MODE BLOCKING
|
||||
#define ENABLE_EVENTS event_enable_read
|
||||
#define EVENT_HANDLER qmgr_transport_event
|
||||
#endif
|
||||
|
||||
/*
|
||||
* When the connection to the delivery agent cannot be completed, notify
|
||||
* the event handler so that it can throttle the transport and defer the
|
||||
* todo queues, just like it does when communication fails *after*
|
||||
* connection completion.
|
||||
*
|
||||
* Before Postfix 2.4, the event handler was not invoked, and mail was not
|
||||
* deferred. Because of this, mail would be stuck in the active queue
|
||||
* after triggering a "connection refused" condition.
|
||||
* Use non-blocking connect(), so that Linux won't block the queue manager
|
||||
* until the delivery agent calls accept().
|
||||
*
|
||||
* When the connection to delivery agent cannot be completed, notify the
|
||||
* event handler so that it can throttle the transport and defer the todo
|
||||
* queues, just like it does when communication fails *after* connection
|
||||
* completion.
|
||||
*
|
||||
* Before Postfix 2.4, the event handler was not invoked after connect()
|
||||
* error, and mail was not deferred. Because of this, mail would be stuck
|
||||
* in the active queue after triggering a "connection refused" condition.
|
||||
*/
|
||||
if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
|
||||
msg_warn("connect to transport %s: %m", transport->name);
|
||||
}
|
||||
alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
|
||||
alloc->stream = stream;
|
||||
alloc->transport = transport;
|
||||
alloc->notify = notify;
|
||||
transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
|
||||
if (alloc->stream == 0) {
|
||||
transport->pending += 1;
|
||||
if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
|
||||
NON_BLOCKING)) == 0) {
|
||||
msg_warn("connect to transport %s: %m", transport->name);
|
||||
event_request_timer(qmgr_transport_event, (char *) alloc, 0);
|
||||
return;
|
||||
}
|
||||
ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
|
||||
event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
|
||||
(char *) alloc);
|
||||
|
||||
/*
|
||||
* Guard against broken systems.
|
||||
@@ -327,6 +352,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
|
||||
msg_panic("qmgr_transport_create: transport exists: %s", name);
|
||||
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
|
||||
transport->flags = 0;
|
||||
transport->pending = 0;
|
||||
transport->name = mystrdup(name);
|
||||
|
||||
/*
|
||||
|
@@ -390,8 +390,11 @@ int var_proc_limit;
|
||||
bool var_verp_bounce_off;
|
||||
int var_qmgr_clog_warn_time;
|
||||
|
||||
static QMGR_SCAN *qmgr_incoming;
|
||||
static QMGR_SCAN *qmgr_deferred;
|
||||
static QMGR_SCAN *qmgr_scans[2];
|
||||
|
||||
#define QMGR_SCAN_IDX_INCOMING 0
|
||||
#define QMGR_SCAN_IDX_DEFERRED 1
|
||||
#define QMGR_SCAN_IDX_COUNT (sizeof(qmgr_scans) / sizeof(qmgr_scans[0]))
|
||||
|
||||
/* qmgr_deferred_run_event - queue manager heartbeat */
|
||||
|
||||
@@ -402,7 +405,7 @@ static void qmgr_deferred_run_event(int unused_event, char *dummy)
|
||||
* This routine runs when it is time for another deferred queue scan.
|
||||
* Make sure this routine gets called again in the future.
|
||||
*/
|
||||
qmgr_scan_request(qmgr_deferred, QMGR_SCAN_START);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], QMGR_SCAN_START);
|
||||
event_request_timer(qmgr_deferred_run_event, dummy, var_queue_run_delay);
|
||||
}
|
||||
|
||||
@@ -462,19 +465,22 @@ static void qmgr_trigger_event(char *buf, int len,
|
||||
* requested, the request takes effect immediately.
|
||||
*/
|
||||
if (incoming_flag != 0)
|
||||
qmgr_scan_request(qmgr_incoming, incoming_flag);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], incoming_flag);
|
||||
if (deferred_flag != 0)
|
||||
qmgr_scan_request(qmgr_deferred, deferred_flag);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_DEFERRED], deferred_flag);
|
||||
}
|
||||
|
||||
/* qmgr_loop - queue manager main loop */
|
||||
|
||||
static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
{
|
||||
char *in_path = 0;
|
||||
char *df_path = 0;
|
||||
char *path;
|
||||
int token_count;
|
||||
int in_feed = 0;
|
||||
int feed = 0;
|
||||
int scan_idx; /* Priority order scan index */
|
||||
static int first_scan_idx = QMGR_SCAN_IDX_INCOMING;
|
||||
int last_scan_idx = QMGR_SCAN_IDX_COUNT - 1;
|
||||
int delay;
|
||||
|
||||
/*
|
||||
* This routine runs as part of the event handling loop, after the event
|
||||
@@ -495,15 +501,32 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
|
||||
/*
|
||||
* Let some new blood into the active queue when the queue size is
|
||||
* smaller than some configurable limit. When the system is under heavy
|
||||
* load, favor new mail over old mail.
|
||||
* smaller than some configurable limit.
|
||||
*
|
||||
* We import one message per interrupt, to optimally tune the input count
|
||||
* for the number of delivery agent protocol wait states, as explained in
|
||||
* qmgr_transport.c.
|
||||
*/
|
||||
if (qmgr_message_count < var_qmgr_active_limit)
|
||||
if ((in_path = qmgr_scan_next(qmgr_incoming)) != 0)
|
||||
in_feed = qmgr_active_feed(qmgr_incoming, in_path);
|
||||
if (qmgr_message_count < var_qmgr_active_limit)
|
||||
if ((df_path = qmgr_scan_next(qmgr_deferred)) != 0)
|
||||
qmgr_active_feed(qmgr_deferred, df_path);
|
||||
delay = WAIT_FOR_EVENT;
|
||||
for (scan_idx = 0; qmgr_message_count < var_qmgr_active_limit
|
||||
&& scan_idx < QMGR_SCAN_IDX_COUNT; ++scan_idx) {
|
||||
last_scan_idx = (scan_idx + first_scan_idx) % QMGR_SCAN_IDX_COUNT;
|
||||
if ((path = qmgr_scan_next(qmgr_scans[last_scan_idx])) != 0) {
|
||||
delay = DONT_WAIT;
|
||||
if ((feed = qmgr_active_feed(qmgr_scans[last_scan_idx], path)) != 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Round-robin the queue scans. When the active queue becomes full,
|
||||
* prefer new mail over deferred mail.
|
||||
*/
|
||||
if (qmgr_message_count < var_qmgr_active_limit) {
|
||||
first_scan_idx = (last_scan_idx + 1) % QMGR_SCAN_IDX_COUNT;
|
||||
} else if (first_scan_idx != QMGR_SCAN_IDX_INCOMING) {
|
||||
first_scan_idx = QMGR_SCAN_IDX_INCOMING;
|
||||
}
|
||||
|
||||
/*
|
||||
* Global flow control. If enabled, slow down receiving processes that
|
||||
@@ -512,17 +535,15 @@ static int qmgr_loop(char *unused_name, char **unused_argv)
|
||||
if (var_in_flow_delay > 0) {
|
||||
token_count = mail_flow_count();
|
||||
if (token_count < var_proc_limit) {
|
||||
if (in_feed != 0)
|
||||
if (feed != 0 && last_scan_idx == QMGR_SCAN_IDX_INCOMING)
|
||||
mail_flow_put(1);
|
||||
else if (qmgr_incoming->handle == 0)
|
||||
else if (qmgr_scans[QMGR_SCAN_IDX_INCOMING]->handle == 0)
|
||||
mail_flow_put(var_proc_limit - token_count);
|
||||
} else if (token_count > var_proc_limit) {
|
||||
mail_flow_get(token_count - var_proc_limit);
|
||||
}
|
||||
}
|
||||
if (in_path || df_path)
|
||||
return (DONT_WAIT);
|
||||
return (WAIT_FOR_EVENT);
|
||||
return (delay);
|
||||
}
|
||||
|
||||
/* pre_accept - see if tables have changed */
|
||||
@@ -588,9 +609,9 @@ static void qmgr_post_init(char *name, char **unused_argv)
|
||||
var_use_limit = 0;
|
||||
var_idle_limit = 0;
|
||||
qmgr_move(MAIL_QUEUE_ACTIVE, MAIL_QUEUE_INCOMING, event_time());
|
||||
qmgr_incoming = qmgr_scan_create(MAIL_QUEUE_INCOMING);
|
||||
qmgr_deferred = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
|
||||
qmgr_scan_request(qmgr_incoming, QMGR_SCAN_START);
|
||||
qmgr_scans[QMGR_SCAN_IDX_INCOMING] = qmgr_scan_create(MAIL_QUEUE_INCOMING);
|
||||
qmgr_scans[QMGR_SCAN_IDX_DEFERRED] = qmgr_scan_create(MAIL_QUEUE_DEFERRED);
|
||||
qmgr_scan_request(qmgr_scans[QMGR_SCAN_IDX_INCOMING], QMGR_SCAN_START);
|
||||
qmgr_deferred_run_event(0, (char *) 0);
|
||||
}
|
||||
|
||||
|
@@ -131,6 +131,7 @@ struct QMGR_JOB_LIST {
|
||||
|
||||
struct QMGR_TRANSPORT {
|
||||
int flags; /* blocked, etc. */
|
||||
int pending; /* incomplete DA connections */
|
||||
char *name; /* transport name */
|
||||
int dest_concurrency_limit; /* concurrency per domain */
|
||||
int init_dest_concurrency; /* init. per-domain concurrency */
|
||||
@@ -165,7 +166,6 @@ struct QMGR_TRANSPORT {
|
||||
};
|
||||
|
||||
#define QMGR_TRANSPORT_STAT_DEAD (1<<1)
|
||||
#define QMGR_TRANSPORT_STAT_BUSY (1<<2)
|
||||
|
||||
typedef void (*QMGR_TRANSPORT_ALLOC_NOTIFY) (QMGR_TRANSPORT *, VSTREAM *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_select(void);
|
||||
@@ -175,7 +175,7 @@ extern void qmgr_transport_unthrottle(QMGR_TRANSPORT *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_create(const char *);
|
||||
extern QMGR_TRANSPORT *qmgr_transport_find(const char *);
|
||||
|
||||
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
#define QMGR_TRANSPORT_THROTTLED(t) ((t)->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
|
||||
/*
|
||||
* Each next hop (e.g., a domain name) has its own queue of pending message
|
||||
|
@@ -110,6 +110,49 @@ struct QMGR_TRANSPORT_ALLOC {
|
||||
QMGR_TRANSPORT_ALLOC_NOTIFY notify; /* application call-back routine */
|
||||
};
|
||||
|
||||
/*
|
||||
* Connections to delivery agents are managed asynchronously. Each delivery
|
||||
* agent connection goes through multiple wait states:
|
||||
*
|
||||
* - With Linux/Solaris and old queue manager implementations only, wait for
|
||||
* the server to invoke accept().
|
||||
*
|
||||
* - Wait for the delivery agent's announcement that it is ready to receive a
|
||||
* delivery request.
|
||||
*
|
||||
* - Wait for the delivery request completion status.
|
||||
*
|
||||
* Older queue manager implementations had only one pending delivery agent
|
||||
* connection per transport. With low-latency destinations, the output rates
|
||||
* were reduced on Linux/Solaris systems that had the extra wait state.
|
||||
*
|
||||
* To maximize delivery agent output rates with low-latency destinations, the
|
||||
* following changes were made to the queue manager by the end of the 2.4
|
||||
* development cycle:
|
||||
*
|
||||
* - The Linux/Solaris accept() wait state was eliminated.
|
||||
*
|
||||
* - A pipeline was implemented for pending delivery agent connections. The
|
||||
* number of pending delivery agent connections was increased from one to
|
||||
* two: the number of before-delivery wait states, plus one extra pipeline
|
||||
* slot to prevent the pipeline from stalling easily. Increasing the
|
||||
* pipeline much further actually hurt performance.
|
||||
*
|
||||
* - To reduce queue manager disk competition with delivery agents, the queue
|
||||
* scanning algorithm was modified to import only one message per interrupt.
|
||||
* The incoming and deferred queue scans now happen on alternate interrupts.
|
||||
*
|
||||
* Simplistically reasoned, a non-zero (incoming + active) queue length is
|
||||
* equivalent to a time shift for mail deliveries; this is undesirable when
|
||||
* delivery agents are not fully utilized.
|
||||
*
|
||||
* On the other hand a non-empty active queue is what allows us to do clever
|
||||
* things such as queue file prefetch, concurrency windows, and connection
|
||||
* caching; the idea is that such "thinking time" is affordable only after
|
||||
* the output channels are maxed out.
|
||||
*/
|
||||
#define QMGR_TRANSPORT_MAX_PEND 2
|
||||
|
||||
/* qmgr_transport_unthrottle_wrapper - in case (char *) != (struct *) */
|
||||
|
||||
static void qmgr_transport_unthrottle_wrapper(int unused_event, char *context)
|
||||
@@ -196,11 +239,15 @@ static void qmgr_transport_event(int unused_event, char *context)
|
||||
event_cancel_timer(qmgr_transport_abort, context);
|
||||
|
||||
/*
|
||||
* Disable further read events that end up calling this function.
|
||||
* Disable further read events that end up calling this function, turn
|
||||
* off the Linux connect() workaround, and free up this pending
|
||||
* connection pipeline slot.
|
||||
*/
|
||||
if (alloc->stream)
|
||||
if (alloc->stream) {
|
||||
event_disable_readwrite(vstream_fileno(alloc->stream));
|
||||
alloc->transport->flags &= ~QMGR_TRANSPORT_STAT_BUSY;
|
||||
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
|
||||
}
|
||||
alloc->transport->pending -= 1;
|
||||
|
||||
/*
|
||||
* Notify the requestor.
|
||||
@@ -209,46 +256,34 @@ static void qmgr_transport_event(int unused_event, char *context)
|
||||
myfree((char *) alloc);
|
||||
}
|
||||
|
||||
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
|
||||
/* qmgr_transport_connect - handle connection request completion */
|
||||
|
||||
static void qmgr_transport_connect(int unused_event, char *context)
|
||||
{
|
||||
QMGR_TRANSPORT_ALLOC *alloc = (QMGR_TRANSPORT_ALLOC *) context;
|
||||
|
||||
/*
|
||||
* This code is necessary for some versions of LINUX, where connect(2)
|
||||
* blocks until the application performs an accept(2). Reportedly, the
|
||||
* same can happen on Solaris 2.5.1.
|
||||
*/
|
||||
event_disable_readwrite(vstream_fileno(alloc->stream));
|
||||
non_blocking(vstream_fileno(alloc->stream), BLOCKING);
|
||||
event_enable_read(vstream_fileno(alloc->stream),
|
||||
qmgr_transport_event, (char *) alloc);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/* qmgr_transport_select - select transport for allocation */
|
||||
|
||||
QMGR_TRANSPORT *qmgr_transport_select(void)
|
||||
{
|
||||
QMGR_TRANSPORT *xport;
|
||||
QMGR_QUEUE *queue;
|
||||
int need;
|
||||
|
||||
/*
|
||||
* If we find a suitable transport, rotate the list of transports to
|
||||
* effectuate round-robin selection. See similar selection code in
|
||||
* qmgr_peer_select().
|
||||
*
|
||||
* This function is called repeatedly until all transports have maxed out
|
||||
* the number of pending delivery agent connections, until all delivery
|
||||
* agent concurrency windows are maxed out, or until we run out of "todo"
|
||||
* queue entries.
|
||||
*/
|
||||
#define STAY_AWAY (QMGR_TRANSPORT_STAT_BUSY | QMGR_TRANSPORT_STAT_DEAD)
|
||||
#define MIN5af51743e4eef(x, y) ((x) < (y) ? (x) : (y))
|
||||
|
||||
for (xport = qmgr_transport_list.next; xport; xport = xport->peers.next) {
|
||||
if (xport->flags & STAY_AWAY)
|
||||
if ((xport->flags & QMGR_TRANSPORT_STAT_DEAD) != 0
|
||||
|| xport->pending >= QMGR_TRANSPORT_MAX_PEND)
|
||||
continue;
|
||||
need = xport->pending + 1;
|
||||
for (queue = xport->queue_list.next; queue; queue = queue->peers.next) {
|
||||
if (queue->window > queue->busy_refcount && queue->todo.next != 0) {
|
||||
if ((need -= MIN5af51743e4eef(queue->window - queue->busy_refcount,
|
||||
queue->todo_refcount)) <= 0) {
|
||||
QMGR_LIST_ROTATE(qmgr_transport_list, xport, peers);
|
||||
if (msg_verbose)
|
||||
msg_info("qmgr_transport_select: %s", xport->name);
|
||||
@@ -264,56 +299,46 @@ QMGR_TRANSPORT *qmgr_transport_select(void)
|
||||
void qmgr_transport_alloc(QMGR_TRANSPORT *transport, QMGR_TRANSPORT_ALLOC_NOTIFY notify)
|
||||
{
|
||||
QMGR_TRANSPORT_ALLOC *alloc;
|
||||
VSTREAM *stream;
|
||||
|
||||
/*
|
||||
* Sanity checks.
|
||||
*/
|
||||
if (transport->flags & QMGR_TRANSPORT_STAT_DEAD)
|
||||
msg_panic("qmgr_transport: dead transport: %s", transport->name);
|
||||
if (transport->flags & QMGR_TRANSPORT_STAT_BUSY)
|
||||
msg_panic("qmgr_transport: nested allocation: %s", transport->name);
|
||||
if (transport->pending >= QMGR_TRANSPORT_MAX_PEND)
|
||||
msg_panic("qmgr_transport: excess allocation: %s", transport->name);
|
||||
|
||||
/*
|
||||
* Connect to the well-known port for this delivery service, and wake up
|
||||
* when a process announces its availability. In the mean time, block out
|
||||
* other delivery process allocation attempts for this transport. In case
|
||||
* of problems, back off. Do not hose the system when it is in trouble
|
||||
* when a process announces its availability. Allow only a limited number
|
||||
* of delivery process allocation attempts for this transport. In case of
|
||||
* problems, back off. Do not hose the system when it is in trouble
|
||||
* already.
|
||||
*/
|
||||
#ifdef UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#define BLOCK_MODE NON_BLOCKING
|
||||
#define ENABLE_EVENTS event_enable_write
|
||||
#define EVENT_HANDLER qmgr_transport_connect
|
||||
#else
|
||||
#define BLOCK_MODE BLOCKING
|
||||
#define ENABLE_EVENTS event_enable_read
|
||||
#define EVENT_HANDLER qmgr_transport_event
|
||||
#endif
|
||||
|
||||
/*
|
||||
* When the connection to the delivery agent cannot be completed, notify
|
||||
* the event handler so that it can throttle the transport and defer the
|
||||
* todo queues, just like it does when communication fails *after*
|
||||
* connection completion.
|
||||
*
|
||||
* Before Postfix 2.4, the event handler was not invoked, and mail was not
|
||||
* deferred. Because of this, mail would be stuck in the active queue
|
||||
* after triggering a "connection refused" condition.
|
||||
* Use non-blocking connect(), so that Linux won't block the queue manager
|
||||
* until the delivery agent calls accept().
|
||||
*
|
||||
* When the connection to delivery agent cannot be completed, notify the
|
||||
* event handler so that it can throttle the transport and defer the todo
|
||||
* queues, just like it does when communication fails *after* connection
|
||||
* completion.
|
||||
*
|
||||
* Before Postfix 2.4, the event handler was not invoked after connect()
|
||||
* error, and mail was not deferred. Because of this, mail would be stuck
|
||||
* in the active queue after triggering a "connection refused" condition.
|
||||
*/
|
||||
if ((stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name, BLOCK_MODE)) == 0) {
|
||||
msg_warn("connect to transport %s: %m", transport->name);
|
||||
}
|
||||
alloc = (QMGR_TRANSPORT_ALLOC *) mymalloc(sizeof(*alloc));
|
||||
alloc->stream = stream;
|
||||
alloc->transport = transport;
|
||||
alloc->notify = notify;
|
||||
transport->flags |= QMGR_TRANSPORT_STAT_BUSY;
|
||||
if (alloc->stream == 0) {
|
||||
transport->pending += 1;
|
||||
if ((alloc->stream = mail_connect(MAIL_CLASS_PRIVATE, transport->name,
|
||||
NON_BLOCKING)) == 0) {
|
||||
msg_warn("connect to transport %s: %m", transport->name);
|
||||
event_request_timer(qmgr_transport_event, (char *) alloc, 0);
|
||||
return;
|
||||
}
|
||||
ENABLE_EVENTS(vstream_fileno(alloc->stream), EVENT_HANDLER, (char *) alloc);
|
||||
event_enable_read(vstream_fileno(alloc->stream), qmgr_transport_event,
|
||||
(char *) alloc);
|
||||
|
||||
/*
|
||||
* Guard against broken systems.
|
||||
@@ -332,6 +357,7 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
|
||||
msg_panic("qmgr_transport_create: transport exists: %s", name);
|
||||
transport = (QMGR_TRANSPORT *) mymalloc(sizeof(QMGR_TRANSPORT));
|
||||
transport->flags = 0;
|
||||
transport->pending = 0;
|
||||
transport->name = mystrdup(name);
|
||||
|
||||
/*
|
||||
@@ -364,9 +390,9 @@ QMGR_TRANSPORT *qmgr_transport_create(const char *name)
|
||||
transport->rcpt_per_stack = get_mail_conf_int2(name, _STACK_RCPT_LIMIT,
|
||||
var_stack_rcpt_limit, 0, 0);
|
||||
transport->refill_limit = get_mail_conf_int2(name, _XPORT_REFILL_LIMIT,
|
||||
var_xport_refill_limit, 1, 0);
|
||||
var_xport_refill_limit, 1, 0);
|
||||
transport->refill_delay = get_mail_conf_time2(name, _XPORT_REFILL_DELAY,
|
||||
var_xport_refill_delay, 's', 1, 0);
|
||||
var_xport_refill_delay, 's', 1, 0);
|
||||
|
||||
transport->queue_byname = htable_create(0);
|
||||
QMGR_LIST_INIT(transport->queue_list);
|
||||
|
@@ -432,7 +432,6 @@ extern int opterr;
|
||||
#define DBM_NO_TRAILING_NULL
|
||||
#define USE_STATVFS
|
||||
#define STATVFS_IN_SYS_STATVFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#define STRCASECMP_IN_STRINGS_H
|
||||
#define SET_H_ERRNO(err) (set_h_errno(err))
|
||||
#endif
|
||||
@@ -463,7 +462,6 @@ extern int opterr;
|
||||
#define DBM_NO_TRAILING_NULL
|
||||
#define USE_STATVFS
|
||||
#define STATVFS_IN_SYS_STATVFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#endif
|
||||
|
||||
/*
|
||||
@@ -679,7 +677,6 @@ extern int initgroups(const char *, int);
|
||||
#define FIONREAD_IN_TERMIOS_H
|
||||
#define USE_STATFS
|
||||
#define STATFS_IN_SYS_VFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#define PREPEND_PLUS_TO_OPTSTRING
|
||||
#define HAS_POSIX_REGEXP
|
||||
#define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
|
||||
@@ -697,7 +694,10 @@ extern int initgroups(const char *, int);
|
||||
# define _PATH_PROCNET_IFINET6 "/proc/net/if_inet6"
|
||||
#endif
|
||||
#include <linux/version.h>
|
||||
#if !defined(KERNEL_VERSION) || (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
|
||||
#if !defined(KERNEL_VERSION)
|
||||
# define KERNEL_VERSION(a,b,c) (LINUX_VERSION_CODE + 1)
|
||||
#endif
|
||||
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,2,0)) \
|
||||
|| (__GLIBC__ < 2)
|
||||
# define CANT_USE_SEND_RECV_MSG
|
||||
# define DEF_SMTP_CACHE_DEMAND 0
|
||||
@@ -727,7 +727,6 @@ extern int initgroups(const char *, int);
|
||||
#define FIONREAD_IN_TERMIOS_H /* maybe unnecessary */
|
||||
#define USE_STATFS
|
||||
#define STATFS_IN_SYS_VFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT /* unverified */
|
||||
#define PREPEND_PLUS_TO_OPTSTRING
|
||||
#define HAS_POSIX_REGEXP
|
||||
#define NATIVE_SENDMAIL_PATH "/usr/sbin/sendmail"
|
||||
@@ -1018,7 +1017,6 @@ extern int opterr; /* XXX use <getopt.h> */
|
||||
#define DBM_NO_TRAILING_NULL
|
||||
#define USE_STATVFS
|
||||
#define STATVFS_IN_SYS_STATVFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#ifndef S_ISSOCK
|
||||
#define S_ISSOCK(mode) ((mode&0xF000) == 0xC000)
|
||||
#endif
|
||||
@@ -1049,7 +1047,6 @@ extern int h_errno;
|
||||
#define ROOT_PATH "/bin:/etc:/usr/bin:/tcb/bin"
|
||||
#define USE_STATVFS
|
||||
#define STATVFS_IN_SYS_STATVFS_H
|
||||
#define UNIX_DOMAIN_CONNECT_BLOCKS_FOR_ACCEPT
|
||||
#define MISSING_SETENV
|
||||
#define STRCASECMP_IN_STRINGS_H
|
||||
/* SCO5 misses just S_ISSOCK, the others are there
|
||||
|
Reference in New Issue
Block a user