#include "stdafx.h"
#include "RylServerDispatch.h"

#include "../Packet/PacketBase.h"
#include "../Packet/PacketCommand.h"
#include "../XORCrypt/XORCrypt.h"

// NOTE(review): the header names of the eight directives below are missing
// from this copy of the file - restore them from version control.
#include
#include
#include
#include
#include
#include
#include
#include


// Bit flags kept in CRylServerDispatch::m_dwFlags.
enum RylServerDispatchFlags
{
    PROCESS_PACKET_NOW  = (1 << 0),     // Set while Dispatch() is working on a batch.
    SUSPEND_RECV        = (1 << 1)      // Set after SuspendRecv() was requested on the session.
};


// Binds the dispatcher to a session and remembers how many packets
// Dispatch() may handle per call.
CRylServerDispatch::CRylServerDispatch(CSession& Session,
                                       unsigned long dwMaxProcessPacketPerPulse)
:   CPacketDispatch(Session),
    m_SendStream(Session),
    m_dwMaxProcessPacketPerPulse(dwMaxProcessPacketPerPulse),
    m_dwFlags(0)
{

}


// Empties the pending packet queue while holding the queue lock.
CRylServerDispatch::~CRylServerDispatch()
{
    BufferQueueLock::Syncronize sync(m_BufferQueueLock);
    m_ProcessQueue.clear();
}


// Returns the buffer factory owned by the session's policy.
CBufferFactory& CRylServerDispatch::GetBufferFactory()
{
    return m_Session.GetPolicy().GetBufferFactory();
}


// Forwards to CSession::GetRemoteAddr().
INET_Addr& CRylServerDispatch::GetRemoteAddr()
{
    return m_Session.GetRemoteAddr();
}


// Forwards to CSession::GetLocalAddr().
INET_Addr& CRylServerDispatch::GetLocalAddr()
{
    return m_Session.GetLocalAddr();
}


// Forwards to CSession::Shutdown() and returns its result.
bool CRylServerDispatch::Shutdown()
{
    return m_Session.Shutdown();
}


// Forwards to CSession::CloseSession().
void CRylServerDispatch::CloseSession()
{
    m_Session.CloseSession();
}


// Writes one error line to g_SessionLog containing the session pointer,
// this dispatcher's pointer, the remote address string, the packet command
// and the caller supplied text.
//
// szDetailText : NUL-terminated description, printed with %s.
// cCmd         : packet command the error relates to (callers in this file pass 0).
void CRylServerDispatch::LogErrorPacket(const char* szDetailText, const unsigned char cCmd)
{
    ERRLOG5(g_SessionLog, "SP:0x%p/DP:0x%p/IP:%15s/PktCmd:0x%02x/%s",
        &m_Session, this, m_Session.GetRemoteAddr().get_addr_string(), cCmd, szDetailText);
}


