Răsfoiți Sursa

- Fixes an issue on read code
- Prevents disconnection code from being called more then one time(futher calls to close shall be ignored now)
- Refactoring
- Minor fix on ant buildfile

KenM 17 ani în urmă
părinte
comite
bb9fe0e6f3

+ 6 - 0
MMOCore/build.xml

@@ -56,6 +56,12 @@
 				<include name="*.jar"/>
 			</fileset>
 		</copy>
+		
+		<copy todir="${build.dist}">
+			<fileset dir="${build}">
+				<include name="*.jar"/>
+			</fileset>
+		</copy>
 	</target>
 	
 	<target name="dist"

BIN
MMOCore/lib/javolution.jar


+ 1 - 1
MMOCore/src/org/mmocore/network/MMOClient.java

@@ -58,7 +58,7 @@ public abstract class MMOClient<T extends MMOConnection>
     
     public abstract boolean encrypt(ByteBuffer buf, int size);
     
-    protected void onDisconection()
+    protected void onDisconnection()
     {
     }
     

+ 26 - 15
MMOCore/src/org/mmocore/network/MMOConnection.java

@@ -23,7 +23,7 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.WritableByteChannel;
 
-import org.mmocore.util.collections.concurrent.SemiConcurrentLinkedList;
+import javolution.util.FastList;
 
 
 /**
@@ -39,7 +39,7 @@ public class MMOConnection<T extends MMOClient>
     private WritableByteChannel _writableByteChannel;
     private ReadableByteChannel _readableByteChannel;
     
-    private SemiConcurrentLinkedList<SendablePacket<T>> _sendQueue = new SemiConcurrentLinkedList<SendablePacket<T>>();
+    private FastList<SendablePacket<T>> _sendQueue = new FastList<SendablePacket<T>>();
     private SelectionKey _selectionKey;
     
     private int _readHeaderPending;
@@ -194,7 +194,7 @@ public class MMOConnection<T extends MMOClient>
         return _readableByteChannel;
     }
     
-    protected SemiConcurrentLinkedList<SendablePacket<T>> getSendQueue()
+    protected FastList<SendablePacket<T>> getSendQueue()
     {
         return _sendQueue;
     }
@@ -395,31 +395,42 @@ public class MMOConnection<T extends MMOClient>
     {
         synchronized (this.getSendQueue())
         {
-            _pendingClose = true;
-            this.getSendQueue().clear();
-            this.disableWriteInterest();
+            if (!this.isClosed())
+            {
+                _pendingClose = true;
+                this.getSendQueue().clear();
+                this.disableWriteInterest();
+                this.getSelectorThread().closeConnection(this);
+            }
         }
-        this.getSelectorThread().closeConnection(this);
+        
     }
     
     public void close(SendablePacket<T> sp)
     {
         synchronized (this.getSendQueue())
         {
-            this.getSendQueue().clear();
-            this.sendPacket(sp);
-            _pendingClose = true;
+            if (!this.isClosed())
+            {
+                this.getSendQueue().clear();
+                this.sendPacket(sp);
+                _pendingClose = true;
+                this.getSelectorThread().closeConnection(this);
+            }
         }
-        this.getSelectorThread().closeConnection(this);
     }
     
     protected void closeLater()
     {
         synchronized (this.getSendQueue())
         {
-            _pendingClose = true;
+            if (!this.isClosed())
+            {
+                _pendingClose = true;
+                this.getSelectorThread().closeConnection(this);
+            }
         }
-        this.getSelectorThread().closeConnection(this);
+        
     }
     
     protected void releaseBuffers()
@@ -441,9 +452,9 @@ public class MMOConnection<T extends MMOClient>
         }
     }
     
-    protected void onDisconection()
+    protected void onDisconnection()
     {
-        this.getClient().onDisconection();
+        this.getClient().onDisconnection();
     }
     
     protected void onForcedDisconnection()

+ 1 - 1
MMOCore/src/org/mmocore/network/ReceivablePacket.java

@@ -84,7 +84,7 @@ public abstract class ReceivablePacket<T extends MMOClient> extends AbstractPack
         {
             tb.append(ch);
         }
-        String str = tb.stringValue();
+        String str = tb.toString();
         TextBuilder.recycle(tb);
         return str;
     }

+ 19 - 1
MMOCore/src/org/mmocore/network/SelectorConfig.java

@@ -25,8 +25,13 @@ import java.nio.ByteOrder;
  */
 public class SelectorConfig<T extends MMOClient>
 {
+    // UDP
     private final UDPHeaderHandler<T> UDP_HEADER_HANDLER;
+    private final IPacketHandler<T> UDP_PACKET_HANDLER;
+    
+    // TCP
     private final TCPHeaderHandler<T> TCP_HEADER_HANDLER;
+    private final IPacketHandler<T> TCP_PACKET_HANDLER;
     
     private int READ_BUFFER_SIZE = 64*1024;
     private int WRITE_BUFFER_SIZE = 64*1024;
@@ -50,10 +55,13 @@ public class SelectorConfig<T extends MMOClient>
         INT_HEADER,
     }
     
