diff --git a/hawtdispatch-example/src/main/scala/org/fusesource/hawtdispatch/example/EchoServer.scala b/hawtdispatch-example/src/main/scala/org/fusesource/hawtdispatch/example/EchoServer.scala index 52e22f9..682deb3 100644 --- a/hawtdispatch-example/src/main/scala/org/fusesource/hawtdispatch/example/EchoServer.scala +++ b/hawtdispatch-example/src/main/scala/org/fusesource/hawtdispatch/example/EchoServer.scala @@ -17,14 +17,14 @@ package org.fusesource.hawtdispatch.example -import java.io.{IOException} -import java.net.{InetSocketAddress} +import java.io.IOException +import java.net.InetSocketAddress import org.fusesource.hawtdispatch._ -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; +import java.nio.ByteBuffer +import java.nio.channels.SelectionKey +import java.nio.channels.ServerSocketChannel +import java.nio.channels.SocketChannel /** * A simple echo server example. @@ -33,113 +33,117 @@ import java.nio.channels.SocketChannel; */ object EchoServer { - var port=4444; - - def main(args:Array[String]):Unit = { - run + def main(args:Array[String]) { + val PORT = 4444 + run(PORT) } - def run() = { - val server = new Server(port).start - println("Press enter to shutdown."); + def run(port: Int) { + val server = new Server(port) + server.start() + println("Press enter to shutdown.") System.in.read - server.stop + server.stop() } class Server(val port: Int) { - val channel = ServerSocketChannel.open(); - channel.socket().bind(new InetSocketAddress(port)); - channel.configureBlocking(false); + val channel = ServerSocketChannel.open() + channel.socket().bind(new InetSocketAddress(port)) + channel.configureBlocking(false) + val queue = createQueue("server") - val accept_source = createSource(channel, SelectionKey.OP_ACCEPT, queue); + val accept_source = createSource(channel, SelectionKey.OP_ACCEPT, queue) accept_source.setEventHandler(^ { - val socket = channel.accept(); + val socket = channel.accept() try { - socket.configureBlocking(false); + socket.configureBlocking(false) new Session(socket).start() } catch { case e: Exception => - socket.close + socket.close() } - }); + }) - println("Listening on port: "+port); + println("Listening on port: " + port) - def start() = { - accept_source.resume + def start(): Server = { + accept_source.resume() this } - def stop() = { - accept_source.cancel + def stop() { + accept_source.cancel() } accept_source.onCancel { - channel.close(); - println("Closed port: "+port); + channel.close() + println("Closed port: " + port) } - + } class Session(val channel: SocketChannel) { - - val buffer = ByteBuffer.allocate(1024); + val CAPACITY = 1024 + val buffer = ByteBuffer.allocate(CAPACITY) val queue = createQueue("session") - val read_source = createSource(channel, SelectionKey.OP_READ, queue); - val write_source = createSource(channel, SelectionKey.OP_WRITE, queue); + val read_source = createSource(channel, SelectionKey.OP_READ, queue) + val write_source = createSource(channel, SelectionKey.OP_WRITE, queue) val remote_address = channel.socket.getRemoteSocketAddress.toString - def start() = { - println("Accepted connection from: "+remote_address); - read_source.resume + def start() { + println("Accepted connection from: " + remote_address) + read_source.resume() } - def close() = { - read_source.cancel + def close() { + read_source.cancel() } read_source.onCancel { - write_source.cancel + write_source.cancel() } write_source.onCancel { - channel.close - println("Closed connection from: "+remote_address); + channel.close() + println("Closed connection from: " + remote_address) } read_source.setEventHandler(^{ try { - if (channel.read(buffer) == -1) { - close - } else { - buffer.flip; - if (buffer.remaining > 0) { - read_source.suspend - write_source.resume - } else { - buffer.clear + channel.read(buffer) match { + case -1 => close() + case _ => { + buffer.flip + if(buffer.hasRemaining) { + read_source.suspend() + write_source.resume() + } + else{ + buffer.clear + } } } } catch { - case e:IOException => close + case e:IOException => close() } }) write_source.setEventHandler(^{ try { channel.write(buffer) - if (buffer.remaining == 0) { + if (!buffer.hasRemaining) { buffer.clear - write_source.suspend - read_source.resume + write_source.suspend() + read_source.resume() } } catch { - case e:IOException => close + case e:IOException => close() } }) } + }