From d6b2e72612a076eb87db188d69d265af37d57550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20B=C3=BChler?= Date: Thu, 5 Aug 2010 14:42:59 +0200 Subject: [PATCH] Initial commit --- src/Makefile | 8 ++ src/io/base.d | 243 ++++++++++++++++++++++++++++++++++++++++++++++ src/io/base_c.c | 45 +++++++++ src/io/base_c.o | Bin 0 -> 1752 bytes src/io/events.d | 177 +++++++++++++++++++++++++++++++++ src/io/events_c.c | 50 ++++++++++ src/io/events_c.o | Bin 0 -> 2136 bytes 7 files changed, 523 insertions(+) create mode 100644 src/Makefile create mode 100644 src/io/base.d create mode 100644 src/io/base_c.c create mode 100644 src/io/base_c.o create mode 100644 src/io/events.d create mode 100644 src/io/events_c.c create mode 100644 src/io/events_c.o diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..8ca56e1 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,8 @@ + +LDC=ldc + +override DFLAGS+=-O2 +override CFLAGS+=-O2 + +delight: io/base.d io/base_c.o io/events.d io/events_c.o + gdc $(DFLAGS) -o "$@" $^ -lev diff --git a/src/io/base.d b/src/io/base.d new file mode 100644 index 0000000..161ac1f --- /dev/null +++ b/src/io/base.d @@ -0,0 +1,243 @@ + +module io.base; + +import std.socket; + +import io.events; + +R delegate(T) makeDelegate(R, T)(R function(T) f) { + struct delegateClass { + R function(T) m_f; + + R callback(T t) { return m_f(t); } + } + + delegateClass* o = new delegateClass; + o.m_f = f; + return &o.callback; +} + +private extern (C) int iobase_write(int fd, void *buf, int len); +private extern (C) int iobase_read(int fd, void *buf, int len); + +class BufferedSocket { + private Socket m_socket; + private IOWatcher m_watcher; + private void delegate(void[] buf) m_readcb; + private void delegate() m_eofcb, m_closecb; + + private bool m_closed, m_eof, m_sendeof; + + private class chunk { + chunk next; + void[] buf; + this(void[] b) { buf = b.dup; } + } + private chunk m_cfirst, m_clast; + + final void delegate(void[] buf) readcb() { return m_readcb; } + final void delegate(void[] buf) readcb(void delegate(void[] buf) r) { + m_readcb = r; + if (m_readcb is null || m_eof || m_closed) { + m_watcher.events = m_watcher.events & ~Event.READ; + } else { + m_watcher.events = m_watcher.events | Event.READ; + } + return m_readcb; + } + + final void delegate() eofcb() { return m_eofcb; } + final void delegate() eofcb(void delegate() f) { return m_eofcb = f; } + + final void delegate() closecb() { return m_closecb; } + final void delegate() closecb(void delegate() f) { return m_closecb = f; } + + this(Loop loop, Socket sock, void delegate(void[] buf) readcb = null) { + m_socket = sock; + m_readcb = readcb; + + m_watcher = new IOWatcher(); + m_watcher.socket = m_socket; + m_watcher.events = Event.READ; + m_watcher.read = &this.handleread; + m_watcher.write = &this.handlewrite; + m_watcher.start(loop); + } + + ~this() { + m_cfirst = m_clast = null; + shutdown(); + delete m_watcher; + delete m_socket; + } + + void write(void[] buf) { + if (m_closed || m_sendeof || 0 == buf.length) return; + chunk c = new chunk(buf); + if (m_clast !is null) { + m_clast.next = c; + m_clast = c; + } else { + m_cfirst = m_clast = c; + } + m_watcher.events = m_watcher.events | Event.WRITE; + } + + void shutdown() { + if (m_closed || m_sendeof) return; + m_sendeof = true; + if (m_cfirst is null) { + m_socket.shutdown(SocketShutdown.SEND); + } + } + + /* may destroy object */ + private final void handleeof() { + m_eof = true; + m_watcher.events = m_watcher.events & ~Event.READ; + if (m_eofcb !is null) { + m_eofcb(); + } else { + delete this; + } + } + + /* may destroy object */ + private final void handleclose() { + m_closed = true; + m_watcher.events = Event.NONE; + if (m_closecb !is null) { + m_closecb(); + } else { + delete this; + } + } + + private final void handleread() { + if (m_readcb is null) { + m_watcher.events = m_watcher.events & ~Event.READ; + return; + } + ubyte buf[4096]; + int res = iobase_read(m_socket.handle, buf.ptr, buf.length); + if (res == 0) { + handleeof(); + } else if (res > 0) { + m_readcb(buf[0 .. res]); + } else if (res != -1) { + handleclose(); + } + } + private final void handlewrite() { + while (m_cfirst !is null) { + chunk c = m_cfirst; + int res = iobase_write(m_socket.handle, c.buf.ptr, c.buf.length); + if (res > 0) { + if (res == c.buf.length) { + m_cfirst = c.next; + } else { + c.buf = c.buf[res .. $]; + return; + } + } else if (res == -1) { + return; + } else { + handleclose(); + return; + } + } + if (m_sendeof) { + m_socket.shutdown(SocketShutdown.SEND); + } + m_clast = null; + m_watcher.events = m_watcher.events & ~Event.WRITE; + } +} + +class TcpServer { + private Socket m_socket; + private IOWatcher m_watcher; + private void delegate(Socket con) m_accept_cb; + + public final Socket socket() { return m_socket; } + + this(Loop loop, Address bind, void delegate(Socket con) accept) { + m_accept_cb = accept; + + m_socket = new TcpSocket(bind.addressFamily); + m_socket.blocking = false; + m_socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); +// m_socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, 1); + m_socket.bind(bind); + m_socket.listen(1024); + + m_watcher = new IOWatcher(); + m_watcher.socket = m_socket; + m_watcher.events = Event.READ; + m_watcher.read = &this.acceptcb; + m_watcher.start(loop); + } + + private final void acceptcb() { + Socket con = m_socket.accept(); + if (con !is null) m_accept_cb(con); + } +} + +class EchoConnection { + protected int m_idx = -1; + protected EchoServer m_srv; + protected Socket m_socket; + protected BufferedSocket m_bufsock; + + this(Socket s) { + m_socket = s; + m_bufsock = new BufferedSocket(Loop.defaultloop, s, &this.handleread); + } + + ~this() { + if (m_srv !is null) m_srv.delCon(this); + } + + private final void handleread(void[] buf) { + m_bufsock.write(buf); + } +}; + +class EchoServer { + private EchoConnection m_con[]; + private Loop m_loop; + private TcpServer m_server; + + this(Loop loop, Address address) { + m_loop = loop; + m_server = new TcpServer(m_loop, address, &this.accept); + m_con = new EchoConnection[0]; + } + + protected final void addCon(EchoConnection con) { + con.m_srv = this; + con.m_idx = m_con.length; + m_con.length = m_con.length + 1; + m_con[con.m_idx] = con; + } + + protected final void delCon(EchoConnection con) { + if (con.m_srv !is this) return; + m_con[con.m_idx] = m_con[m_con.length - 1]; + m_con.length = m_con.length - 1; + con.m_srv = null; + con.m_idx = -1; + } + + private final void accept(Socket s) { + auto con = new EchoConnection(s); + addCon(con); + } +}; + +void main(string args[]) { + Loop loop = Loop.defaultloop; + EchoServer srv = new EchoServer(loop, new InternetAddress(8080)); + Loop.defaultloop.run(); +} diff --git a/src/io/base_c.c b/src/io/base_c.c new file mode 100644 index 0000000..56f5c8e --- /dev/null +++ b/src/io/base_c.c @@ -0,0 +1,45 @@ + +#include +#include + +/* -1: not now (try later), -2: con closed, -3: unexpected error */ + +int iobase_write(int fd, const void* buf, int len) { + int res = write(fd, buf, len); + if (res >= 0) return res; + switch (errno) { + case EAGAIN: +#if EAGAIN != EWOULDBLOCK + case EWOULDBLOCK: +#endif + case EINTR: + return -1; + break; + case ECONNRESET: + case EPIPE: + case ETIMEDOUT: + return -2; + default: + return -3; + } +} + +/* 0: eof, -1: not now (try later), -2: con closed, -3: unexpected error */ +int iobase_read(int fd, void* buf, int len) { + int res = read(fd, buf, len); + if (res >= 0) return res; + switch (errno) { + case EAGAIN: +#if EAGAIN != EWOULDBLOCK + case EWOULDBLOCK: +#endif + case EINTR: + return -1; + break; + case ECONNRESET: + case ETIMEDOUT: + return -2; + default: + return -3; + } +} diff --git a/src/io/base_c.o b/src/io/base_c.o new file mode 100644 index 0000000000000000000000000000000000000000..5af6341269c180c7b4954c61e4e637975ef37a15 GIT binary patch literal 1752 zcmbu9L2DC16oB70somOOqKFEuWWA`RcBxQMd(c3m4I&DH2fZYl?UV$YUD=Eny%bqJ zdGTN9$)n^V2nymwioZgL7jHSJ@x9qMI*fZL`oipc@0&O8&AiF(&gE-YrW^;fIB*gE zc$xz28nC8t4N5Qzjr6PA2tR)z^z`k(ZKUtrcjYRSt4~yvns^53cut-wS8K3SNykOG zF#Iu@OvoR7O2-{JosOgXqv214qc=l*PDZb{%GEmTJ-;WHRQ{use+lbl>!qhkjXq!* znm0?S`meHbFKD+L>+5UY>AGllgUDO;@wa?#F*DR*@Nt+8=h01Y1{G(%G~+(U-c*n; z@Z25J^#c`RE6lpg!0$io$)Jr|CYkPND-qj)FU3HDzZu8?{C2+&ei-+9B0?6$QurI! zZ!Py_5Z(n}g^NyeD+zjn%lxGnz7wPSFHxH#lL7{r#w^j?4Zr z)+d=4*w?CL{Qoo~ZPz-3jB@_#`m?x}O6hw;Q;DvBTm#m6TUxIn*R~nDmuVO*{nPj=8g@@;?j`qMpNslGgF~;^{XdAz?j0hy$MtWMV~Bk$^6kA-_%3hi zJTI6-Mz!twv&i!5yS)EPIB$ww-<&gTiYvmmiOIBG$h|SzN8V0|=XIDkZ&D{_T~M92 zAi4#Z{tME*95VK!z9qz}Y zunFqXY>Fg_;^uZ72C^GRI^sdnl>)No?|Y=Xr#t*kA90iuXp<19vuWNTh(f%CL-WT3 UQS5k6(TvlamQ^;0(P**%3tsugWB>pF literal 0 HcmV?d00001 diff --git a/src/io/events.d b/src/io/events.d new file mode 100644 index 0000000..e0c6591 --- /dev/null +++ b/src/io/events.d @@ -0,0 +1,177 @@ + +module io.events; + +import std.socket; +import std.outofmemory; + +private struct evloop { +} + +private struct ev_io { +} + +enum Event { + NONE = 0, + READ = 1, + WRITE = 2, + TIMEOUT = 4, + ERROR = 128, +} + +private extern (C) void ioev_event_callback(Watcher watcher, int revents) { + watcher.callback(revents); +} + +private extern (C) evloop* ioev_ev_default_loop(uint flags); /* is a inline function... */ +private extern (C) evloop* ev_loop_new(uint flags); +private extern (C) void ev_default_destroy(evloop *loop); +private extern (C) void ev_loop_destroy(evloop *loop); +private extern (C) void ev_loop(evloop *loop, int flags); + +private extern (C) void ev_io_start(evloop *loop, ev_io *w); +private extern (C) void ev_io_stop(evloop *loop, ev_io *w); + +private extern (C) ev_io* ioev_io_watcher_new(Watcher watcher); +private extern (C) void ioev_io_watcher_free(ev_io *w); +private extern (C) void ioev_io_set(ev_io *w, int fd, int events); + +class Watcher { + protected Loop m_cur_loop = null; + protected bool m_active = false; + + protected final Loop loop(Loop l) { + if (m_cur_loop is l) return l; + if (m_cur_loop !is null) stop(); + return m_cur_loop = l; + } + + ~this() { + loop = null; + } + + abstract void callback(int revents); + abstract protected void do_start(); + abstract protected void do_stop(); + + final public Loop loop() { + return m_cur_loop; + } + + final public void start(Loop l) { + if (l is null) throw new Exception("Cannot start on null loop"); + loop = l; + do_start(); + m_active = true; + } + + final public void start() { + start(m_cur_loop); + } + + final public void stop() { + if (!m_active) return; + if (m_cur_loop !is null && m_cur_loop.loop !is null) + do_stop(); + m_active = false; + } +} + +class IOWatcher : Watcher { + private ev_io *m_w; + private Socket m_socket; + private Event m_events; + private int m_fd = -1; + + void delegate() read, write, error; + + this() { + m_w = ioev_io_watcher_new(this); + if (m_w is null) throw new OutOfMemoryException; + } + ~this() { + ioev_io_watcher_free(m_w); + } + + override final void callback(int revents) { + if (revents & Event.READ) read(); + if (revents & Event.WRITE) write(); + if (revents & Event.ERROR) error(); + } + + private final void set(int fd, int ev) { + bool active = m_active; + stop(); + m_fd = fd; m_events = cast(Event)(ev); + ioev_io_set(m_w, m_fd, m_events); + if (active) start(); + } + + final Socket socket(Socket s) { + m_socket = s; + if (m_fd != s.handle) + set(s.handle, m_events); + return m_socket; + } + + final Event events() { return m_events; } + + final Event events(int ev) { + if (m_events != ev) set(m_fd, ev); + return m_events; + } + + final void events(bool waitForRead, bool waitForWrite) { + Event ev = Event.NONE; + if (waitForRead) ev = ev | Event.READ; + if (waitForWrite) ev = ev | Event.WRITE; + events(ev); + } + + override final protected void do_start() { + ev_io_start(loop.loop, m_w); + } + + override final protected void do_stop() { + ev_io_stop(loop.loop, m_w); + } +} + +class Loop { + protected evloop* loop; + private bool default_loop; + + private static Loop def = null; + + private this(evloop* loop) { + this.loop = loop; + default_loop = true; + } + + static synchronized Loop defaultloop() { + if (def !is null) return def; + evloop* l = ioev_ev_default_loop(0); + if (l is null) throw new Exception("ev_default_loop failed"); + def = new Loop(l); + return def; + } + + this() { + default_loop = false; + loop = ev_loop_new(0); + if (loop is null) throw new Exception("ev_loop_new failed"); + } + + ~this() { + if (loop !is null) { + if (default_loop) { + ev_default_destroy(loop); + } else { + ev_loop_destroy(loop); + } + } + } + + final void run() { + ev_loop(loop, 0); + } +} diff --git a/src/io/events_c.c b/src/io/events_c.c new file mode 100644 index 0000000..ae45e8b --- /dev/null +++ b/src/io/events_c.c @@ -0,0 +1,50 @@ + +#include + +#include + +#define UNUSED(x) ((void)(x)) + +#define IOEV_NONE 0 +#define IOEV_READ 1 +#define IOEV_WRITE 2 +#define IOEV_TIMEOUT 4 +#define IOEV_ERROR 128 + +void ioev_event_callback(void* watcher, int revents); + +struct ev_loop *ioev_ev_default_loop (unsigned int flags) { + return ev_default_loop(flags); +} + +static void io_cb(struct ev_loop *loop, ev_io *w, int revents) { + int ev = 0; + UNUSED(loop); + + if (revents & EV_READ) ev = ev | IOEV_READ; + if (revents & EV_WRITE) ev = ev | IOEV_WRITE; + if (revents & EV_ERROR) ev = ev | IOEV_ERROR; + + ioev_event_callback(w->data, ev); +} + +ev_io* ioev_io_watcher_new(void *watcher) { + ev_io *w = malloc(sizeof(ev_io)); + ev_io_init(w, io_cb, -1, 0); + w->data = watcher; + + return w; +} + +void ioev_io_watcher_free(ev_io *w) { + free(w); +} + +void ioev_io_set(ev_io *w, int fd, int ev) { + int events = 0; + + if (ev & IOEV_READ) events = events | EV_READ; + if (ev & IOEV_WRITE) events = events | EV_WRITE; + + ev_io_set(w, fd, events); +} diff --git a/src/io/events_c.o b/src/io/events_c.o new file mode 100644 index 0000000000000000000000000000000000000000..21bac7fbf2ea8d933e0c73922c8c3610367413a2 GIT binary patch literal 2136 zcmbtV%}Z2K6u&cLsWs&!35J#9CJgjpBor=WLd_whA}NZ3*PHQ84$T)cFFzt`U~=)0 zV3sX~2wK*nb(xT~ll}r}Rm%uMD6sRp@1F5?9XEA`d(Sz)^L5|3AM<2l>S8z)5=7@Vb-4F?1p{*xcB*R2X8(4BZ(H*G$+eGV$fhth{X6*J_rtWWWv7aAMZ0Yl2I9g3E zKaTkyRjmwBqh4}(ACe`P-g_-SC|>#inH(D%iT97YsjO3o4_ovbJa;+}U_Oe8DD?1} z2ra}yJ<-;-I`tq%*l}qQ;+W#V@JS$9jX2`R0T7L1x^Or)RtwAcJJ5;x0w%GNlDiZa zR%J2oIVs{^IpDJ_mED|UdG3NItQp61gq5mPgq1Gl^KOB#LeX=r$;;OVE1r|SFD&Wq z&f1x>lXp4&|MEB`z>(N*BuXg#t%0LQI`xnu_g)7{zSJU))YQceI@rYbio(K%U$=^{ z8aVfl;Y%c#J%Mdg^?yM*qW^$6p8J1a_|uNLV`! zvp%q9^F2k=yeBahFmLrYbVdfX~@+DeOT(N3pmcIRvoE3Vg&=N&IS>z3)Mb6p`P_F_xH zofmm0mn$L-C6HG-e=h?Ypu|%c+((RI0-i2igSr6Zbdkw>t-~{k{zwIQu0Hd`Rhc R4V>36KbmaFjj=)l^an3f=Y;?O literal 0 HcmV?d00001