-    public SelectorConfig(UDPHeaderHandler<T> udpHeaderHandler, TCPHeaderHandler<T> tcpHeaderHandler)
+    public SelectorConfig(UDPHeaderHandler<T> udpHeaderHandler, IPacketHandler<T> udpPacketHandler, TCPHeaderHandler<T> tcpHeaderHandler, IPacketHandler<T> tcpPacketHandler)
     {
         UDP_HEADER_HANDLER = udpHeaderHandler;
+        UDP_PACKET_HANDLER = udpPacketHandler;
+        
         TCP_HEADER_HANDLER = tcpHeaderHandler;
+        TCP_PACKET_HANDLER = tcpPacketHandler;
     }
     
     public int getReadBufferSize()
@@ -91,10 +99,20 @@ public class SelectorConfig<T extends MMOClient>
         return UDP_HEADER_HANDLER;
     }
     
+    public IPacketHandler<T> getUDPPacketHandler()
+    {
+        return UDP_PACKET_HANDLER;
+    }
+    
     public TCPHeaderHandler<T> getTCPHeaderHandler()
     {
         return TCP_HEADER_HANDLER;
     }
+    
+    public IPacketHandler<T> getTCPPacketHandler()
+    {
+        return TCP_PACKET_HANDLER;
+    }
 
     /**
      * Server will try to send maxSendPerPass packets per socket write call however it may send less if the write buffer was filled before achieving this value.

+ 54 - 50
MMOCore/src/org/mmocore/network/SelectorThread.java

@@ -35,11 +35,8 @@ import java.nio.channels.SocketChannel;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.mmocore.util.collections.concurrent.SemiConcurrentLinkedList;
-
-
-
 import javolution.util.FastList;
+import javolution.util.FastList.Node;
 
 /**
  * @author KenM<BR>
@@ -79,7 +76,7 @@ public class SelectorThread<T extends MMOClient> extends Thread
     // ByteBuffers General Purpose Pool
     private final FastList<ByteBuffer> _bufferPool = new FastList<ByteBuffer>();
 
-    public SelectorThread(SelectorConfig<T> sc, IPacketHandler<T> udpPacketHandler, IPacketHandler<T> packetHandler, IMMOExecutor<T> executor, IClientFactory<T> clientFactory, IAcceptFilter acceptFilter) throws IOException
+    public SelectorThread(SelectorConfig<T> sc, IMMOExecutor<T> executor, IClientFactory<T> clientFactory, IAcceptFilter acceptFilter) throws IOException
     {
         HELPER_BUFFER_SIZE = sc.getHelperBufferSize();
         HELPER_BUFFER_COUNT = sc.getHelperBufferCount();
@@ -95,8 +92,8 @@ public class SelectorThread<T extends MMOClient> extends Thread
         _tcpHeaderHandler = sc.getTCPHeaderHandler();
         this.initBufferPool();
         _acceptFilter = acceptFilter;
-        _packetHandler = packetHandler;
-        _udpPacketHandler = udpPacketHandler;
+        _packetHandler = sc.getTCPPacketHandler();
+        _udpPacketHandler = sc.getUDPPacketHandler();
         _clientFactory = clientFactory;
         this.setExecutor(executor);
         this.initializeSelector();
@@ -256,15 +253,18 @@ public class SelectorThread<T extends MMOClient> extends Thread
             }
             
             // process pending close
-            for (n = this.getPendingClose().head(), end = this.getPendingClose().tail(); (n = n.getNext()) != end;)
+            synchronized (this.getPendingClose())
             {
-                con = n.getValue();
-                if (con.getSendQueue().isEmpty())
+                for (n = this.getPendingClose().head(), end = this.getPendingClose().tail(); (n = n.getNext()) != end;)
                 {
-                    temp = n.getPrevious();
-                    this.getPendingClose().delete(n);
-                    n = temp;
-                    this.closeConnectionImpl(con);
+                    con = n.getValue();
+                    if (con.getSendQueue().isEmpty())
+                    {
+                        temp = n.getPrevious();
+                        this.getPendingClose().delete(n);
+                        n = temp;
+                        this.closeConnectionImpl(con);
+                    }
                 }
             }
             
@@ -355,12 +355,6 @@ public class SelectorThread<T extends MMOClient> extends Thread
         {
             buf = READ_BUFFER;
         }
-        else
-        {
-            //buf.limit(buf.capacity());
-            //buf.position()
-            //System.out.println("TEM BUF PENDENTE LIMIT: "+buf.limit());
-        }
         int result = -2;
         
         // if we try to to do a read with no space in the buffer it will read 0 bytes
@@ -368,8 +362,8 @@ public class SelectorThread<T extends MMOClient> extends Thread
         if (buf.position() == buf.limit())
         {
             // should never happen
-            System.out.println("POS ANTES SC.READ(): "+buf.position()+" limit: "+buf.limit());
-            System.out.println("NOOBISH ERROR "+( buf == READ_BUFFER ? "READ_BUFFER" : "temp"));
+            System.err.println("POS ANTES SC.READ(): "+buf.position()+" limit: "+buf.limit());
+            System.err.println("NOOBISH ERROR "+( buf == READ_BUFFER ? "READ_BUFFER" : "temp"));
             System.exit(0);
         }
         
@@ -394,6 +388,13 @@ public class SelectorThread<T extends MMOClient> extends Thread
                 // try to read as many packets as possible
                 while (this.tryReadPacket2(key, client, buf));
             }
+            else
+            {
+                if (buf == READ_BUFFER)
+                {
+                    READ_BUFFER.clear();
+                }
+            }
         }
         else if (result == 0)
         {
@@ -1026,38 +1027,41 @@ public class SelectorThread<T extends MMOClient> extends Thread
             
             int i = 0;
 
-            SemiConcurrentLinkedList<SendablePacket<T>> sendQueue = con.getSendQueue();
-            SemiConcurrentLinkedList<SendablePacket<T>>.Node<SendablePacket<T>> n, temp, end;
+            FastList<SendablePacket<T>> sendQueue = con.getSendQueue();
+            Node<SendablePacket<T>> n, temp, end;
             SendablePacket<T> sp;
 
-            for (n = sendQueue.getStart(), end = sendQueue.getEnd(); (n = n.getNext()) != end && i++ < MAX_SEND_PER_PASS;)
+            synchronized (sendQueue)
             {
-                sp = n.getValue();
-                // put into WriteBuffer
-                this.putPacketIntoWriteBuffer(con.getClient(), sp);
-
-                // delete packet from queue
-                temp = n.getPrevious();
-                sendQueue.remove(n);
-                n = temp;
-
-                WRITE_BUFFER.flip();
-                //System.err.println("WB SIZE: "+WRITE_BUFFER.limit());
-                if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
+                for (n = sendQueue.head(), end = sendQueue.tail(); (n = n.getNext()) != end && i++ < MAX_SEND_PER_PASS;)
                 {
-                    /*if (i == 0)
+                    sp = n.getValue();
+                    // put into WriteBuffer
+                    this.putPacketIntoWriteBuffer(con.getClient(), sp);
+
+                    // delete packet from queue
+                    temp = n.getPrevious();
+                    sendQueue.delete(n);
+                    n = temp;
+
+                    WRITE_BUFFER.flip();
+                    //System.err.println("WB SIZE: "+WRITE_BUFFER.limit());
+                    if (DIRECT_WRITE_BUFFER.remaining() >= WRITE_BUFFER.limit())
                     {
-                        // mark begining of new data from previous pending data
-                        con.setWriterMark(DIRECT_WRITE_BUFFER.position());
-                    }*/
-                    DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
-                }
-                else
-                {
-                    // there is no more space in the direct buffer
-                    //con.addWriteBuffer(this.getPooledBuffer().put(WRITE_BUFFER));
-                    con.createWriteBuffer(WRITE_BUFFER);
-                    break;
+                        /*if (i == 0)
+                        {
+                            // mark begining of new data from previous pending data
+                            con.setWriterMark(DIRECT_WRITE_BUFFER.position());
+                        }*/
+                        DIRECT_WRITE_BUFFER.put(WRITE_BUFFER);
+                    }
+                    else
+                    {
+                        // there is no more space in the direct buffer
+                        //con.addWriteBuffer(this.getPooledBuffer().put(WRITE_BUFFER));
+                        con.createWriteBuffer(WRITE_BUFFER);
+                        break;
+                    }
                 }
             }
         }
@@ -1181,7 +1185,7 @@ public class SelectorThread<T extends MMOClient> extends Thread
         try
         {
             // notify connection
-            con.onDisconection();
+            con.onDisconnection();
         }
         finally
         {

+ 2 - 2
MMOCore/src/org/mmocore/test/testserver/TestServer.java

@@ -25,9 +25,9 @@ public class TestServer
         }
         
         SelectorHelper sh = new SelectorHelper();
-        SelectorConfig<ServerClient> ssc = new SelectorConfig<ServerClient>(null, sh);
+        SelectorConfig<ServerClient> ssc = new SelectorConfig<ServerClient>(null, null, sh, sh);
         ssc.setMaxSendPerPass(2);
-        SelectorThread<ServerClient> selector = new SelectorThread<ServerClient>(ssc, null, sh, sh, sh, null);
+        SelectorThread<ServerClient> selector = new SelectorThread<ServerClient>(ssc, sh, sh, null);
         selector.openServerSocket(null, PORT);
         //selector.openDatagramSocket();
         selector.start();