module io.base;

import std.socket;
import io.events;

/**
 * Wraps a plain function pointer in a delegate.
 *
 * A small heap-allocated struct stores the function pointer; the returned
 * delegate is bound to that struct and simply forwards its argument to the
 * stored function.
 *
 * Params:
 *   f = function to wrap
 *
 * Returns: a delegate that calls f with the argument it receives.
 */
R delegate(T) makeDelegate(R, T)(R function(T) f)
{
    struct delegateClass
    {
        R function(T) m_f;

        R callback(T t)
        {
            return m_f(t);
        }
    }

    delegateClass* o = new delegateClass;
    o.m_f = f;
    return &o.callback;
}

// C helpers defined outside this module.
// NOTE(review): presumably thin wrappers around write(2)/read(2) - confirm.
// The handlers below rely on this return convention: > 0 is a byte count,
// -1 means "nothing done, try again later"; see handleread/handlewrite for
// how the remaining values are treated.
private extern (C) int iobase_write(int fd, void *buf, int len);
private extern (C) int iobase_read(int fd, void *buf, int len);

/**
 * Event-driven socket wrapper with a queued write side.
 *
 * Incoming data is handed to the read callback; outgoing data is copied into
 * a singly linked list of chunks and flushed whenever the watcher reports the
 * socket as writable.
 */
class BufferedSocket
{
    private Socket m_socket;
    private IOWatcher m_watcher;
    private void delegate(void[] buf) m_readcb;
    private void delegate() m_eofcb, m_closecb;
    private bool m_closed, m_eof, m_sendeof;

    /// One queued piece of outgoing data; owns a private copy of the bytes.
    private class chunk
    {
        chunk next;
        void[] buf;

        this(void[] b)
        {
            buf = b.dup;
        }
    }

    // Head and tail of the pending-write queue (both null when empty).
    private chunk m_cfirst, m_clast;

    /// Returns the current read callback.
    final void delegate(void[] buf) readcb()
    {
        return m_readcb;
    }

    /**
     * Sets the read callback and updates the READ interest of the watcher:
     * READ is only requested while a callback is set and the socket has
     * neither seen EOF nor been closed.
     */
    final void delegate(void[] buf) readcb(void delegate(void[] buf) r)
    {
        m_readcb = r;
        if (m_readcb is null || m_eof || m_closed)
        {
            m_watcher.events = m_watcher.events & ~Event.READ;
        }
        else
        {
            m_watcher.events = m_watcher.events | Event.READ;
        }
        return m_readcb;
    }

    /// Returns the callback invoked when the peer signals end of stream.
    final void delegate() eofcb()
    {
        return m_eofcb;
    }

    /// Sets the callback invoked when the peer signals end of stream.
    final void delegate() eofcb(void delegate() f)
    {
        return m_eofcb = f;
    }

    /// Returns the callback invoked when the connection is considered closed.
    final void delegate() closecb()
    {
        return m_closecb;
    }

    /// Sets the callback invoked when the connection is considered closed.
    final void delegate() closecb(void delegate() f)
    {
        return m_closecb = f;
    }

    /**
     * Attaches a watcher for sock to loop and starts it.
     *
     * Params:
     *   loop   = event loop the watcher is started on
     *   sock   = socket to wrap
     *   readcb = optional callback receiving incoming data
     */
    this(Loop loop, Socket sock, void delegate(void[] buf) readcb = null)
    {
        m_socket = sock;
        m_readcb = readcb;
        m_watcher = new IOWatcher();
        m_watcher.socket = m_socket;
        // READ is requested even without a callback; handleread() drops the
        // interest again on the first event if no callback is set.
        m_watcher.events = Event.READ;
        m_watcher.read = &this.handleread;
        m_watcher.write = &this.handlewrite;
        m_watcher.start(loop);
    }

    /// Drops queued data, shuts down the send side and frees watcher and socket.
    ~this()
    {
        // Clear the queue first so shutdown() sees it empty and issues the
        // socket shutdown immediately.
        m_cfirst = m_clast = null;
        shutdown();
        delete m_watcher;
        delete m_socket;
    }

    /**
     * Queues a copy of buf for sending and enables WRITE interest.
     *
     * Does nothing if the socket is closed, shutdown() was already requested,
     * or buf is empty.
     */
    void write(void[] buf)
    {
        if (m_closed || m_sendeof || 0 == buf.length)
            return;

        chunk c = new chunk(buf);
        if (m_clast !is null)
        {
            m_clast.next = c;
            m_clast = c;
        }
        else
        {
            m_cfirst = m_clast = c;
        }
        m_watcher.events = m_watcher.events | Event.WRITE;
    }

    /**
     * Requests shutdown of the send side.
     *
     * If data is still queued, the actual socket shutdown is deferred until
     * handlewrite() has drained the queue.
     */
    void shutdown()
    {
        if (m_closed || m_sendeof)
            return;

        m_sendeof = true;
        if (m_cfirst is null)
        {
            m_socket.shutdown(SocketShutdown.SEND);
        }
    }

    /* may destroy object */
    // Marks EOF, stops reading, then notifies the eof callback; without a
    // callback the object deletes itself.
    private final void handleeof()
    {
        m_eof = true;
        m_watcher.events = m_watcher.events & ~Event.READ;
        if (m_eofcb !is null)
        {
            m_eofcb();
        }
        else
        {
            delete this;
        }
    }

    /* may destroy object */
    // Marks the socket closed, disables all events, then notifies the close
    // callback; without a callback the object deletes itself.
    private final void handleclose()
    {
        m_closed = true;
        m_watcher.events = Event.NONE;
        if (m_closecb !is null)
        {
            m_closecb();
        }
        else
        {
            delete this;
        }
    }

