1
0
mirror of https://github.com/lidgren/lidgren-network-gen3.git synced 2026-05-18 08:06:33 +09:00

Sequence numbers now stored in dictionary per connection to enable sending the same message reliable ordered message to several connections

This commit is contained in:
lidgren
2010-05-20 18:36:19 +00:00
parent 00edeaabf4
commit 046ca06ec6
11 changed files with 84 additions and 79 deletions

View File

@@ -100,9 +100,10 @@ namespace Lidgren.Network
} }
m_owner.LogVerbose("Sending Connect"); m_owner.LogVerbose("Sending Connect");
m_owner.SendImmediately(this, om); double now = NetTime.Now;
m_owner.SendImmediately(now, this, om);
m_connectInitationTime = NetTime.Now; m_connectInitationTime = now;
SetStatus(NetConnectionStatus.Connecting, "Connecting"); SetStatus(NetConnectionStatus.Connecting, "Connecting");
return; return;
@@ -110,24 +111,28 @@ namespace Lidgren.Network
internal void SendConnectResponse() internal void SendConnectResponse()
{ {
double now = NetTime.Now;
NetOutgoingMessage reply = m_owner.CreateMessage(4); NetOutgoingMessage reply = m_owner.CreateMessage(4);
reply.m_type = NetMessageType.Library; reply.m_type = NetMessageType.Library;
reply.m_libType = NetMessageLibraryType.ConnectResponse; reply.m_libType = NetMessageLibraryType.ConnectResponse;
reply.Write((float)NetTime.Now); reply.Write((float)now);
m_owner.LogVerbose("Sending LibraryConnectResponse"); m_owner.LogVerbose("Sending LibraryConnectResponse");
m_owner.SendImmediately(this, reply); m_owner.SendImmediately(now, this, reply);
} }
internal void SendConnectionEstablished() internal void SendConnectionEstablished()
{ {
double now = NetTime.Now;
NetOutgoingMessage ce = m_owner.CreateMessage(4); NetOutgoingMessage ce = m_owner.CreateMessage(4);
ce.m_type = NetMessageType.Library; ce.m_type = NetMessageType.Library;
ce.m_libType = NetMessageLibraryType.ConnectionEstablished; ce.m_libType = NetMessageLibraryType.ConnectionEstablished;
ce.Write((float)NetTime.Now); ce.Write((float)now);
m_owner.LogVerbose("Sending LibraryConnectionEstablished"); m_owner.LogVerbose("Sending LibraryConnectionEstablished");
m_owner.SendImmediately(this, ce); m_owner.SendImmediately(now, this, ce);
} }
internal void ExecuteDisconnect(bool sendByeMessage) internal void ExecuteDisconnect(bool sendByeMessage)
@@ -160,9 +165,9 @@ namespace Lidgren.Network
// release some held memory // release some held memory
if (m_storedMessages != null) if (m_storedMessages != null)
{ {
foreach (List<NetOutgoingMessage> oml in m_storedMessages) foreach (var dict in m_storedMessages)
if (oml != null) if (dict != null)
oml.Clear(); dict.Clear();
} }
m_acknowledgesToSend.Clear(); m_acknowledgesToSend.Clear();

View File

@@ -59,15 +59,17 @@ namespace Lidgren.Network
internal void HandleIncomingPing(byte pingNumber) internal void HandleIncomingPing(byte pingNumber)
{ {
double now = NetTime.Now;
// send pong // send pong
NetOutgoingMessage pong = m_owner.CreateMessage(1); NetOutgoingMessage pong = m_owner.CreateMessage(1);
pong.m_type = NetMessageType.Library; pong.m_type = NetMessageType.Library;
pong.m_libType = NetMessageLibraryType.Pong; pong.m_libType = NetMessageLibraryType.Pong;
pong.Write((byte)pingNumber); pong.Write((byte)pingNumber);
pong.Write(NetTime.Now); pong.Write(now);
// send immediately // send immediately
m_owner.SendImmediately(this, pong); m_owner.SendImmediately(now, this, pong);
} }
internal void HandleIncomingPong(double receiveNow, byte pingNumber, double remoteNetTime) internal void HandleIncomingPong(double receiveNow, byte pingNumber, double remoteNetTime)
@@ -146,7 +148,7 @@ namespace Lidgren.Network
ping.m_libType = NetMessageLibraryType.Ping; ping.m_libType = NetMessageLibraryType.Ping;
ping.Write((byte)m_lastSentPingNumber); ping.Write((byte)m_lastSentPingNumber);
m_owner.SendImmediately(this, ping); m_owner.SendImmediately(now, this, ping);
m_lastPingSendTime = NetTime.Now; // need exact number m_lastPingSendTime = NetTime.Now; // need exact number
m_nextPing = now + m_owner.Configuration.m_pingFrequency; m_nextPing = now + m_owner.Configuration.m_pingFrequency;

