You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

243 lines
5.4 KiB

module io.base;
import std.socket;
import io.events;
/**
 * Adapts a plain function pointer to a delegate with the same signature.
 *
 * The pointer is stored in a small struct allocated with `new`; the returned
 * delegate is bound to that struct and simply forwards its argument to the
 * stored function, handing back whatever the function returns.
 *
 * Params:
 *   f = function pointer to wrap
 * Returns: a delegate that calls f with the argument it is given
 */
R delegate(T) makeDelegate(R, T)(R function(T) f) {
    // Carrier object: gives the delegate a context pointer to bind to.
    struct Thunk {
        R function(T) target;
        R invoke(T arg) { return target(arg); }
    }
    auto holder = new Thunk;
    holder.target = f;
    return &holder.invoke;
}
// Raw I/O helpers with C linkage; their implementation is not part of this module.
// In this file they are called with a socket handle, a buffer pointer and a byte
// count, and the result is interpreted as follows:
//   > 0   number of bytes transferred
//   -1    nothing is done and the operation is retried on a later event
//   0     end of stream for iobase_read; treated as a failure for iobase_write
//   other treated as a failure (the connection is closed)
// NOTE(review): the exact return contract lives on the C side — confirm there,
// in particular that -1 is reserved for "would block".
private extern (C) int iobase_write(int fd, void *buf, int len);
private extern (C) int iobase_read(int fd, void *buf, int len);
/**
 * Callback-driven wrapper around a Socket that is serviced by an IOWatcher.
 *
 * Outgoing data is copied into a linked queue of chunks and written when the
 * watcher reports the socket writable; incoming data is handed to the read
 * callback. End of stream and I/O failure are reported through the eof and
 * close callbacks; when the respective callback is not set, the object
 * deletes itself instead.
 */
class BufferedSocket {
    private Socket m_socket;
    private IOWatcher m_watcher;
    private void delegate(void[] buf) m_readcb;
    private void delegate() m_eofcb, m_closecb;
    // m_closed:  handleclose() ran (I/O failure)
    // m_eof:     handleeof() ran (read returned 0)
    // m_sendeof: shutdown() was requested; no further writes are accepted
    private bool m_closed, m_eof, m_sendeof;

    /// Node of the singly linked queue of pending output.
    private class chunk {
        chunk next;
        void[] buf;
        /// Keeps a private copy of b, so the caller may reuse its buffer.
        this(void[] b) { buf = b.dup; }
    }
    // Head and tail of the output queue; both null when nothing is pending.
    private chunk m_cfirst, m_clast;

    /// Returns the current read callback (may be null).
    final void delegate(void[] buf) readcb() { return m_readcb; }

    /**
     * Installs a new read callback and adjusts the watcher's READ interest:
     * reading is enabled only when a callback is present and the socket has
     * neither reached EOF nor been closed.
     *
     * Returns: the callback that was just installed
     */
    final void delegate(void[] buf) readcb(void delegate(void[] buf) r) {
        m_readcb = r;
        bool wantRead = m_readcb !is null && !m_eof && !m_closed;
        if (wantRead) {
            m_watcher.events = m_watcher.events | Event.READ;
        } else {
            m_watcher.events = m_watcher.events & ~Event.READ;
        }
        return m_readcb;
    }

    /// Returns the end-of-stream callback (may be null).
    final void delegate() eofcb() { return m_eofcb; }

    /// Sets the end-of-stream callback and returns it.
    final void delegate() eofcb(void delegate() f) {
        m_eofcb = f;
        return m_eofcb;
    }

    /// Returns the close callback (may be null).
    final void delegate() closecb() { return m_closecb; }

    /// Sets the close callback and returns it.
    final void delegate() closecb(void delegate() f) {
        m_closecb = f;
        return m_closecb;
    }

    /**
     * Wraps sock and starts watching it on loop.
     *
     * Params:
     *   loop   = loop the internal watcher is started on
     *   sock   = socket to read from and write to
     *   readcb = optional callback receiving incoming data
     */
    this(Loop loop, Socket sock, void delegate(void[] buf) readcb = null) {
        m_readcb = readcb;
        m_socket = sock;

        m_watcher = new IOWatcher();
        m_watcher.socket = m_socket;
        // READ is requested unconditionally here; handleread() drops it again
        // on the first event if no read callback is set.
        m_watcher.events = Event.READ;
        m_watcher.read = &this.handleread;
        m_watcher.write = &this.handlewrite;
        m_watcher.start(loop);
    }

    /**
     * Discards unsent chunks, runs shutdown() (a no-op if the socket is
     * already closed or shut down) and deletes the watcher and the socket.
     */
    ~this() {
        m_clast = null;
        m_cfirst = null;
        shutdown();
        delete m_watcher;
        delete m_socket;
    }