    // Watcher read handler: reads up to 4096 bytes and dispatches on the
    // result. Must not touch members after handleeof()/handleclose(), since
    // those may have destroyed the object.
    private final void handleread()
    {
        if (m_readcb is null)
        {
            m_watcher.events = m_watcher.events & ~Event.READ;
            return;
        }

        ubyte[4096] buf;
        int res = iobase_read(m_socket.handle, buf.ptr, buf.length);
        if (res == 0)
        {
            handleeof();
        }
        else if (res > 0)
        {
            m_readcb(buf[0 .. res]);
        }
        else if (res != -1)
        {
            // Negative results other than -1 are treated as a closed
            // connection; -1 is silently ignored until the next read event.
            handleclose();
        }
    }

    // Watcher write handler: flushes queued chunks front to back.
    private final void handlewrite()
    {
        while (m_cfirst !is null)
        {
            chunk c = m_cfirst;
            int res = iobase_write(m_socket.handle, c.buf.ptr, c.buf.length);
            if (res > 0)
            {
                if (res == c.buf.length)
                {
                    m_cfirst = c.next;
                }
                else
                {
                    // Partial write: keep the unsent tail and wait for the
                    // next write event.
                    c.buf = c.buf[res .. $];
                    return;
                }
            }
            else if (res == -1)
            {
                // Nothing written; retry on the next write event.
                return;
            }
            else
            {
                // May destroy the object - return without touching members.
                handleclose();
                return;
            }
        }

        // Queue drained: perform a deferred shutdown and stop watching WRITE.
        if (m_sendeof)
        {
            m_socket.shutdown(SocketShutdown.SEND);
        }
        m_clast = null;
        m_watcher.events = m_watcher.events & ~Event.WRITE;
    }
}

/**
 * Listening TCP socket that reports every accepted connection to a callback.
 */
class TcpServer
{
    private Socket m_socket;
    private IOWatcher m_watcher;
    private void delegate(Socket con) m_accept_cb;

    /// Returns the listening socket.
    public final Socket socket()
    {
        return m_socket;
    }

    /**
     * Creates a non-blocking listening socket bound to bind and starts
     * watching it for incoming connections.
     *
     * Params:
     *   loop   = event loop the watcher is started on
     *   bind   = local address to bind to
     *   accept = callback receiving each accepted socket
     */
    this(Loop loop, Address bind, void delegate(Socket con) accept)
    {
        m_accept_cb = accept;
        m_socket = new TcpSocket(bind.addressFamily);
        m_socket.blocking = false;
        m_socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
        // m_socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, 1);
        m_socket.bind(bind);
        m_socket.listen(1024);

        m_watcher = new IOWatcher();
        m_watcher.socket = m_socket;
        m_watcher.events = Event.READ;
        m_watcher.read = &this.acceptcb;
        m_watcher.start(loop);
    }

    // Watcher read handler: accepts one connection and hands it to the callback.
    private final void acceptcb()
    {
        Socket con = m_socket.accept();
        if (con !is null)
            m_accept_cb(con);
    }
}

/**
 * A single echo connection: every buffer read from the socket is written
 * straight back to it.
 */
class EchoConnection
{
    // Slot of this connection in EchoServer's list; -1 while unregistered.
    protected int m_idx = -1;
    // Server this connection is registered with, or null.
    protected EchoServer m_srv;
    protected Socket m_socket;
    protected BufferedSocket m_bufsock;

    /// Wraps s in a BufferedSocket on the default loop with echoing enabled.
    this(Socket s)
    {
        m_socket = s;
        m_bufsock = new BufferedSocket(Loop.defaultloop, s, &this.handleread);
    }

    /// Unregisters the connection from its server, if any.
    ~this()
    {
        if (m_srv !is null)
            m_srv.delCon(this);
    }

    // Read callback: echo the received bytes back to the peer.
    private final void handleread(void[] buf)
    {
        m_bufsock.write(buf);
    }
}

/**
 * TCP echo server: accepts connections and keeps track of them in an array.
 */
class EchoServer
{
    private EchoConnection[] m_con;
    private Loop m_loop;
    private TcpServer m_server;

    /// Starts listening on address using loop.
    this(Loop loop, Address address)
    {
        m_loop = loop;
        m_server = new TcpServer(m_loop, address, &this.accept);
        m_con = new EchoConnection[0];
    }

    /// Appends con to the connection list and records its slot in con.m_idx.
    protected final void addCon(EchoConnection con)
    {
        con.m_srv = this;
        con.m_idx = m_con.length;
        m_con.length = m_con.length + 1;
        m_con[con.m_idx] = con;
    }

    /**
     * Removes con from the connection list (no-op if it belongs to another
     * server or to none).
     *
     * Removal is a swap-remove: the last connection is moved into the vacated
     * slot and the array is shortened by one.
     */
    protected final void delCon(EchoConnection con)
    {
        if (con.m_srv !is this)
            return;

        EchoConnection last = m_con[m_con.length - 1];
        m_con[con.m_idx] = last;
        // Keep the moved connection's index in sync with its new slot;
        // otherwise a later delCon(last) would index past the end of the
        // array or evict a different connection.
        last.m_idx = con.m_idx;
        m_con.length = m_con.length - 1;

        con.m_srv = null;
        con.m_idx = -1;
    }

    // Accept callback: wrap the new socket and register the connection.
    private final void accept(Socket s)
    {
        auto con = new EchoConnection(s);
        addCon(con);
    }
}

/// Runs an echo server on port 8080 on the default event loop.
void main(string[] args)
{
    Loop loop = Loop.defaultloop;
    EchoServer srv = new EchoServer(loop, new InternetAddress(8080));
    Loop.defaultloop.run();
}