View File

@@ -28,8 +28,7 @@ namespace Lidgren.Network
private ushort[] m_nextSendSequenceNumber; private ushort[] m_nextSendSequenceNumber;
private ushort[] m_lastReceivedSequenced; private ushort[] m_lastReceivedSequenced;
// TODO: naïve! replace by something better? internal readonly Dictionary<ushort, NetOutgoingMessage>[] m_storedMessages = new Dictionary<ushort, NetOutgoingMessage>[NetConstants.NumReliableChannels];
internal readonly List<NetOutgoingMessage>[] m_storedMessages = new List<NetOutgoingMessage>[NetConstants.NumReliableChannels];
internal readonly NetBitVector m_storedMessagesNotEmpty = new NetBitVector(NetConstants.NumReliableChannels); internal readonly NetBitVector m_storedMessagesNotEmpty = new NetBitVector(NetConstants.NumReliableChannels);
private readonly ushort[] m_nextExpectedReliableSequence = new ushort[NetConstants.NumReliableChannels]; private readonly ushort[] m_nextExpectedReliableSequence = new ushort[NetConstants.NumReliableChannels];
@@ -44,7 +43,7 @@ namespace Lidgren.Network
int retval = 0; int retval = 0;
for (int i = 0; i < m_storedMessages.Length; i++) for (int i = 0; i < m_storedMessages.Length; i++)
{ {
List<NetOutgoingMessage> list = m_storedMessages[i]; var list = m_storedMessages[i];
if (list != null) if (list != null)
retval += list.Count; retval += list.Count;
} }
@@ -87,23 +86,26 @@ namespace Lidgren.Network
return false; return false;
} }
// called the FIRST time a reliable message is sent // called by Encode() to retrieve a sequence number and store the message for potential resending
private void StoreReliableMessage(double now, NetOutgoingMessage msg) internal ushort StoreReliableMessage(double now, NetOutgoingMessage msg)
{ {
m_owner.VerifyNetworkThread(); m_owner.VerifyNetworkThread();
ushort seqNr = GetSendSequenceNumber(msg.m_type);
int reliableSlot = (int)msg.m_type - (int)NetMessageType.UserReliableUnordered; int reliableSlot = (int)msg.m_type - (int)NetMessageType.UserReliableUnordered;
List<NetOutgoingMessage> list = m_storedMessages[reliableSlot]; Dictionary<ushort, NetOutgoingMessage> slotDict = m_storedMessages[reliableSlot];
if (list == null) if (slotDict == null)
{ {
list = new List<NetOutgoingMessage>(); slotDict = new Dictionary<ushort, NetOutgoingMessage>();
m_storedMessages[reliableSlot] = list; m_storedMessages[reliableSlot] = slotDict;
} }
Interlocked.Increment(ref msg.m_inQueueCount);
list.Add(msg);
if (list.Count == 1) Interlocked.Increment(ref msg.m_inQueueCount);
slotDict.Add(seqNr, msg);
if (slotDict.Count > 0)
m_storedMessagesNotEmpty.Set(reliableSlot, true); m_storedMessagesNotEmpty.Set(reliableSlot, true);
// schedule next resend // schedule next resend
@@ -111,9 +113,11 @@ namespace Lidgren.Network
float[] baseTimes = m_peerConfiguration.m_resendBaseTime; float[] baseTimes = m_peerConfiguration.m_resendBaseTime;
float[] multiplers = m_peerConfiguration.m_resendRTTMultiplier; float[] multiplers = m_peerConfiguration.m_resendRTTMultiplier;
msg.m_nextResendTime = now + baseTimes[numSends] + (m_averageRoundtripTime * multiplers[numSends]); msg.m_nextResendTime = now + baseTimes[numSends] + (m_averageRoundtripTime * multiplers[numSends]);
return seqNr;
} }
private void Resend(double now, NetOutgoingMessage msg) private void Resend(double now, ushort seqNr, NetOutgoingMessage msg)
{ {
m_owner.VerifyNetworkThread(); m_owner.VerifyNetworkThread();
@@ -123,8 +127,7 @@ namespace Lidgren.Network
{ {
// no more resends! We failed! // no more resends! We failed!
int reliableSlot = (int)msg.m_type - (int)NetMessageType.UserReliableUnordered; int reliableSlot = (int)msg.m_type - (int)NetMessageType.UserReliableUnordered;
List<NetOutgoingMessage> list = m_storedMessages[reliableSlot]; m_storedMessages[reliableSlot].Remove(seqNr);
list.Remove(msg);
m_owner.LogWarning("Failed to deliver reliable message " + msg); m_owner.LogWarning("Failed to deliver reliable message " + msg);
Disconnect("Failed to deliver reliable message!"); Disconnect("Failed to deliver reliable message!");
@@ -162,30 +165,25 @@ namespace Lidgren.Network
// remove stored message // remove stored message
int reliableSlot = (int)tp - (int)NetMessageType.UserReliableUnordered; int reliableSlot = (int)tp - (int)NetMessageType.UserReliableUnordered;
List<NetOutgoingMessage> list = m_storedMessages[reliableSlot]; var dict = m_storedMessages[reliableSlot];
if (list == null) if (dict == null)
continue; continue;
// find message // find message
for (int a = 0; a < list.Count; a++) NetOutgoingMessage om;
if (dict.TryGetValue(seqNr, out om))
{ {
NetOutgoingMessage om = list[a]; // found!
if (om.m_sequenceNumber == seqNr) dict.Remove(seqNr);
{ Interlocked.Decrement(ref om.m_inQueueCount);
// found!
list.RemoveAt(a);
Interlocked.Decrement(ref om.m_inQueueCount);
NetException.Assert(om.m_lastSentTime != 0); NetException.Assert(om.m_lastSentTime != 0);
if (om.m_lastSentTime > m_lastSendRespondedTo) if (om.m_lastSentTime > m_lastSendRespondedTo)
m_lastSendRespondedTo = om.m_lastSentTime; m_lastSendRespondedTo = om.m_lastSentTime;
if (om.m_inQueueCount < 1) if (om.m_inQueueCount < 1)
m_owner.Recycle(om); m_owner.Recycle(om);
break;
}
} }
// TODO: receipt handling // TODO: receipt handling

View File

@@ -148,11 +148,13 @@ namespace Lidgren.Network
{ {
if (m_storedMessagesNotEmpty.Get(i)) if (m_storedMessagesNotEmpty.Get(i))
{ {
foreach (NetOutgoingMessage om in m_storedMessages[i]) Dictionary<ushort, NetOutgoingMessage> dict = m_storedMessages[i];
foreach (ushort seqNr in m_storedMessages[i].Keys)
{ {
NetOutgoingMessage om = dict[seqNr];
if (now >= om.m_nextResendTime) if (now >= om.m_nextResendTime)
{ {
Resend(now, om); Resend(now, seqNr, om);
break; // need to break out here; collection may have been modified break; // need to break out here; collection may have been modified
} }
} }
@@ -216,15 +218,9 @@ namespace Lidgren.Network
// encode message // encode message
// //
ptr = msg.Encode(buffer, ptr, this); ptr = msg.Encode(now, buffer, ptr, this);
numIncludedMessages++; numIncludedMessages++;
if (msg.m_type >= NetMessageType.UserReliableUnordered && msg.m_numSends == 1)
{
// message is sent for the first time, and is reliable, store for resend
StoreReliableMessage(now, msg);
}
// room to piggyback some acks? // room to piggyback some acks?
if (m_acknowledgesToSend.Count > 0) if (m_acknowledgesToSend.Count > 0)
{ {
@@ -612,12 +608,12 @@ namespace Lidgren.Network
// shorten resend times // shorten resend times
for (int i = 0; i < m_storedMessages.Length; i++) for (int i = 0; i < m_storedMessages.Length; i++)
{ {
List<NetOutgoingMessage> list = m_storedMessages[i]; Dictionary<ushort, NetOutgoingMessage> dict = m_storedMessages[i];
if (list != null) if (dict != null)
{ {
try try
{ {
foreach (NetOutgoingMessage om in list) foreach (NetOutgoingMessage om in dict.Values)
om.m_nextResendTime = (om.m_nextResendTime * 0.8) - 0.05; om.m_nextResendTime = (om.m_nextResendTime * 0.8) - 0.05;
} }
catch (InvalidOperationException) catch (InvalidOperationException)

View File

@@ -113,6 +113,7 @@ namespace Lidgren.Network
{ {
m_bitLength = 0; m_bitLength = 0;
m_readPosition = 0; m_readPosition = 0;
m_status = NetIncomingMessageReleaseStatus.NotReleased;
m_fragmentationInfo = null; m_fragmentationInfo = null;
} }

View File

@@ -32,7 +32,6 @@ namespace Lidgren.Network
internal NetMessageType m_type; internal NetMessageType m_type;
internal NetMessageLibraryType m_libType; internal NetMessageLibraryType m_libType;
internal ushort m_sequenceNumber;
internal IPEndPoint m_unconnectedRecipient; internal IPEndPoint m_unconnectedRecipient;
@@ -99,7 +98,8 @@ namespace Lidgren.Network
return ptr; return ptr;
} }
internal int Encode(byte[] buffer, int ptr, NetConnection conn) // encode and store for resending (if conn != null and message is reliable)
internal int Encode(double now, byte[] buffer, int ptr, NetConnection conn)
{ {
// message type // message type
buffer[ptr++] = (byte)((int)m_type | (m_fragmentGroupId == -1 ? 0 : 128)); buffer[ptr++] = (byte)((int)m_type | (m_fragmentGroupId == -1 ? 0 : 128));
@@ -112,10 +112,15 @@ namespace Lidgren.Network
{ {
if (conn == null) if (conn == null)
throw new NetException("Trying to encode NetMessageType " + m_type + " to unconnected endpoint!"); throw new NetException("Trying to encode NetMessageType " + m_type + " to unconnected endpoint!");
if (m_numSends == 0)
m_sequenceNumber = conn.GetSendSequenceNumber(m_type); ushort seqNr;
buffer[ptr++] = (byte)m_sequenceNumber; if (m_type >= NetMessageType.UserReliableUnordered)
buffer[ptr++] = (byte)(m_sequenceNumber >> 8); seqNr = conn.GetSendSequenceNumber(m_type); // "disposable" sequence number
else
seqNr = conn.StoreReliableMessage(now, this);
buffer[ptr++] = (byte)seqNr;
buffer[ptr++] = (byte)(seqNr >> 8);
} }
// payload length // payload length
@@ -187,8 +192,6 @@ namespace Lidgren.Network
bdr.Append('|'); bdr.Append('|');
bdr.Append(m_libType.ToString()); bdr.Append(m_libType.ToString());
} }
bdr.Append(" #");
bdr.Append(m_sequenceNumber);
bdr.Append(" sent "); bdr.Append(" sent ");
bdr.Append(m_numSends); bdr.Append(m_numSends);
bdr.Append(" times]"); bdr.Append(" times]");

View File

@@ -229,7 +229,7 @@ namespace Lidgren.Network
// TODO: use throttling here // TODO: use throttling here
// //
int ptr = um.Encode(m_sendBuffer, 0, null); int ptr = um.Encode(now, m_sendBuffer, 0, null);
if (recipient.Address.Equals(IPAddress.Broadcast)) if (recipient.Address.Equals(IPAddress.Broadcast))
{ {
@@ -577,12 +577,12 @@ namespace Lidgren.Network
return NetDeliveryMethod.Unreliable; return NetDeliveryMethod.Unreliable;
} }
internal void SendImmediately(NetConnection conn, NetOutgoingMessage msg) internal void SendImmediately(double now, NetConnection conn, NetOutgoingMessage msg)
{ {
NetException.Assert(msg.m_type == NetMessageType.Library, "SendImmediately can only send library (non-reliable) messages"); NetException.Assert(msg.m_type == NetMessageType.Library, "SendImmediately can only send library (non-reliable) messages");
msg.m_inQueueCount = 1; msg.m_inQueueCount = 1;
int len = msg.Encode(m_sendBuffer, 0, conn); int len = msg.Encode(now, m_sendBuffer, 0, conn);
Interlocked.Decrement(ref msg.m_inQueueCount); Interlocked.Decrement(ref msg.m_inQueueCount);
SendPacket(len, conn.m_remoteEndpoint, 1); SendPacket(len, conn.m_remoteEndpoint, 1);

View File

@@ -144,14 +144,11 @@ namespace Lidgren.Network
for(int i=0;i<conn.m_storedMessages.Length;i++) for(int i=0;i<conn.m_storedMessages.Length;i++)
{ {
List<NetOutgoingMessage> list = conn.m_storedMessages[i]; var dict = conn.m_storedMessages[i];
if (list != null && list.Count > 0) if (dict != null)
{ {
foreach (NetOutgoingMessage om in conn.m_storedMessages[i]) if (dict.ContainsValue(msg))
{ throw new NetException("Ouch! Recycling stored message!");
if (om == msg)
throw new NetException("Ouch! Recycling stored message!");
}
} }
} }
} }
@@ -237,7 +234,7 @@ namespace Lidgren.Network
else else
retval.Reset(); retval.Reset();
NetException.Assert(retval.m_status != NetIncomingMessageReleaseStatus.ReleasedToApplication); NetException.Assert(retval.m_status == NetIncomingMessageReleaseStatus.NotReleased);
retval.m_incomingType = tp; retval.m_incomingType = tp;
retval.m_senderConnection = null; retval.m_senderConnection = null;
@@ -265,6 +262,8 @@ namespace Lidgren.Network
else else
retval.Reset(); retval.Reset();
NetException.Assert(retval.m_status == NetIncomingMessageReleaseStatus.NotReleased);
retval.m_data = GetStorage(copyLength); retval.m_data = GetStorage(copyLength);
Buffer.BlockCopy(copyFrom, offset, retval.m_data, 0, copyLength); Buffer.BlockCopy(copyFrom, offset, retval.m_data, 0, copyLength);

View File

@@ -27,7 +27,8 @@ namespace ChatClient
Application.SetCompatibleTextRenderingDefault(false); Application.SetCompatibleTextRenderingDefault(false);
MainForm = new Form1(); MainForm = new Form1();
Client = new NetClient(new NetPeerConfiguration("Chat")); NetPeerConfiguration config = new NetPeerConfiguration("Chat");
Client = new NetClient(config);
Client.Start(); Client.Start();
Display("Type 'connect <host>' to connect to a server"); Display("Type 'connect <host>' to connect to a server");

View File

@@ -80,7 +80,7 @@ namespace ChatServer
om.WriteAllProperties(cm, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance); om.WriteAllProperties(cm, System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Instance);
Display("Forwarding text from " + cm.Sender + " to all clients: " + cm.Text); Display("Forwarding text from " + cm.Sender + " to all clients: " + cm.Text);
Server.SendMessage(om, Server.Connections, NetDeliveryMethod.ReliableUnordered, 0); Server.SendMessage(om, Server.Connections, NetDeliveryMethod.ReliableOrdered, 0);
break; break;
} }

View File

@@ -118,7 +118,7 @@ namespace ImageServer
// send entire as a large message that will be automatically fragmented by the library // send entire as a large message that will be automatically fragmented by the library
om.Write(ImageData); om.Write(ImageData);
Server.SendMessage(om, inc.SenderConnection, NetDeliveryMethod.ReliableUnordered, 0); Server.SendMessage(om, inc.SenderConnection, NetDeliveryMethod.ReliableOrdered, 0);
// all messages will be sent before disconnect so we can call it here // all messages will be sent before disconnect so we can call it here
// inc.SenderConnection.Disconnect("Bye bye now"); // inc.SenderConnection.Disconnect("Bye bye now");