// Splits a raw receive stream into individual packets.
//
// For every complete packet found in [lpStream_In, lpStream_In + *dwStreamSize_InOut)
// the header is decoded in place, the body is decrypted and/or decompressed when
// the header says so, and a buffer holding the resulting packet is appended to
// bufferQueue.
//
// bufferFactory      : source of the packet buffers.
// bufferQueue        : receives one buffer per parsed packet.
// lpStream_In        : start of the received bytes; modified in place by decoding.
// dwStreamSize_InOut : in  - number of bytes available.
//                      out - number of bytes consumed (only written on success).
//
// Returns S_CREATE_SUCCESS when parsing stopped normally (stream exhausted or
// last packet incomplete), otherwise one of the E_* codes. On an error return
// packets parsed before the error are still in bufferQueue.
CRylServerDispatch::CreationResult CRylServerDispatch::CreatePacket(
    CBufferFactory& bufferFactory, CBufferQueue& bufferQueue,
    char* const lpStream_In, unsigned long* dwStreamSize_InOut)
{
    CXORCrypt& Crypt = CXORCrypt::GetInstance();

    char*           lpBufferPos     = lpStream_In;
    unsigned long   dwStreamSize    = *dwStreamSize_InOut;      // bytes not yet consumed

    while(sizeof(PktBase) <= dwStreamSize)
    {
        PktBase* lpPktBase = reinterpret_cast<PktBase*>(lpBufferPos);

        // Decode the packet header in place (everything after its first byte).
        Crypt.DecodeHeader(lpBufferPos + 1, sizeof(PktBase) - 1, 0, 0);

        // Fetch the length stored in the header.
        const PktBase::LengthType nPacketLength = lpPktBase->GetLen();

        // Check that the header is valid; an invalid one fails the whole stream.
        if(StartBit != lpPktBase->GetStartBit())
        {
            return E_INVALID_STARTBIT;
        }

        // A length smaller than the header itself would make every
        // "nPacketLength - sizeof(PktBase)" below wrap around, and a length
        // of 0 would never advance the read position. No dedicated result
        // code is visible in this file, so the invalid-header code is reused.
        if(nPacketLength < sizeof(PktBase))
        {
            return E_INVALID_STARTBIT;
        }

        if(dwStreamSize < nPacketLength)
        {
            // Not enough bytes for the whole packet yet: re-encode the
            // header so it can be decoded again on the next call, then stop.
            Crypt.EncodeHeader(lpBufferPos + 1, sizeof(PktBase) - 1, 0, 0);
            break;
        }

        // Decrypt the packet body.
        if(lpPktBase->IsCrypt())
        {
            Crypt.DecodePacket(lpBufferPos + sizeof(PktBase),
                nPacketLength - sizeof(PktBase), lpPktBase->GetCodePage());
        }

        CBuffer* lpBuffer = 0;

        if(lpPktBase->IsCompress())
        {
            // Decompress the packet body into a maximum sized buffer.
            unsigned long dwDecompressedSize = PktMaxLen - sizeof(PktBase);

            lpBuffer = CREATE_BUFFER(bufferFactory, PktMaxLen);
            if(0 == lpBuffer)
            {
                return E_ALLOCATE_BUFFER_FAILED;
            }

            // The header is copied as is; only the body is compressed.
            char* szDecompressedPacket = lpBuffer->wr_ptr();
            memcpy(szDecompressedPacket, lpPktBase, sizeof(PktBase));

            if(!CMiniLZO::Decompress(
                lpBufferPos + sizeof(PktBase),              // src pos
                nPacketLength - sizeof(PktBase),            // src size
                szDecompressedPacket + sizeof(PktBase),     // dst pos
                &dwDecompressedSize))                       // dst size (in/out)
            {
                SAFE_RELEASE_BUFFER(lpBuffer);
                return E_DECOMPRESS_FAILED;
            }

            lpBuffer->wr_ptr(dwDecompressedSize + sizeof(PktBase));

            // Store the decompressed length in the copied header.
            lpPktBase = reinterpret_cast<PktBase*>(szDecompressedPacket);
            lpPktBase->SetLen(
                static_cast<PktBase::LengthType>(dwDecompressedSize + sizeof(PktBase)));
        }
        else
        {
            lpBuffer = CREATE_BUFFER(bufferFactory, nPacketLength);
            if(0 == lpBuffer)
            {
                return E_ALLOCATE_BUFFER_FAILED;
            }

            lpBuffer->push(lpBufferPos, nPacketLength);
        }

        // Store the packet in the queue and advance by the on-wire length.
        bufferQueue.enqueue(lpBuffer);

        dwStreamSize    -= nPacketLength;
        lpBufferPos     += nPacketLength;
    }

    // available - remaining == consumed
    *dwStreamSize_InOut -= dwStreamSize;
    return S_CREATE_SUCCESS;
}


// Logs a text description of a CreatePacket() failure through LogErrorPacket().
void CRylServerDispatch::LogErrorPacketCreation(CRylServerDispatch::CreationResult eResult)
{
    const char* lpErrString = "Unknown error occured";

    switch(eResult)
    {
    case E_INVALID_STARTBIT:        lpErrString = "Invalid packet startbit";        break;
    case E_ALLOCATE_BUFFER_FAILED:  lpErrString = "Allocate packetbuffer failed";   break;
    case E_DECOMPRESS_FAILED:       lpErrString = "Decompress packet failed";       break;
    default:                                                                        break;
    }

    LogErrorPacket(lpErrString, 0);
}


