1
0
mirror of https://github.com/lidgren/lidgren-network-gen3.git synced 2026-05-16 15:16:33 +09:00

Major refactoring of sending messages to multiple recipients

This commit is contained in:
lidgren
2010-06-21 19:22:21 +00:00
parent d843a5944a
commit 14d3f3b390
18 changed files with 417 additions and 302 deletions

View File

@@ -30,10 +30,10 @@ namespace Lidgren.Network
{
private static readonly NetFragmentationInfo s_genericFragmentationInfo = new NetFragmentationInfo();
private readonly NetPeer m_owner;
internal readonly NetPeer m_owner;
internal readonly IPEndPoint m_remoteEndpoint;
internal double m_lastHeardFrom;
internal NetQueue<NetOutgoingMessage> m_unsentMessages;
internal readonly NetQueue<NetSending> m_unsentMessages;
internal NetConnectionStatus m_status;
private NetConnectionStatus m_visibleStatus;
private double m_lastSentUnsentMessages;
@@ -91,10 +91,10 @@ namespace Lidgren.Network
m_owner = owner;
m_peerConfiguration = m_owner.m_configuration;
m_remoteEndpoint = remoteEndpoint;
m_unsentMessages = new NetQueue<NetOutgoingMessage>(16);
m_fragmentGroups = new Dictionary<int, NetIncomingMessage>();
m_status = NetConnectionStatus.None;
m_visibleStatus = NetConnectionStatus.None;
m_unsentMessages = new NetQueue<NetSending>(8);
double now = NetTime.Now;
m_nextPing = now + 5.0f;
@@ -143,21 +143,41 @@ namespace Lidgren.Network
}
// queue resends
foreach (NetSending send in m_unackedSends)
{
if (now > send.NextResend)
{
m_unsentMessages.EnqueueFirst(send);
send.SetNextResend(this);
}
}
/*
if (!m_storedMessagesNotEmpty.IsEmpty())
{
int first = m_storedMessagesNotEmpty.GetFirstSetIndex();
#if DEBUG
// slow slow verification
for (int i = 0; i < first; i++)
if (m_storedMessages[i] != null && m_storedMessages[i].Count > 0)
throw new NetException("m_storedMessagesNotEmpty mismatch; first is " + first + " but actual first is " + i);
if (m_storedMessages[first] == null || m_storedMessages[first].Count < 1)
throw new NetException("m_storedMessagesNotEmpty failure; first is " + first + ", but that entry is empty!");
#endif
for (int i = first; i < m_storedMessages.Length; i++)
{
if (m_storedMessagesNotEmpty.Get(i))
{
Dictionary<ushort, NetOutgoingMessage> dict = m_storedMessages[i];
RestartCheck:
foreach (ushort seqNr in m_storedMessages[i].Keys)
{
NetOutgoingMessage om = dict[seqNr];
if (now >= om.m_nextResendTime)
{
Resend(now, seqNr, om);
break; // need to break out here; collection may have been modified
goto RestartCheck; // need to break out here; collection may have been modified
}
}
}
@@ -169,6 +189,7 @@ namespace Lidgren.Network
#endif
}
}
*/
}
// send unsent messages; high priority first
@@ -199,13 +220,14 @@ namespace Lidgren.Network
if (m_throttleDebt >= throttleThreshold)
break;
NetOutgoingMessage msg = m_unsentMessages.TryDequeue();
if (msg == null)
NetSending send = m_unsentMessages.TryDequeue();
if (send == null)
continue;
Interlocked.Decrement(ref msg.m_inQueueCount);
send.NumSends++;
NetOutgoingMessage msg = send.Message;
int msgPayloadLength = msg.LengthBytes;
msg.m_lastSentTime = now;
if (ptr > 0)
{
@@ -233,9 +255,27 @@ namespace Lidgren.Network
// encode message
//
ptr = msg.Encode(now, buffer, ptr, this);
if (send.FragmentGroupId > 0)
ptr = msg.EncodeFragmented(buffer, ptr, send, mtu);
else
ptr = msg.EncodeUnfragmented(buffer, ptr, send.MessageType, send.SequenceNumber);
numIncludedMessages++;
if (send.MessageType >= NetMessageType.UserReliableUnordered)
{
// store for reliability
if (send.NumSends == 0)
m_unackedSends.Add(send);
}
else
{
// unreliable message; recycle if all sendings done
int unfin = msg.m_numUnfinishedSendings;
msg.m_numUnfinishedSendings = unfin - 1;
if (unfin <= 1)
m_owner.Recycle(msg);
}
// room to piggyback some acks?
if (m_acknowledgesToSend.Count > 0)
{
@@ -250,14 +290,11 @@ namespace Lidgren.Network
}
}
if (msg.m_type == NetMessageType.Library && msg.m_libType == NetMessageLibraryType.Disconnect)
if (send.MessageType == NetMessageType.Library && msg.m_libType == NetMessageLibraryType.Disconnect)
{
FinishDisconnect();
break;
}
if (msg.m_inQueueCount < 1)
m_owner.Recycle(msg);
}
if (ptr > 0)
@@ -340,7 +377,7 @@ namespace Lidgren.Network
{
// Expected sequence number
AcceptMessage(mtp, isFragment, channelSequenceNumber, ptr, payloadLengthBits);
ExpectedReliableSequenceArrived(reliableSlot, isFragment);
return;
}
@@ -368,13 +405,12 @@ namespace Lidgren.Network
}
// It's an early reliable message
recList[channelSequenceNumber] = true;
m_owner.LogVerbose("Received early reliable message: " + channelSequenceNumber);
//
// It's not a duplicate; mark as received. Release if it's unordered, else withhold
//
recList[channelSequenceNumber] = true;
if (ndm == NetDeliveryMethod.ReliableUnordered)
{
@@ -563,43 +599,58 @@ namespace Lidgren.Network
return;
}
public void SendMessage(NetOutgoingMessage msg, NetDeliveryMethod method)
internal void SendLibrary(NetOutgoingMessage msg)
{
if (msg.IsSent)
throw new NetException("Message has already been sent!");
msg.m_type = (NetMessageType)method;
EnqueueOutgoingMessage(msg);
NetException.Assert(msg.m_libType != NetMessageLibraryType.Error);
NetSending send = new NetSending(msg, NetMessageType.Library, 0);
msg.m_wasSent = true;
msg.m_numUnfinishedSendings++;
m_unsentMessages.Enqueue(send);
}
public void SendMessage(NetOutgoingMessage msg, NetDeliveryMethod method, int sequenceChannel)
public void SendMessage(NetOutgoingMessage msg, NetDeliveryMethod method)
{
SendMessage(msg, method, 0);
}
public bool SendMessage(NetOutgoingMessage msg, NetDeliveryMethod method, int sequenceChannel)
{
NetException.Assert(msg.m_libType == NetMessageLibraryType.Error, "Use SendLibrary() instead!");
if (msg.IsSent)
throw new NetException("Message has already been sent!");
NetException.Assert(sequenceChannel >= 0 && sequenceChannel < NetConstants.NetChannelsPerDeliveryMethod, "Sequence channel must be between 0 and NetConstants.NetChannelsPerDeliveryMethod (" + NetConstants.NetChannelsPerDeliveryMethod + ")");
msg.m_type = (NetMessageType)((int)method + sequenceChannel);
EnqueueOutgoingMessage(msg);
if (m_owner == null)
return false; // we've been disposed
msg.m_wasSent = true;
NetMessageType tp = (NetMessageType)((int)method + sequenceChannel);
return EnqueueSendMessage(msg, tp);
}
// called by user and network thread
internal void EnqueueOutgoingMessage(NetOutgoingMessage msg)
{
if (m_owner == null)
return; // we've been disposed
internal bool EnqueueSendMessage(NetOutgoingMessage msg, NetMessageType tp)
{
int msgLen = msg.LengthBytes;
int mtu = m_owner.m_configuration.m_maximumTransmissionUnit;
if (msgLen <= mtu)
{
Interlocked.Increment(ref msg.m_inQueueCount);
m_unsentMessages.Enqueue(msg);
return;
NetSending send = new NetSending(msg, tp, GetSendSequenceNumber(tp));
msg.m_numUnfinishedSendings++;
send.SetNextResend(this);
m_unsentMessages.Enqueue(send);
return true;
}
#if DEBUG
if ((int)msg.m_type < (int)NetMessageType.UserReliableUnordered)
if (tp < NetMessageType.UserReliableUnordered)
{
// unreliable
m_owner.LogWarning("Sending more than MTU (currently " + mtu + ") bytes unreliably is not recommended!");
@@ -609,6 +660,7 @@ namespace Lidgren.Network
// message must be fragmented
int fgi = Interlocked.Increment(ref m_nextFragmentGroupId);
// TODO: loop group id?
int numFragments = (msgLen + mtu - 1) / mtu;
@@ -616,16 +668,16 @@ namespace Lidgren.Network
{
int flen = (i == numFragments - 1 ? (msgLen - (mtu * (numFragments - 1))) : mtu);
NetOutgoingMessage fm = m_owner.CreateMessage(flen);
fm.m_fragmentGroupId = fgi;
fm.m_fragmentNumber = i;
fm.m_fragmentTotalCount = numFragments;
fm.Write(msg.m_data, mtu * i, flen);
fm.m_type = msg.m_type;
Interlocked.Increment(ref fm.m_inQueueCount);
m_unsentMessages.Enqueue(fm);
NetSending fs = new NetSending(msg, tp, GetSendSequenceNumber(tp));
fs.FragmentGroupId = fgi;
fs.FragmentNumber = i;
fs.FragmentTotalCount = numFragments;
msg.m_numUnfinishedSendings++;
m_unsentMessages.Enqueue(fs);
fs.SetNextResend(this);
}
return true;
}
public void Disconnect(string byeMessage)
@@ -640,26 +692,13 @@ namespace Lidgren.Network
// loosen up throttling
m_throttleDebt = -m_owner.m_configuration.m_throttlePeakBytes;
// shorten resend times
for (int i = 0; i < m_storedMessages.Length; i++)
{
Dictionary<ushort, NetOutgoingMessage> dict = m_storedMessages[i];
if (dict != null)
{
try
{
foreach (NetOutgoingMessage om in dict.Values)
om.m_nextResendTime = (om.m_nextResendTime * 0.8) - 0.05;
}
catch (InvalidOperationException)
{
// ok, collection was modified, never mind then - it was worth a shot
}
}
}
// instantly resend all unacked
double now = NetTime.Now;
foreach(NetSending send in m_unackedSends)
send.NextResend = now;
NetOutgoingMessage bye = m_owner.CreateLibraryMessage(NetMessageLibraryType.Disconnect, byeMessage);
EnqueueOutgoingMessage(bye);
SendLibrary(bye);
}
public void Approve()