    /**
     * Queues a copy of buf for sending and asks the watcher for WRITE events.
     * Ignored when buf is empty, after shutdown() or after a close.
     */
    void write(void[] buf) {
        if (m_closed || m_sendeof) return;
        if (buf.length == 0) return;

        chunk node = new chunk(buf);
        if (m_clast is null) {
            // Empty queue: the new node is head and tail at once.
            m_cfirst = node;
        } else {
            m_clast.next = node;
        }
        m_clast = node;
        m_watcher.events = m_watcher.events | Event.WRITE;
    }

    /**
     * Requests shutdown of the sending direction. If output is still queued
     * the actual socket shutdown is deferred until handlewrite() has drained
     * the queue; otherwise it happens immediately.
     */
    void shutdown() {
        bool alreadyDone = m_closed || m_sendeof;
        if (alreadyDone) return;

        m_sendeof = true;
        bool queueEmpty = m_cfirst is null;
        if (queueEmpty) {
            m_socket.shutdown(SocketShutdown.SEND);
        }
    }

    /* may destroy object */
    /// Marks EOF, stops reading, then notifies m_eofcb or deletes this object.
    private final void handleeof() {
        m_eof = true;
        m_watcher.events = m_watcher.events & ~Event.READ;
        if (m_eofcb is null) {
            // Nobody to notify: the object disposes of itself.
            delete this;
            return;
        }
        m_eofcb();
    }

    /* may destroy object */
    /// Marks the socket closed, clears all watcher events, then notifies
    /// m_closecb or deletes this object.
    private final void handleclose() {
        m_closed = true;
        m_watcher.events = Event.NONE;
        if (m_closecb is null) {
            delete this;
            return;
        }
        m_closecb();
    }

    /**
     * Watcher read handler: reads up to 4096 bytes and dispatches on the
     * result. Nothing may touch this object after handleeof()/handleclose()
     * return, since either may have destroyed it.
     */
    private final void handleread() {
        if (m_readcb is null) {
            // No consumer: stop asking for READ events.
            m_watcher.events = m_watcher.events & ~Event.READ;
            return;
        }

        ubyte[4096] inbuf;
        int got = iobase_read(m_socket.handle, inbuf.ptr, inbuf.length);
        if (got > 0) {
            // The slice refers to the local buffer and is only valid during the call.
            m_readcb(inbuf[0 .. got]);
        } else if (got == 0) {
            handleeof();
        } else if (got != -1) {
            // -1 is deliberately ignored; every other negative result closes.
            handleclose();
        }
    }

    /**
     * Watcher write handler: writes queued chunks in order until the queue
     * is empty or a write is incomplete. Once drained it performs a deferred
     * send-shutdown (if requested) and drops WRITE interest.
     */
    private final void handlewrite() {
        while (m_cfirst !is null) {
            chunk head = m_cfirst;
            int sent = iobase_write(m_socket.handle, head.buf.ptr, head.buf.length);

            if (sent == -1) {
                // Nothing written; try again on the next WRITE event.
                return;
            }
            if (sent <= 0) {
                // 0 or any other negative result counts as failure.
                handleclose();
                return;
            }
            if (sent != head.buf.length) {
                // Partial write: keep the unsent tail at the head of the queue.
                head.buf = head.buf[sent .. $];
                return;
            }
            m_cfirst = head.next;
        }

        // Queue drained.
        if (m_sendeof) {
            m_socket.shutdown(SocketShutdown.SEND);
        }
        m_clast = null;
        m_watcher.events = m_watcher.events & ~Event.WRITE;
    }
}
/**
 * Listening TCP socket driven by an IOWatcher. Every accepted connection is
 * switched to non-blocking mode and handed to the accept callback.
 */
class TcpServer {
    private Socket m_socket;
    private IOWatcher m_watcher;
    private void delegate(Socket con) m_accept_cb;

    /// Returns the listening socket.
    public final Socket socket() { return m_socket; }

    /**
     * Creates a non-blocking listening socket bound to `bind` (REUSEADDR set,
     * backlog 1024) and starts watching it for READ events on `loop`.
     *
     * Params:
     *   loop   = loop the watcher is started on
     *   bind   = local address to bind to; its address family selects the socket family
     *   accept = callback invoked with each accepted connection
     */
    this(Loop loop, Address bind, void delegate(Socket con) accept) {
        m_accept_cb = accept;
        m_socket = new TcpSocket(bind.addressFamily);
        m_socket.blocking = false;
        m_socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
        // m_socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, 1);
        m_socket.bind(bind);
        m_socket.listen(1024);
        m_watcher = new IOWatcher();
        m_watcher.socket = m_socket;
        m_watcher.events = Event.READ;
        m_watcher.read = &this.acceptcb;
        m_watcher.start(loop);
    }