// Suspends or resumes receiving on the session depending on how many packets
// bufferQueue holds, using 5 * m_dwMaxProcessPacketPerPulse as the threshold.
// Nothing changes when the count equals the threshold exactly.
// Both call sites in this file hold m_BufferQueueLock while calling.
void CRylServerDispatch::InternalCheckSuspendRecv(CBufferQueue& bufferQueue)
{
    const unsigned long MAX_PACKET = m_dwMaxProcessPacketPerPulse * 5;
    const unsigned long CUR_PACKET = bufferQueue.getBufferNum();

    if (!(m_dwFlags & SUSPEND_RECV) && MAX_PACKET < CUR_PACKET)
    {
        // Recv is not suspended and there are too many packets: suspend recv.
        GetSession().SuspendRecv();
        m_dwFlags |= SUSPEND_RECV;
    }
    else if ((m_dwFlags & SUSPEND_RECV) && CUR_PACKET < MAX_PACKET)
    {
        // Recv is suspended and the packet count has recovered: resume recv.
        GetSession().ResumeRecv();
        m_dwFlags &= ~SUSPEND_RECV;
    }
}


// Parses the received stream into packets and appends them to the pending
// queue.
//
// lpStream_In        : received bytes (decoded in place).
// dwStreamSize_InOut : in - bytes available, out - bytes consumed.
//
// Returns false (after logging) when CreatePacket() reports an error; in that
// case nothing is added to the pending queue.
bool CRylServerDispatch::ParsePacket(char *const lpStream_In, unsigned long* dwStreamSize_InOut)
{
    CBufferFactory& bufferFactory = m_Session.GetPolicy().GetBufferFactory();
    CBufferQueue    bufferQueue;

    // Parsing runs on a local queue, before the queue lock is taken.
    CreationResult eCreationResult = CreatePacket(
        bufferFactory, bufferQueue, lpStream_In, dwStreamSize_InOut);

    if(S_CREATE_SUCCESS != eCreationResult)
    {
        LogErrorPacketCreation(eCreationResult);
        return false;
    }

    BufferQueueLock::Syncronize sync(m_BufferQueueLock);

    m_ProcessQueue.splice(bufferQueue);
    InternalCheckSuspendRecv(m_ProcessQueue);
    return true;
}


// Handles up to GetMaxProcessPacketPerPulse() pending packets.
//
// The pending queue is moved into a local queue under the lock, packets are
// passed to DispatchPacket() without the lock, and whatever was not handled is
// spliced back afterwards. PROCESS_PACKET_NOW makes a call that arrives while
// another one is in progress return immediately.
//
// Returns false as soon as DispatchPacket() fails for a packet, true otherwise.
bool CRylServerDispatch::Dispatch()
{
    CBufferQueue ProcessQueue;      // packets taken over for this call
    CBufferQueue Processed;         // packets already passed to DispatchPacket()

    {
        BufferQueueLock::Syncronize sync(m_BufferQueueLock);

        if((m_dwFlags & PROCESS_PACKET_NOW))
        {
            // Entered while packets are already being processed.
            return true;
        }

        // Start processing packets.
        m_dwFlags |= PROCESS_PACKET_NOW;

        InternalCheckSuspendRecv(m_ProcessQueue);
        ProcessQueue.splice(m_ProcessQueue);
    }

    for(unsigned long dwCount = 0; dwCount < GetMaxProcessPacketPerPulse(); ++dwCount)
    {
        CBuffer* lpBuffer = ProcessQueue.dequeue();
        if(0 == lpBuffer)
        {
            break;
        }

        PktBase* lpPktBase = reinterpret_cast<PktBase*>(lpBuffer->rd_ptr());

        // Packet statistics.
        CPacketStatistics::GetInstance().Recv(lpPktBase->GetCmd(), lpPktBase->GetLen());

        // Packet processing.
        const bool bResult = DispatchPacket(lpPktBase);
        Processed.enqueue(lpBuffer);

        if(!bResult)
        {
            // NOTE(review): this path leaves PROCESS_PACKET_NOW set and does
            // not return the unprocessed packets to m_ProcessQueue - presumably
            // the caller closes the session on false; confirm.
            return false;
        }
    }

    if (!ProcessQueue.empty())
    {
        // The per-call limit was hit; log what is left over.
        ProcessTooManyPacket(ProcessQueue);
    }

    {
        // The lock is taken once here; the original acquired it a second time
        // inside the if below while this guard was still alive.
        BufferQueueLock::Syncronize sync(m_BufferQueueLock);

        if(!ProcessQueue.empty())
        {
            // Give the unprocessed packets back to the pending queue.
            m_ProcessQueue.splice(ProcessQueue, true);
        }

        m_dwFlags &= ~PROCESS_PACKET_NOW;
    }

    return true;
}


