Add GByteArray as queue type

This commit is contained in:
Stefan Bühler 2009-04-05 20:45:49 +02:00
parent f06a31313a
commit 2912f13047
3 changed files with 181 additions and 136 deletions

246
fastcgi.c
View File

@ -12,6 +12,11 @@
#include <unistd.h> #include <unistd.h>
#include <string.h> #include <string.h>
typedef struct fastcgi_queue_link {
GList queue_link;
enum { FASTCGI_QUEUE_STRING, FASTCGI_QUEUE_BYTEARRAY } elem_type;
} fastcgi_queue_link;
/* some util functions */ /* some util functions */
#define GSTR_LEN(x) (x) ? (x)->str : "", (x) ? (x)->len : 0 #define GSTR_LEN(x) (x) ? (x)->str : "", (x) ? (x)->len : 0
#define UNUSED(x) ((void)(x)) #define UNUSED(x) ((void)(x))
@ -32,6 +37,149 @@ static void fd_init(int fd) {
#endif #endif
} }
static fastcgi_queue_link* fastcgi_queue_link_new_string(GString *s) {
fastcgi_queue_link *l = g_slice_new0(fastcgi_queue_link);
l->queue_link.data = s;
l->elem_type = FASTCGI_QUEUE_STRING;
return l;
}
static fastcgi_queue_link* fastcgi_queue_link_new_bytearray(GByteArray *a) {
fastcgi_queue_link *l = g_slice_new0(fastcgi_queue_link);
l->queue_link.data = a;
l->elem_type = FASTCGI_QUEUE_BYTEARRAY ;
return l;
}
static void fastcgi_queue_link_free(fastcgi_queue *queue, fastcgi_queue_link *l) {
switch (l->elem_type) {
case FASTCGI_QUEUE_STRING:
if (queue) queue->length -= ((GString*)l->queue_link.data)->len;
g_string_free(l->queue_link.data, TRUE);
break;
case FASTCGI_QUEUE_BYTEARRAY:
if (queue) queue->length -= ((GByteArray*)l->queue_link.data)->len;
g_byte_array_free(l->queue_link.data, TRUE);
break;
}
g_slice_free(fastcgi_queue_link, l);
}
static fastcgi_queue_link *fastcgi_queue_peek_head(fastcgi_queue *queue) {
return (fastcgi_queue_link*) g_queue_peek_head_link(&queue->queue);
}
static fastcgi_queue_link *fastcgi_queue_pop_head(fastcgi_queue *queue) {
return (fastcgi_queue_link*) g_queue_pop_head_link(&queue->queue);
}
void fastcgi_queue_clear(fastcgi_queue *queue) {
fastcgi_queue_link *l;
queue->offset = 0;
while (NULL != (l = fastcgi_queue_pop_head(queue))) {
fastcgi_queue_link_free(queue, l);
}
g_assert(0 == queue->length);
}
void fastcgi_queue_append_string(fastcgi_queue *queue, GString *buf) {
fastcgi_queue_link *l;
if (!buf) return;
if (!buf->len) { g_string_free(buf, TRUE); return; }
l = fastcgi_queue_link_new_string(buf);
g_queue_push_tail_link(&queue->queue, (GList*) l);
queue->length += buf->len;
}
void fastcgi_queue_append_bytearray(fastcgi_queue *queue, GByteArray *buf) {
fastcgi_queue_link *l;
if (!buf) return;
if (!buf->len) { g_byte_array_free(buf, TRUE); return; }
l = fastcgi_queue_link_new_bytearray(buf);
g_queue_push_tail_link(&queue->queue, (GList*) l);
queue->length += buf->len;
}
/* return values: 0 ok, -1 error, -2 con closed */
gint fastcgi_queue_write(int fd, fastcgi_queue *queue, gsize max_write) {
gsize rem_write = max_write;
g_assert(rem_write <= G_MAXSSIZE);
#ifdef TCP_CORK
int corked = 0;
#endif
#ifdef TCP_CORK
/* Linux: put a cork into the socket as we want to combine the write() calls
* but only if we really have multiple chunks
*/
if (queue->queue.length > 1) {
corked = 1;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
while (rem_write > 0 && queue->length > 0) {
fastcgi_queue_link *l = fastcgi_queue_peek_head(queue);
gsize towrite, datalen;
gssize res;
gchar *data;
switch (l->elem_type) {
case FASTCGI_QUEUE_STRING:
data = ((GString*) l->queue_link.data)->str;
datalen = towrite = ((GString*) l->queue_link.data)->len;
break;
case FASTCGI_QUEUE_BYTEARRAY:
data = (gchar*) ((GByteArray*) l->queue_link.data)->data;
datalen = towrite = ((GByteArray*) l->queue_link.data)->len;
break;
default:
g_error("invalid fastcgi_queue_link type\n");
}
towrite -= queue->offset; data += queue->offset;
if (towrite > rem_write) towrite = rem_write;
res = write(fd, data, towrite);
if (-1 == res) {
#ifdef TCP_CORK
if (corked) {
corked = 0;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
switch (errno) {
case EINTR:
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
return 0; /* try again later */
case ECONNRESET:
case EPIPE:
return -2;
default:
ERROR("write to fd=%d failed, %s\n", fd, g_strerror(errno) );
return -1;
}
} else {
queue->offset += res;
rem_write -= res;
if (queue->offset == datalen) {
queue->offset = 0;
fastcgi_queue_link_free(queue, fastcgi_queue_pop_head(queue));
}
}
}
#ifdef TCP_CORK
if (corked) {
corked = 0;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
return 0;
}
static void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) { static void ev_io_add_events(struct ev_loop *loop, ev_io *watcher, int events) {
if ((watcher->events & events) == events) return; if ((watcher->events & events) == events) return;
ev_io_stop(loop, watcher); ev_io_stop(loop, watcher);
@ -73,39 +221,39 @@ static guint8 stream_build_fcgi_record(GString *buf, guint8 type, guint16 reques
} }
/* returns padding length */ /* returns padding length */
static guint8 stream_send_fcgi_record(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, guint16 datalen) { static guint8 stream_send_fcgi_record(fastcgi_queue *out, guint8 type, guint16 requestid, guint16 datalen) {
GString *record = g_string_sized_new(FCGI_HEADER_LEN); GString *record = g_string_sized_new(FCGI_HEADER_LEN);
guint8 padlen = stream_build_fcgi_record(record, type, requestid, datalen); guint8 padlen = stream_build_fcgi_record(record, type, requestid, datalen);
fastcgi_gstring_queue_append(out, record); fastcgi_queue_append_string(out, record);
return padlen; return padlen;
} }
static void stream_send_data(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, const gchar *data, size_t datalen) { static void stream_send_data(fastcgi_queue *out, guint8 type, guint16 requestid, const gchar *data, size_t datalen) {
while (datalen > 0) { while (datalen > 0) {
guint16 tosend = (datalen > G_MAXUINT16) ? G_MAXUINT16 : datalen; guint16 tosend = (datalen > G_MAXUINT16) ? G_MAXUINT16 : datalen;
guint8 padlen = stream_send_fcgi_record(out, type, requestid, tosend); guint8 padlen = stream_send_fcgi_record(out, type, requestid, tosend);
GString *tmps = g_string_sized_new(tosend + padlen); GString *tmps = g_string_sized_new(tosend + padlen);
g_string_append_len(tmps, data, tosend); g_string_append_len(tmps, data, tosend);
append_padding(tmps, padlen); append_padding(tmps, padlen);
fastcgi_gstring_queue_append(out, tmps); fastcgi_queue_append_string(out, tmps);
data += tosend; data += tosend;
datalen -= tosend; datalen -= tosend;
} }
} }
/* kills string */ /* kills string */
static void stream_send_string(fastcgi_gstring_queue *out, guint8 type, guint16 requestid, GString *data) { static void stream_send_string(fastcgi_queue *out, guint8 type, guint16 requestid, GString *data) {
if (data->len > G_MAXUINT16) { if (data->len > G_MAXUINT16) {
stream_send_data(out, type, requestid, GSTR_LEN(data)); stream_send_data(out, type, requestid, GSTR_LEN(data));
g_string_free(data, TRUE); g_string_free(data, TRUE);
} else { } else {
guint8 padlen = stream_send_fcgi_record(out, type, requestid, data->len); guint8 padlen = stream_send_fcgi_record(out, type, requestid, data->len);
append_padding(data, padlen); append_padding(data, padlen);
fastcgi_gstring_queue_append(out, data); fastcgi_queue_append_string(out, data);
} }
} }
static void stream_send_end_request(fastcgi_gstring_queue *out, guint16 requestID, gint32 appStatus, enum FCGI_ProtocolStatus status) { static void stream_send_end_request(fastcgi_queue *out, guint16 requestID, gint32 appStatus, enum FCGI_ProtocolStatus status) {
GString *record; GString *record;
record = g_string_sized_new(16); record = g_string_sized_new(16);
stream_build_fcgi_record(record, FCGI_END_REQUEST, requestID, 8); stream_build_fcgi_record(record, FCGI_END_REQUEST, requestID, 8);
@ -113,82 +261,21 @@ static void stream_send_end_request(fastcgi_gstring_queue *out, guint16 requestI
g_string_append_len(record, (const gchar*) &appStatus, sizeof(appStatus)); g_string_append_len(record, (const gchar*) &appStatus, sizeof(appStatus));
g_string_append_c(record, status); g_string_append_c(record, status);
g_string_append_len(record, __padding, 3); g_string_append_len(record, __padding, 3);
fastcgi_gstring_queue_append(out, record); fastcgi_queue_append_string(out, record);
} }
static void write_queue(fastcgi_connection *fcon) { static void write_queue(fastcgi_connection *fcon) {
const gssize max_rem_write = 256*1024;
gssize rem_write = 256*1024;
#ifdef TCP_CORK
int corked = 0;
#endif
if (fcon->closing) return; if (fcon->closing) return;
#ifdef TCP_CORK if (fastcgi_queue_write(fcon->fd, &fcon->write_queue, 256*1024) < 0) {
/* Linux: put a cork into the socket as we want to combine the write() calls
* but only if we really have multiple chunks
*/
if (fcon->write_queue->queue.length > 1) {
corked = 1;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
while (rem_write > 0 && fcon->write_queue.length > 0) {
GString *s = g_queue_peek_head(&fcon->write_queue.queue);
gssize towrite = s->len - fcon->write_queue.offset, res;
if (towrite > max_rem_write) towrite = max_rem_write;
res = write(fcon->fd, s->str + fcon->write_queue.offset, towrite);
if (-1 == res) {
#ifdef TCP_CORK
if (corked) {
corked = 0;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
switch (errno) {
case EINTR:
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
goto out; /* try again later */
case ECONNRESET:
case EPIPE:
fastcgi_connection_close(fcon);
return;
default:
ERROR("write to fd=%d failed, %s\n", fcon->fd, g_strerror(errno) );
fastcgi_connection_close(fcon); fastcgi_connection_close(fcon);
return; return;
} }
} else {
fcon->write_queue.offset += res;
rem_write -= res;
if (fcon->write_queue.offset == s->len) {
g_queue_pop_head(&fcon->write_queue.queue);
fcon->write_queue.offset = 0;
fcon->write_queue.length -= s->len;
g_string_free(s, TRUE);
}
}
}
#ifdef TCP_CORK
if (corked) {
corked = 0;
setsockopt(fd, IPPROTO_TCP, TCP_CORK, &corked, sizeof(corked));
}
#endif
out:
if (!fcon->closing && rem_write != max_rem_write) {
if (fcon->fsrv->callbacks->cb_wrote_data) { if (fcon->fsrv->callbacks->cb_wrote_data) {
fcon->fsrv->callbacks->cb_wrote_data(fcon); fcon->fsrv->callbacks->cb_wrote_data(fcon);
} }
}
if (!fcon->closing) { if (!fcon->closing) {
if (fcon->write_queue.length > 0) { if (fcon->write_queue.length > 0) {
@ -516,7 +603,7 @@ static void fastcgi_connection_free(fastcgi_connection *fcon) {
fcon->fd = -1; fcon->fd = -1;
} }
fastcgi_gstring_queue_clear(&fcon->write_queue); fastcgi_queue_clear(&fcon->write_queue);
fastcgi_connection_environ_clear(fcon); fastcgi_connection_environ_clear(fcon);
g_ptr_array_free(fcon->environ, TRUE); g_ptr_array_free(fcon->environ, TRUE);
g_string_free(fcon->buffer, TRUE); g_string_free(fcon->buffer, TRUE);
@ -533,7 +620,7 @@ void fastcgi_connection_close(fastcgi_connection *fcon) {
fcon->fd = -1; fcon->fd = -1;
} }
fastcgi_gstring_queue_clear(&fcon->write_queue); fastcgi_queue_clear(&fcon->write_queue);
g_string_truncate(fcon->buffer, 0); g_string_truncate(fcon->buffer, 0);
g_string_truncate(fcon->parambuf, 0); g_string_truncate(fcon->parambuf, 0);
@ -723,18 +810,3 @@ const gchar* fastcgi_connection_environ_lookup(fastcgi_connection *fcon, const g
} }
return NULL; return NULL;
} }
void fastcgi_gstring_queue_append(fastcgi_gstring_queue *queue, GString *buf) {
if (!buf) return;
g_queue_push_tail(&queue->queue, buf);
queue->length += buf->len;
}
void fastcgi_gstring_queue_clear(fastcgi_gstring_queue *queue) {
GString *s;
queue->length = 0;
queue->offset = 0;
while (NULL != (s = g_queue_pop_head(&queue->queue))) {
g_string_free(s, TRUE);
}
}

View File

@ -56,8 +56,8 @@ typedef struct fastcgi_callbacks fastcgi_callbacks;
struct fastcgi_connection; struct fastcgi_connection;
typedef struct fastcgi_connection fastcgi_connection; typedef struct fastcgi_connection fastcgi_connection;
struct fastcgi_gstring_queue; struct fastcgi_queue;
typedef struct fastcgi_gstring_queue fastcgi_gstring_queue; typedef struct fastcgi_queue fastcgi_queue;
struct fastcgi_server { struct fastcgi_server {
/* custom user data */ /* custom user data */
@ -88,7 +88,7 @@ struct fastcgi_callbacks {
void (*cb_reset_connection)(fastcgi_connection *fcon); /* cleanup custom data before fcon is freed, not for keep-alive */ void (*cb_reset_connection)(fastcgi_connection *fcon); /* cleanup custom data before fcon is freed, not for keep-alive */
}; };
struct fastcgi_gstring_queue { struct fastcgi_queue {
GQueue queue; GQueue queue;
gsize offset; /* offset in first chunk */ gsize offset; /* offset in first chunk */
gsize length; gsize length;
@ -135,7 +135,7 @@ struct fastcgi_connection {
gboolean read_suspended; gboolean read_suspended;
/* write queue */ /* write queue */
fastcgi_gstring_queue write_queue; fastcgi_queue write_queue;
}; };
fastcgi_server *fastcgi_server_create(struct ev_loop *loop, gint socketfd, const fastcgi_callbacks *callbacks, guint max_connections); fastcgi_server *fastcgi_server_create(struct ev_loop *loop, gint socketfd, const fastcgi_callbacks *callbacks, guint max_connections);
@ -154,7 +154,11 @@ void fastcgi_connection_close(fastcgi_connection *fcon); /* shouldn't be needed
void fastcgi_connection_environ_clear(fastcgi_connection *fcon); void fastcgi_connection_environ_clear(fastcgi_connection *fcon);
const gchar* fastcgi_connection_environ_lookup(fastcgi_connection *fcon, const gchar* key, gsize keylen); const gchar* fastcgi_connection_environ_lookup(fastcgi_connection *fcon, const gchar* key, gsize keylen);
void fastcgi_gstring_queue_append(fastcgi_gstring_queue *queue, GString *buf); void fastcgi_queue_append_string(fastcgi_queue *queue, GString *buf);
void fastcgi_gstring_queue_clear(fastcgi_gstring_queue *queue); void fastcgi_queue_append_bytearray(fastcgi_queue *queue, GByteArray *buf);
void fastcgi_queue_clear(fastcgi_queue *queue);
/* return values: 0 ok, -1 error, -2 con closed */
gint fastcgi_queue_write(int fd, fastcgi_queue *queue, gsize max_write);
#endif #endif

View File

@ -53,7 +53,7 @@ struct fcgi_cgi_child {
ev_io pipe_in_watcher, pipe_out_watcher, pipe_err_watcher; ev_io pipe_in_watcher, pipe_out_watcher, pipe_err_watcher;
/* write queue */ /* write queue */
fastcgi_gstring_queue write_queue; fastcgi_queue write_queue;
}; };
static fcgi_cgi_child* fcgi_cgi_child_create(fcgi_cgi_server *srv, fastcgi_connection *fcon); static fcgi_cgi_child* fcgi_cgi_child_create(fcgi_cgi_server *srv, fastcgi_connection *fcon);
@ -104,45 +104,13 @@ static void fcgi_cgi_child_child_cb(struct ev_loop *loop, ev_child *w, int reven
} }
static void write_queue(fcgi_cgi_child *cld) { static void write_queue(fcgi_cgi_child *cld) {
const gssize max_rem_write = 256*1024;
gssize rem_write = 256*1024;
if (-1 == cld->pipe_out) return; if (-1 == cld->pipe_out) return;
while (rem_write > 0 && cld->write_queue.length > 0) { if (fastcgi_queue_write(cld->pipe_out, &cld->write_queue, 256*1024) < 0) {
GString *s = g_queue_peek_head(&cld->write_queue.queue);
gssize towrite = s->len - cld->write_queue.offset, res;
if (towrite > max_rem_write) towrite = max_rem_write;
res = write(cld->pipe_out, s->str + cld->write_queue.offset, towrite);
if (-1 == res) {
switch (errno) {
case EINTR:
case EAGAIN:
#if EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
goto out; /* try again later */
case ECONNRESET:
case EPIPE:
fcgi_cgi_child_close_write(cld); fcgi_cgi_child_close_write(cld);
return; return;
default:
ERROR("write to fd=%d failed, %s\n", cld->pipe_out, g_strerror(errno));
fcgi_cgi_child_close_write(cld);
return;
}
} else {
cld->write_queue.offset += res;
rem_write -= res;
if (cld->write_queue.offset == s->len) {
g_queue_pop_head(&cld->write_queue.queue);
cld->write_queue.offset = 0;
cld->write_queue.length -= s->len;
g_string_free(s, TRUE);
}
}
} }
out:
if (-1 != cld->pipe_out) { if (-1 != cld->pipe_out) {
if (cld->write_queue.length > 0) { if (cld->write_queue.length > 0) {
ev_io_start(cld->srv->loop, &cld->pipe_out_watcher); ev_io_start(cld->srv->loop, &cld->pipe_out_watcher);
@ -296,7 +264,7 @@ static void fcgi_cgi_child_close_write(fcgi_cgi_child *cld) {
ev_io_stop(cld->srv->loop, &cld->pipe_out_watcher); ev_io_stop(cld->srv->loop, &cld->pipe_out_watcher);
close(cld->pipe_out); close(cld->pipe_out);
cld->pipe_out = -1; cld->pipe_out = -1;
fastcgi_gstring_queue_clear(&cld->write_queue); fastcgi_queue_clear(&cld->write_queue);
cld->write_queue.closed = TRUE; cld->write_queue.closed = TRUE;
fcgi_cgi_child_check_done(cld); fcgi_cgi_child_check_done(cld);
} }
@ -418,7 +386,7 @@ static void fcgi_cgi_direct_result(fastcgi_connection *fcon, int status) {
static void fcgi_cgi_new_request(fastcgi_connection *fcon) { static void fcgi_cgi_new_request(fastcgi_connection *fcon) {
fcgi_cgi_child *cld = (fcgi_cgi_child*) fcon->data; fcgi_cgi_child *cld = (fcgi_cgi_child*) fcon->data;
gchar *binpath; const gchar *binpath;
struct stat st; struct stat st;
if (cld) return; if (cld) return;
@ -476,7 +444,7 @@ static void fcgi_cgi_received_stdin(fastcgi_connection *fcon, GString *data) {
if (data) g_string_free(data, TRUE); if (data) g_string_free(data, TRUE);
return; return;
} }
fastcgi_gstring_queue_append(&cld->write_queue, data); fastcgi_queue_append_string(&cld->write_queue, data);
write_queue(cld); /* if we don't call this we have to check the write-queue length */ write_queue(cld); /* if we don't call this we have to check the write-queue length */
} }
@ -524,6 +492,7 @@ static void fcgi_cgi_server_free(fcgi_cgi_server* srv) {
for (i = 0; i < srv->aborted_pending_childs->len; i++) { for (i = 0; i < srv->aborted_pending_childs->len; i++) {
fcgi_cgi_child_free(g_ptr_array_index(srv->aborted_pending_childs, i)); fcgi_cgi_child_free(g_ptr_array_index(srv->aborted_pending_childs, i));
} }
g_ptr_array_free(srv->aborted_pending_childs, TRUE);
fastcgi_server_free(srv->fsrv); fastcgi_server_free(srv->fsrv);
g_slice_free(fcgi_cgi_server, srv); g_slice_free(fcgi_cgi_server, srv);
} }