    /**
     * Watcher read handler: accepts one pending connection.
     *
     * Socket.accept() signals failure by throwing rather than by returning
     * null, and on a non-blocking listener a failed accept (e.g. the peer
     * gave up before we got here) is an ordinary event. Such failures are
     * therefore absorbed here instead of unwinding through the event loop;
     * the next READ event retries.
     */
    private final void acceptcb() {
        Socket con = null;
        try {
            con = m_socket.accept();
            // The accepted socket does not reliably inherit the listener's
            // non-blocking mode, but the event-driven consumers rely on it.
            if (con !is null) con.blocking = false;
        } catch (SocketException e) {
            // Either nothing was accepted or the new socket could not be
            // configured; in the latter case do not leak it.
            if (con !is null) con.close();
            return;
        }
        if (con !is null) m_accept_cb(con);
    }
}
/**
 * One client of the echo server: everything read from the socket is queued
 * for writing back to the same socket.
 *
 * The connection owns its BufferedSocket. When the peer closes or the socket
 * fails, the BufferedSocket and then the connection itself are deleted, which
 * (via the destructor) also removes the connection from its EchoServer.
 */
class EchoConnection {
    // Slot in EchoServer.m_con, or -1 while not registered.
    protected int m_idx = -1;
    // Owning server, or null while not registered.
    protected EchoServer m_srv;
    protected Socket m_socket;
    protected BufferedSocket m_bufsock;

    /**
     * Wraps s in a BufferedSocket on the default loop and echoes its input.
     *
     * Params:
     *   s = accepted client socket
     */
    this(Socket s) {
        m_socket = s;
        m_bufsock = new BufferedSocket(Loop.defaultloop, s, &this.handleread);
        // Without these callbacks BufferedSocket deletes itself on EOF/error,
        // leaving m_bufsock dangling and this object registered forever.
        m_bufsock.eofcb = &this.handledone;
        m_bufsock.closecb = &this.handledone;
    }

    /// Unregisters from the owning server, if any.
    ~this() {
        if (m_srv !is null) m_srv.delCon(this);
    }

    /// Read callback: queues the received bytes for sending back.
    private final void handleread(void[] buf) {
        m_bufsock.write(buf);
    }

    /**
     * EOF/close callback: tears the connection down.
     * BufferedSocket documents that its eof/close handlers may destroy the
     * object and touches no state after invoking the callback, so deleting
     * it from inside the callback is the intended usage.
     */
    private final void handledone() {
        BufferedSocket done = m_bufsock;
        // Clear our references first; the socket is deleted by BufferedSocket.
        m_bufsock = null;
        m_socket = null;
        delete done;
        delete this;
    }
};

/**
 * Echo server: accepts TCP connections on an address and keeps a registry of
 * the live EchoConnection objects.
 */
class EchoServer {
    // Registry of live connections; each element's m_idx equals its index here.
    private EchoConnection[] m_con;
    private Loop m_loop;
    private TcpServer m_server;

    /**
     * Starts listening on address using loop.
     *
     * Params:
     *   loop    = event loop used for the listening socket
     *   address = local address to listen on
     */
    this(Loop loop, Address address) {
        m_loop = loop;
        m_con = new EchoConnection[0];
        m_server = new TcpServer(m_loop, address, &this.accept);
    }

    /// Registers con with this server and records its slot in con.m_idx.
    protected final void addCon(EchoConnection con) {
        con.m_srv = this;
        // m_idx is an int; make the narrowing from the array length explicit.
        con.m_idx = cast(int) m_con.length;
        m_con ~= con;
    }

    /**
     * Removes con from the registry in O(1) by moving the last element into
     * its slot. Does nothing if con is not registered with this server.
     */
    protected final void delCon(EchoConnection con) {
        if (con.m_srv !is this) return;

        int idx = con.m_idx;
        EchoConnection last = m_con[m_con.length - 1];
        m_con[idx] = last;
        // The moved element must learn its new slot, otherwise its own later
        // removal would hit a stale (possibly out-of-range) index.
        last.m_idx = idx;
        m_con.length = m_con.length - 1;

        // Done last so that it also wins when con itself was the last element.
        con.m_srv = null;
        con.m_idx = -1;
    }

    /// Accept callback: creates and registers a connection for s.
    private final void accept(Socket s) {
        auto con = new EchoConnection(s);
        addCon(con);
    }
};
/**
 * Program entry point: creates an EchoServer on the default loop, listening
 * on an InternetAddress constructed from port 8080, then runs the loop.
 *
 * Params:
 *   args = command line arguments (unused)
 */
void main(string[] args) {
    Loop loop = Loop.defaultloop;
    auto listenAddr = new InternetAddress(8080);
    // Held in a local so the server stays referenced while the loop runs.
    EchoServer srv = new EchoServer(loop, listenAddr);
    Loop.defaultloop.run();
}