// Ordering predicate for std::sort: pairs with a larger .second come first.
template<typename T>
bool greator_second(const T& lhspair, const T& rhspair)
{
    return rhspair.second < lhspair.second;
}


// Logs how many packets were left unprocessed and, per packet command, how
// many of them remain in bufferQueue (most frequent command first).
// The log text is cut off silently when it does not fit the local buffer.
void CRylServerDispatch::ProcessTooManyPacket(CBufferQueue& bufferQueue)
{
    // first: packet command, second: number of queued packets with that command.
    // NOTE(review): element types restored from usage - confirm against the header/history.
    typedef std::pair<PktBase::CMDType, int>    PacketInfoPair;
    typedef std::vector<PacketInfoPair>         PacketStatistics;

    const int MAX_BUFFER = 3072;
    const int MAX_TEXT   = MAX_BUFFER - 1;      // last byte is reserved for the terminator
    char szBuffer[MAX_BUFFER];

    // _snprintf does not terminate the buffer when the output is truncated,
    // so it is only ever given MAX_TEXT bytes and the last byte is set by hand.
    int nTotalLength = _snprintf(szBuffer, MAX_TEXT,
        "Processed:%d/Remain:%d/Remain packets(cmd:num)/",
        static_cast<int>(m_dwMaxProcessPacketPerPulse),
        static_cast<int>(bufferQueue.getBufferNum()));

    if(nTotalLength < 0 || MAX_TEXT < nTotalLength)
    {
        // Header line alone was truncated; nothing more fits.
        nTotalLength = MAX_TEXT;
    }

    PacketStatistics            statistics;
    PacketStatistics::iterator  spos;
    PacketStatistics::iterator  send;

    statistics.reserve(bufferQueue.getBufferNum());

    // Count the queued packets per command.
    for(CBuffer* lpBuffer = bufferQueue.getHead();
        0 != lpBuffer; lpBuffer = lpBuffer->next())
    {
        PktBase::CMDType cmd = reinterpret_cast<PktBase*>(lpBuffer->rd_ptr())->GetCmd();

        spos = statistics.begin();
        send = statistics.end();

        for(; spos != send; ++spos)
        {
            if(spos->first == cmd)
            {
                ++spos->second;
                break;
            }
        }

        if(spos == send)
        {
            statistics.push_back(PacketInfoPair(cmd, 1));
        }
    }

    std::sort(statistics.begin(), statistics.end(), &greator_second<PacketInfoPair>);

    spos = statistics.begin();
    send = statistics.end();

    for(; spos != send; ++spos)
    {
        const int nRemain = MAX_TEXT - nTotalLength;
        if(nRemain <= 0)
        {
            break;
        }

        const int nLength = _snprintf(szBuffer + nTotalLength, nRemain,
            "0x%02x:%5d/", spos->first, spos->second);

        if(nLength < 0 || nRemain <= nLength)
        {
            // Output was truncated; stop instead of overwriting the same spot.
            break;
        }

        nTotalLength += nLength;
    }

    szBuffer[MAX_TEXT] = '\0';
    LogErrorPacket(szBuffer, 0);
}


// Functor that puts the same data, with the given packet command, into the
// send stream of every dispatcher it is invoked on.
// Only the pointer szData is stored, not a copy of the data.
CSendPacketAllServer::CSendPacketAllServer(const char* szData, unsigned long dwDataLen,
                                           unsigned char cPacketCmd)
:   m_szData(szData), m_dwDataLen(dwDataLen), m_cPacketCmd(cPacketCmd)
{

}


// Puts the stored data into packetDispatch's send stream and returns the
// result of PutBuffer(). dwServerID is not used.
// packetDispatch must actually be a CRylServerDispatch (unchecked static_cast).
bool CSendPacketAllServer::operator () (unsigned long dwServerID,
                                        CPacketDispatch& packetDispatch)
{
    return static_cast<CRylServerDispatch&>(packetDispatch).GetSendStream().PutBuffer(
        m_szData, m_dwDataLen, m_cPacketCmd);
}