diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 9a1249feba..eea87aa5d8 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -188,6 +188,8 @@ struct isc_nm_http_session { isc__nm_http_pending_callbacks_t pending_write_callbacks; isc_buffer_t *pending_write_data; + size_t data_in_flight; + /* * The statistical values below are for usage on server-side * only. They are meant to detect clients that are taking too many @@ -230,7 +232,7 @@ typedef struct isc_http_send_req { #define HTTP_HANDLER_MAGIC ISC_MAGIC('H', 'T', 'H', 'L') #define VALID_HTTP_HANDLER(t) ISC_MAGIC_VALID(t, HTTP_HANDLER_MAGIC) -static bool +static void http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, isc_nm_cb_t cb, void *cbarg); @@ -1054,6 +1056,19 @@ client_submit_request(isc_nm_http_session_t *session, http_cstream_t *stream) { return ISC_R_SUCCESS; } +static inline size_t +http_in_flight_data_size(isc_nm_http_session_t *session) { + size_t in_flight = 0; + + if (session->pending_write_data != NULL) { + in_flight += isc_buffer_usedlength(session->pending_write_data); + } + + in_flight += session->data_in_flight; + + return in_flight; +} + static ssize_t http_process_input_data(isc_nm_http_session_t *session, isc_buffer_t *input_data) { @@ -1105,13 +1120,14 @@ http_process_input_data(isc_nm_http_session_t *session, (session->received - session->processed); /* - * If there are non completed send requests in flight -let's - * not process any incoming data, as it could lead to piling - * up too much send data in send buffers. With many clients + * If there is too much outgoing data in flight - let's not + * process any incoming data, as it could lead to piling up + * too much send data in send buffers. With many clients * connected it can lead to excessive memory consumption on * the server instance. */ - if (session->sending > 0) { + const size_t in_flight = http_in_flight_data_size(session); + if (in_flight >= ISC_NETMGR_TCP_SENDBUF_SIZE) { break; } @@ -1320,6 +1336,8 @@ http_writecb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { isc_nmhandle_detach(&req->httphandle); } + session->data_in_flight -= + isc_buffer_usedlength(req->pending_write_data); isc_buffer_free(&req->pending_write_data); session->processed += req->submitted; isc_mem_put(session->mctx, req, sizeof(*req)); @@ -1349,7 +1367,23 @@ move_pending_send_callbacks(isc_nm_http_session_t *session, ISC_LIST_INIT(session->pending_write_callbacks); } -static bool +static inline void +http_append_pending_send_request(isc_nm_http_session_t *session, + isc_nmhandle_t *httphandle, isc_nm_cb_t cb, + void *cbarg) { + REQUIRE(VALID_HTTP2_SESSION(session)); + REQUIRE(VALID_NMHANDLE(httphandle)); + REQUIRE(cb != NULL); + + isc__nm_uvreq_t *newcb = isc__nm_uvreq_get(httphandle->sock); + + newcb->cb.send = cb; + newcb->cbarg = cbarg; + isc_nmhandle_attach(httphandle, &newcb->handle); + ISC_LIST_APPEND(session->pending_write_callbacks, newcb, link); +} + +static void http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, isc_nm_cb_t cb, void *cbarg) { isc_http_send_req_t *send = NULL; @@ -1360,11 +1394,26 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, size_t max_total_write_size = 0; #endif /* ENABLE_HTTP_WRITE_BUFFERING */ - if (!http_session_active(session) || - (!nghttp2_session_want_write(session->ngsession) && - session->pending_write_data == NULL)) + if (!http_session_active(session)) { + if (cb != NULL) { + isc__nm_uvreq_t *req = + isc__nm_uvreq_get(httphandle->sock); + + req->cb.send = cb; + req->cbarg = cbarg; + isc_nmhandle_attach(httphandle, &req->handle); + isc__nm_sendcb(httphandle->sock, req, ISC_R_CANCELED, + true); + } + return; + } else if (!nghttp2_session_want_write(session->ngsession) && + session->pending_write_data == NULL) { - return false; + if (cb != NULL) { + http_append_pending_send_request(session, httphandle, + cb, cbarg); + } + return; } /* @@ -1430,16 +1479,8 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, * will flush the buffer. */ if (cb != NULL) { - isc__nm_uvreq_t *newcb = NULL; - - INSIST(VALID_NMHANDLE(httphandle)); - - newcb = isc__nm_uvreq_get(httphandle->sock); - newcb->cb.send = cb; - newcb->cbarg = cbarg; - isc_nmhandle_attach(httphandle, &newcb->handle); - ISC_LIST_APPEND(session->pending_write_callbacks, newcb, - link); + http_append_pending_send_request(session, httphandle, + cb, cbarg); } goto nothing_to_send; } else if (session->sending == 0 && total == 0 && @@ -1481,6 +1522,10 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, if (total == 0) { /* No data returned */ + if (cb != NULL) { + http_append_pending_send_request(session, httphandle, + cb, cbarg); + } goto nothing_to_send; } @@ -1509,20 +1554,32 @@ http_send_outgoing(isc_nm_http_session_t *session, isc_nmhandle_t *httphandle, session->sending++; isc_buffer_usedregion(send->pending_write_data, &send_data); + session->data_in_flight += send_data.length; isc_nm_send(transphandle, &send_data, http_writecb, send); - return true; + return; nothing_to_send: isc_nmhandle_detach(&transphandle); - return false; } static inline bool http_too_many_active_streams(isc_nm_http_session_t *session) { const uint64_t active_streams = session->received - session->processed; + /* + * The motivation behind capping the maximum active streams number + * to a third of maximum streams is to allow the value to scale + * with the max number of streams. + * + * We do not want to have too many active streams at once as every + * stream is processed as a separate virtual connection by the + * higher level code. If a client sends a bulk of requests without + * waiting for the previous ones to complete we might want to + * throttle it as it might be not a friend knocking at the + * door. We already have some job to do for it. + */ const uint64_t max_active_streams = - ISC_MIN(ISC_NETMGR_MAX_STREAM_CLIENTS_PER_CONN, - session->max_concurrent_streams); + ISC_MAX(ISC_NETMGR_MAX_STREAM_CLIENTS_PER_CONN, + session->max_concurrent_streams / 3); if (session->client) { return false; @@ -1558,8 +1615,8 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle, if (send_cb != NULL) { INSIST(VALID_NMHANDLE(send_httphandle)); - (void)http_send_outgoing(session, send_httphandle, send_cb, - send_cbarg); + http_send_outgoing(session, send_httphandle, send_cb, + send_cbarg); return; } @@ -1568,7 +1625,7 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle, INSIST(send_cbarg == NULL); if (session->pending_write_data != NULL && session->sending == 0) { - (void)http_send_outgoing(session, NULL, NULL, NULL); + http_send_outgoing(session, NULL, NULL, NULL); return; } @@ -1623,8 +1680,7 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle, */ http_do_bio_async(session); } else { - (void)http_send_outgoing(session, NULL, NULL, - NULL); + http_send_outgoing(session, NULL, NULL, NULL); } isc__nm_httpsession_detach(&tmpsess); @@ -1642,7 +1698,7 @@ http_do_bio(isc_nm_http_session_t *session, isc_nmhandle_t *send_httphandle, } /* we might have some data to send after processing */ - (void)http_send_outgoing(session, NULL, NULL, NULL); + http_send_outgoing(session, NULL, NULL, NULL); if (nghttp2_session_want_read(session->ngsession) == 0 && nghttp2_session_want_write(session->ngsession) == 0 &&