From 2912f13047d92d9a2171e906d7a1735311365267 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Sun, 5 Apr 2009 20:45:49 +0200 Subject: [PATCH] Add GByteArray as queue type --- fastcgi.c | 254 ++++++++++++++++++++++++++++++++++------------------- fastcgi.h | 16 ++-- fcgi-cgi.c | 47 ++-------- 3 files changed, 181 insertions(+), 136 deletions(-) diff --git a/fastcgi.c b/fastcgi.c index 4ecd971..772ab61 100644 --- a/fastcgi.c +++ b/fastcgi.c @@ -12,6 +12,11 @@ #include #include +typedef struct fastcgi_queue_link { + GList queue_link; + enum { FASTCGI_QUEUE_STRING, FASTCGI_QUEUE_BYTEARRAY } elem_type; +} fastcgi_queue_link; + /* some util functions */ #define GSTR_LEN(x) (x) ? (x)->str : "", (x) ? (x)->len : 0 #define UNUSED(x) ((void)(x)) @@ -32,6 +37,149 @@ static void fd_init(int fd) { #endif } +static fastcgi_queue_link* fastcgi_queue_link_new_string(GString *s) { + fastcgi_queue_link *l = g_slice_new0(fastcgi_queue_link); + l->queue_link.data = s; + l->elem_type = FASTCGI_QUEUE_STRING; + return l; +} + +static fastcgi_queue_link* fastcgi_queue_link_new_bytearray(GByteArray *a) { + fastcgi_queue_link *l = g_slice_new0(fastcgi_queue_link); + l->queue_link.data = a; + l->elem_type = FASTCGI_QUEUE_BYTEARRAY ; + return l; +} + +static void fastcgi_queue_link_free(fastcgi_queue *queue, fastcgi_queue_link *l) { + switch (l->elem_type) { + case FASTCGI_QUEUE_STRING: + if (queue) queue->length -= ((GString*)l->queue_link.data)->len; + g_string_free(l->queue_link.data, TRUE); + break; + case FASTCGI_QUEUE_BYTEARRAY: + if (queue) queue->length -= ((GByteArray*)l->queue_link.data)->len; + g_byte_array_free(l->queue_link.data, TRUE); + break; + } + g_slice_free(fastcgi_queue_link, l); +} + +static fastcgi_queue_link *fastcgi_queue_peek_head(fastcgi_queue *queue) { + return (fastcgi_queue_link*) g_queue_peek_head_link(&queue->queue); +} + +static fastcgi_queue_link *fastcgi_queue_pop_head(fastcgi_queue *queue) { + return (fastcgi_queue_link*) g_queue_pop_head_link(&queue->queue); +} + +void fastcgi_queue_clear(fastcgi_queue *queue) { + fastcgi_queue_link *l; + queue->offset = 0; + while (NULL != (l = fastcgi_queue_pop_head(queue))) { + fastcgi_queue_link_free(queue, l); + } + g_assert(0 == queue->length); +} + +void fastcgi_queue_append_string(fastcgi_queue *queue, GString *buf) { + fastcgi_queue_link *l; + if (!buf) return; + if (!buf->len) { g_string_free(buf, TRUE); return; } + l = fastcgi_queue_link_new_string(buf); + g_queue_push_tail_link(&queue->queue, (GList*) l); + queue->length += buf->len; +} + +void fastcgi_queue_append_bytearray(fastcgi_queue *queue, GByteArray *buf) { + fastcgi_queue_link *l; + if (!buf) return; + if (!buf->len) { g_byte_array_free(buf, TRUE); return; } + l = fastcgi_queue_link_new_bytearray(buf); + g_queue_push_tail_link(&queue->queue, (GList*) l); + queue->length += buf->len; +} + +/* return values: 0 ok, -1 error, -2 con closed */ +gint fastcgi_queue_write(int fd, fastcgi_queue *queue, gsize max_write) { + gsize rem_write = max_write; + g_assert(rem_write <= G_MAXSSIZE); +#ifdef TCP_CORK + int corked = 0; +#endif + +#ifdef TCP_CORK + /* Linux: put a cork into the socket as we want to combine the write() calls + * but only if we really have multiple chunks + */ + if (queue->queue.length > 1) { + corked = 1; + setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); + } +#endif + + while (rem_write > 0 && queue->length > 0) { + fastcgi_queue_link *l = fastcgi_queue_peek_head(queue); + gsize towrite, datalen; + gssize res; + gchar *data; + switch (l->elem_type) { + case FASTCGI_QUEUE_STRING: + data = ((GString*) l->queue_link.data)->str; + datalen = towrite = ((GString*) l->queue_link.data)->len; + break; + case FASTCGI_QUEUE_BYTEARRAY: + data = (gchar*) ((GByteArray*) l->queue_link.data)->data; + datalen = towrite = ((GByteArray*) l->queue_link.data)->len; + break; + default: + g_error("invalid fastcgi_queue_link type\n"); + } + towrite -= queue->offset; data += queue->offset; + if (towrite > rem_write) towrite = rem_write; + res = write(fd, data, towrite); + if (-1 == res) { +#ifdef TCP_CORK + if (corked) { + corked = 0; + setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); + } +#endif + switch (errno) { + case EINTR: + case EAGAIN: +#if EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + return 0; /* try again later */ + case ECONNRESET: + case EPIPE: + return -2; + default: + ERROR("write to fd=%d failed, %s\n", fd, g_strerror(errno) ); + return -1; + } + } else { + queue->offset += res; + rem_write -= res; + if (queue->offset == datalen) { + queue->offset = 0; + fastcgi_queue_link_free(queue, fastcgi_queue_pop_head(queue)); + } + } + } + +#ifdef TCP_CORK + if (corked) { + corked = 0; + setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); + } +#endif + + return 0; +} + + static void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) { if ((watcher->events & events) == events) return; ev_io_stop(loop, watcher); @@ -73,39 +221,39 @@ static guint8 stream_build_fcgi_record(GString *buf, guint8 type, guint16 reques } /* returns padding length */ -static guint8 stream_send_fcgi_record(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, guint16 datalen) { +static guint8 stream_send_fcgi_record(fastcgi_queue *out, guint8 type, guint16 requestid, guint16 datalen) { GString *record = g_string_sized_new(FCGI_HEADER_LEN); guint8 padlen = stream_build_fcgi_record(record, type, requestid, datalen); - fastcgi_gstring_queue_append(out, record); + fastcgi_queue_append_string(out, record); return padlen; } -static void stream_send_data(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, const gchar *data, size_t datalen) { +static void stream_send_data(fastcgi_queue *out, guint8 type, guint16 requestid, const gchar *data, size_t datalen) { while (datalen > 0) { guint16 tosend = (datalen > G_MAXUINT16) ? G_MAXUINT16 : datalen; guint8 padlen = stream_send_fcgi_record(out, type, requestid, tosend); GString *tmps = g_string_sized_new(tosend + padlen); g_string_append_len(tmps, data, tosend); append_padding(tmps, padlen); - fastcgi_gstring_queue_append(out, tmps); + fastcgi_queue_append_string(out, tmps); data += tosend; datalen -= tosend; } } /* kills string */ -static void stream_send_string(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, GString *data) { +static void stream_send_string(fastcgi_queue *out, guint8 type, guint16 requestid, GString *data) { if (data->len > G_MAXUINT16) { stream_send_data(out, type, requestid, GSTR_LEN(data)); g_string_free(data, TRUE); } else { guint8 padlen = stream_send_fcgi_record(out, type, requestid, data->len); append_padding(data, padlen); - fastcgi_gstring_queue_append(out, data); + fastcgi_queue_append_string(out, data); } } -static void stream_send_end_request(fastcgi_gstring_queue *out, guint16 requestID, gint32 appStatus, enum FCGI_ProtocolStatus status) { +static void stream_send_end_request(fastcgi_queue *out, guint16 requestID, gint32 appStatus, enum FCGI_ProtocolStatus status) { GString *record; record = g_string_sized_new(16); stream_build_fcgi_record(record, FCGI_END_REQUEST, requestID, 8); @@ -113,81 +261,20 @@ static void stream_send_end_request(fastcgi_gstring_queue *out, guint16 requestI g_string_append_len(record, (const gchar*) &appStatus, sizeof(appStatus)); g_string_append_c(record, status); g_string_append_len(record, __padding, 3); - fastcgi_gstring_queue_append(out, record); + fastcgi_queue_append_string(out, record); } static void write_queue(fastcgi_connection *fcon) { - const gssize max_rem_write = 256*1024; - gssize rem_write = 256*1024; -#ifdef TCP_CORK - int corked = 0; -#endif - if (fcon->closing) return; -#ifdef TCP_CORK - /* Linux: put a cork into the socket as we want to combine the write() calls - * but only if we really have multiple chunks - */ - if (fcon->write_queue->queue.length > 1) { - corked = 1; - setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); - } -#endif - - while (rem_write > 0 && fcon->write_queue.length > 0) { - GString *s = g_queue_peek_head(&fcon->write_queue.queue); - gssize towrite = s->len - fcon->write_queue.offset, res; - if (towrite > max_rem_write) towrite = max_rem_write; - res = write(fcon->fd, s->str + fcon->write_queue.offset, towrite); - if (-1 == res) { -#ifdef TCP_CORK - if (corked) { - corked = 0; - setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); - } -#endif - switch (errno) { - case EINTR: - case EAGAIN: -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - goto out; /* try again later */ - case ECONNRESET: - case EPIPE: - fastcgi_connection_close(fcon); - return; - default: - ERROR("write to fd=%d failed, %s\n", fcon->fd, g_strerror(errno) ); - fastcgi_connection_close(fcon); - return; - } - } else { - fcon->write_queue.offset += res; - rem_write -= res; - if (fcon->write_queue.offset == s->len) { - g_queue_pop_head(&fcon->write_queue.queue); - fcon->write_queue.offset = 0; - fcon->write_queue.length -= s->len; - g_string_free(s, TRUE); - } - } + if (fastcgi_queue_write(fcon->fd, &fcon->write_queue, 256*1024) < 0) { + fastcgi_connection_close(fcon); + return; } -#ifdef TCP_CORK - if (corked) { - corked = 0; - setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked)); - } -#endif - -out: - if (!fcon->closing && rem_write != max_rem_write) { - if (fcon->fsrv->callbacks->cb_wrote_data) { - fcon->fsrv->callbacks->cb_wrote_data(fcon); - } + if (fcon->fsrv->callbacks->cb_wrote_data) { + fcon->fsrv->callbacks->cb_wrote_data(fcon); } if (!fcon->closing) { @@ -516,7 +603,7 @@ static void fastcgi_connection_free(fastcgi_connection *fcon) { fcon->fd = -1; } - fastcgi_gstring_queue_clear(&fcon->write_queue); + fastcgi_queue_clear(&fcon->write_queue); fastcgi_connection_environ_clear(fcon); g_ptr_array_free(fcon->environ, TRUE); g_string_free(fcon->buffer, TRUE); @@ -533,7 +620,7 @@ void fastcgi_connection_close(fastcgi_connection *fcon) { fcon->fd = -1; } - fastcgi_gstring_queue_clear(&fcon->write_queue); + fastcgi_queue_clear(&fcon->write_queue); g_string_truncate(fcon->buffer, 0); g_string_truncate(fcon->parambuf, 0); @@ -723,18 +810,3 @@ const gchar* fastcgi_connection_environ_lookup(fastcgi_connection *fcon, const g } return NULL; } - -void fastcgi_gstring_queue_append(fastcgi_gstring_queue *queue, GString *buf) { - if (!buf) return; - g_queue_push_tail(&queue->queue, buf); - queue->length += buf->len; -} - -void fastcgi_gstring_queue_clear(fastcgi_gstring_queue *queue) { - GString *s; - queue->length = 0; - queue->offset = 0; - while (NULL != (s = g_queue_pop_head(&queue->queue))) { - g_string_free(s, TRUE); - } -} diff --git a/fastcgi.h b/fastcgi.h index 7846675..a1851bd 100644 --- a/fastcgi.h +++ b/fastcgi.h @@ -56,8 +56,8 @@ typedef struct fastcgi_callbacks fastcgi_callbacks; struct fastcgi_connection; typedef struct fastcgi_connection fastcgi_connection; -struct fastcgi_gstring_queue; -typedef struct fastcgi_gstring_queue fastcgi_gstring_queue; +struct fastcgi_queue; +typedef struct fastcgi_queue fastcgi_queue; struct fastcgi_server { /* custom user data */ @@ -88,7 +88,7 @@ struct fastcgi_callbacks { void (*cb_reset_connection)(fastcgi_connection *fcon); /* cleanup custom data before fcon is freed, not for keep-alive */ }; -struct fastcgi_gstring_queue { +struct fastcgi_queue { GQueue queue; gsize offset; /* offset in first chunk */ gsize length; @@ -135,7 +135,7 @@ struct fastcgi_connection { gboolean read_suspended; /* write queue */ - fastcgi_gstring_queue write_queue; + fastcgi_queue write_queue; }; fastcgi_server *fastcgi_server_create(struct ev_loop *loop, gint socketfd, const fastcgi_callbacks *callbacks, guint max_connections); @@ -154,7 +154,11 @@ void fastcgi_connection_close(fastcgi_connection *fcon); /* shouldn't be needed void fastcgi_connection_environ_clear(fastcgi_connection *fcon); const gchar* fastcgi_connection_environ_lookup(fastcgi_connection *fcon, const gchar* key, gsize keylen); -void fastcgi_gstring_queue_append(fastcgi_gstring_queue *queue, GString *buf); -void fastcgi_gstring_queue_clear(fastcgi_gstring_queue *queue); +void fastcgi_queue_append_string(fastcgi_queue *queue, GString *buf); +void fastcgi_queue_append_bytearray(fastcgi_queue *queue, GByteArray *buf); +void fastcgi_queue_clear(fastcgi_queue *queue); + +/* return values: 0 ok, -1 error, -2 con closed */ +gint fastcgi_queue_write(int fd, fastcgi_queue *queue, gsize max_write); #endif diff --git a/fcgi-cgi.c b/fcgi-cgi.c index 9d60f62..68b2d7e 100644 --- a/fcgi-cgi.c +++ b/fcgi-cgi.c @@ -53,7 +53,7 @@ struct fcgi_cgi_child { ev_io pipe_in_watcher, pipe_out_watcher, pipe_err_watcher; /* write queue */ - fastcgi_gstring_queue write_queue; + fastcgi_queue write_queue; }; static fcgi_cgi_child* fcgi_cgi_child_create(fcgi_cgi_server *srv, fastcgi_connection *fcon); @@ -104,45 +104,13 @@ static void fcgi_cgi_child_child_cb(struct ev_loop *loop, ev_child *w, int reven } static void write_queue(fcgi_cgi_child *cld) { - const gssize max_rem_write = 256*1024; - gssize rem_write = 256*1024; if (-1 == cld->pipe_out) return; - while (rem_write > 0 && cld->write_queue.length > 0) { - GString *s = g_queue_peek_head(&cld->write_queue.queue); - gssize towrite = s->len - cld->write_queue.offset, res; - if (towrite > max_rem_write) towrite = max_rem_write; - res = write(cld->pipe_out, s->str + cld->write_queue.offset, towrite); - if (-1 == res) { - switch (errno) { - case EINTR: - case EAGAIN: -#if EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - goto out; /* try again later */ - case ECONNRESET: - case EPIPE: - fcgi_cgi_child_close_write(cld); - return; - default: - ERROR("write to fd=%d failed, %s\n", cld->pipe_out, g_strerror(errno)); - fcgi_cgi_child_close_write(cld); - return; - } - } else { - cld->write_queue.offset += res; - rem_write -= res; - if (cld->write_queue.offset == s->len) { - g_queue_pop_head(&cld->write_queue.queue); - cld->write_queue.offset = 0; - cld->write_queue.length -= s->len; - g_string_free(s, TRUE); - } - } + if (fastcgi_queue_write(cld->pipe_out, &cld->write_queue, 256*1024) < 0) { + fcgi_cgi_child_close_write(cld); + return; } -out: if (-1 != cld->pipe_out) { if (cld->write_queue.length > 0) { ev_io_start(cld->srv->loop, &cld->pipe_out_watcher); @@ -296,7 +264,7 @@ static void fcgi_cgi_child_close_write(fcgi_cgi_child *cld) { ev_io_stop(cld->srv->loop, &cld->pipe_out_watcher); close(cld->pipe_out); cld->pipe_out = -1; - fastcgi_gstring_queue_clear(&cld->write_queue); + fastcgi_queue_clear(&cld->write_queue); cld->write_queue.closed = TRUE; fcgi_cgi_child_check_done(cld); } @@ -418,7 +386,7 @@ static void fcgi_cgi_direct_result(fastcgi_connection *fcon, int status) { static void fcgi_cgi_new_request(fastcgi_connection *fcon) { fcgi_cgi_child *cld = (fcgi_cgi_child*) fcon->data; - gchar *binpath; + const gchar *binpath; struct stat st; if (cld) return; @@ -476,7 +444,7 @@ static void fcgi_cgi_received_stdin(fastcgi_connection *fcon, GString *data) { if (data) g_string_free(data, TRUE); return; } - fastcgi_gstring_queue_append(&cld->write_queue, data); + fastcgi_queue_append_string(&cld->write_queue, data); write_queue(cld); /* if we don't call this we have to check the write-queue length */ } @@ -524,6 +492,7 @@ static void fcgi_cgi_server_free(fcgi_cgi_server* srv) { for (i = 0; i < srv->aborted_pending_childs->len; i++) { fcgi_cgi_child_free(g_ptr_array_index(srv->aborted_pending_childs, i)); } + g_ptr_array_free(srv->aborted_pending_childs, TRUE); fastcgi_server_free(srv->fsrv); g_slice_free(fcgi_cgi_server, srv); }