00001
00002
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008
00009 NAMESPACE_BEGIN(CryptoPP)
00010
00011 unsigned int NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00012 {
00013 if (messageCount == 0)
00014 return 0;
00015
00016 unsigned long byteCount = ULONG_MAX;
00017 messageCount = 0;
00018 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00019 if (!m_messageEndSent && SourceExhausted())
00020 {
00021 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00022 m_messageEndSent = true;
00023 messageCount = 1;
00024 }
00025 return 0;
00026 }
00027
00028 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00029 {
00030 TimedFlush(blocking ? INFINITE_TIME : 0);
00031 return hardFlush && !!GetCurrentBufferSize();
00032 }
00033
00034
00035
00036 #ifdef HIGHRES_TIMER_AVAILABLE
00037
00038 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00039 : NonblockingSource(attachment), m_buf(1024*16)
00040 , m_dataBegin(0), m_dataEnd(0)
00041 , m_waitingForResult(false), m_outputBlocked(false)
00042 {
00043 }
00044
00045 void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
00046 {
00047 if (!m_outputBlocked)
00048 {
00049 if (m_dataBegin == m_dataEnd)
00050 AccessReceiver().GetWaitObjects(container);
00051 else
00052 container.SetNoWait();
00053 }
00054 AttachedTransformation()->GetWaitObjects(container);
00055 }
00056
00057 unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00058 {
00059 NetworkReceiver &receiver = AccessReceiver();
00060
00061 unsigned long maxSize = byteCount;
00062 byteCount = 0;
00063 bool forever = maxTime == INFINITE_TIME;
00064 Timer timer(Timer::MILLISECONDS, forever);
00065 BufferedTransformation *t = AttachedTransformation();
00066
00067 if (m_outputBlocked)
00068 goto DoOutput;
00069
00070 while (true)
00071 {
00072 if (m_dataBegin == m_dataEnd)
00073 {
00074 if (receiver.EofReceived())
00075 break;
00076
00077 if (m_waitingForResult)
00078 {
00079 if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00080 break;
00081
00082 unsigned int recvResult = receiver.GetReceiveResult();
00083 #if CRYPTOPP_TRACE_NETWORK
00084 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00085 #endif
00086 m_dataEnd += recvResult;
00087 m_waitingForResult = false;
00088
00089 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00090 goto ReceiveNoWait;
00091 }
00092 else
00093 {
00094 m_dataEnd = m_dataBegin = 0;
00095
00096 if (receiver.MustWaitToReceive())
00097 {
00098 if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00099 break;
00100
00101 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00102 m_waitingForResult = true;
00103 }
00104 else
00105 {
00106 ReceiveNoWait:
00107 m_waitingForResult = true;
00108
00109
00110 #if CRYPTOPP_TRACE_NETWORK
00111 OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00112 #endif
00113 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00114 {
00115 unsigned int recvResult = receiver.GetReceiveResult();
00116 #if CRYPTOPP_TRACE_NETWORK
00117 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00118 #endif
00119 m_dataEnd += recvResult;
00120 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00121 {
00122 m_waitingForResult = false;
00123 break;
00124 }
00125 }
00126 }
00127 }
00128 }
00129 else
00130 {
00131 m_putSize = STDMIN((unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount);
00132 if (checkDelimiter)
00133 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00134
00135 DoOutput:
00136 unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00137 if (result)
00138 {
00139 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00140 goto DoOutput;
00141 else
00142 {
00143 m_outputBlocked = true;
00144 return result;
00145 }
00146 }
00147 m_outputBlocked = false;
00148
00149 byteCount += m_putSize;
00150 m_dataBegin += m_putSize;
00151 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00152 break;
00153 if (byteCount == maxSize)
00154 break;
00155
00156
00157
00158 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00159 break;
00160 }
00161 }
00162
00163 return 0;
00164 }
00165
00166
00167
00168 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00169 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00170 , m_needSendResult(false), m_wasBlocked(false)
00171 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
00172 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00173 , m_currentSpeed(0), m_maxObservedSpeed(0)
00174 {
00175 }
00176
00177 float NetworkSink::ComputeCurrentSpeed()
00178 {
00179 if (m_speedTimer.ElapsedTime() > 1000)
00180 {
00181 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00182 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00183 m_byteCountSinceLastTimerReset = 0;
00184 m_speedTimer.StartTimer();
00185
00186 }
00187 return m_currentSpeed;
00188 }
00189
00190 unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking)
00191 {
00192 if (m_skipBytes)
00193 {
00194 assert(length >= m_skipBytes);
00195 inString += m_skipBytes;
00196 length -= m_skipBytes;
00197 }
00198 m_buffer.LazyPut(inString, length);
00199
00200 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00201 TimedFlush(0, 0);
00202
00203 unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
00204 if (blocking)
00205 TimedFlush(INFINITE_TIME, targetSize);
00206
00207 if (m_buffer.CurrentSize() > targetSize)
00208 {
00209 assert(!blocking);
00210 unsigned int blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length);
00211 m_buffer.UndoLazyPut(blockedBytes);
00212 m_buffer.FinalizeLazyPut();
00213 m_wasBlocked = true;
00214 m_skipBytes += length - blockedBytes;
00215 return STDMAX(blockedBytes, 1U);
00216 }
00217
00218 m_buffer.FinalizeLazyPut();
00219 m_wasBlocked = false;
00220 m_skipBytes = 0;
00221
00222 if (messageEnd)
00223 AccessSender().SendEof();
00224 return 0;
00225 }
00226
00227 unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetSize)
00228 {
00229 NetworkSender &sender = AccessSender();
00230
00231 bool forever = maxTime == INFINITE_TIME;
00232 Timer timer(Timer::MILLISECONDS, forever);
00233 unsigned int totalFlushSize = 0;
00234
00235 while (true)
00236 {
00237 if (m_buffer.CurrentSize() <= targetSize)
00238 break;
00239
00240 if (m_needSendResult)
00241 {
00242 if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00243 break;
00244
00245 unsigned int sendResult = sender.GetSendResult();
00246 #if CRYPTOPP_TRACE_NETWORK
00247 OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00248 #endif
00249 m_buffer.Skip(sendResult);
00250 totalFlushSize += sendResult;
00251 m_needSendResult = false;
00252
00253 if (!m_buffer.AnyRetrievable())
00254 break;
00255 }
00256
00257 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00258 if (sender.MustWaitToSend() && !sender.Wait(timeOut))
00259 break;
00260
00261 unsigned int contiguousSize = 0;
00262 const byte *block = m_buffer.Spy(contiguousSize);
00263
00264 #if CRYPTOPP_TRACE_NETWORK
00265 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00266 #endif
00267 sender.Send(block, contiguousSize);
00268 m_needSendResult = true;
00269
00270 if (maxTime > 0 && timeOut == 0)
00271 break;
00272 }
00273
00274 m_byteCountSinceLastTimerReset += totalFlushSize;
00275 ComputeCurrentSpeed();
00276
00277 return totalFlushSize;
00278 }
00279
00280 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
00281
00282 NAMESPACE_END