00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018
00019 #ifdef _WIN32
00020 #define ENOBUFS WSAENOBUFS
00021 #undef errno
00022 #define errno GetLastError()
00023 #ifdef __GNUC__
00024 #include <sys/socket.h>
00025 #endif
00026 #include "streams.h"
00027 #else
00028 #include <errno.h>
00029 #endif
00030
00031
00032
// Debug tracing helper.  Change the "#if 0" below to "#if 1" to send every
// TRACE() call to stderr; as shipped, TRACE() produces no output.
#if 0
# ifndef _MSC_VER
// Wrapped in do/while(0) so that "TRACE(...);" is always exactly one
// statement.  The previous two-statement expansion would have run fflush()
// unconditionally when TRACE() was used as the body of an unbraced if/else.
#  define TRACE(x, y...) \
    do { fprintf(stderr, x, ## y); fflush(stderr); } while (0)
# else
#  define TRACE printf
# endif
#else
# ifndef _MSC_VER
// Disabled: the whole call, arguments included, is removed by the
// preprocessor.
#  define TRACE(x, y...)
# else
// Disabled (MSVC branch): TRACE expands to nothing, leaving the
// parenthesised argument list behind as a plain expression statement.
#  define TRACE
# endif
#endif
00046
// Storage for the static WvStream::globalstream pointer; it starts out unset.
// _build_selectinfo(), _process_selectinfo() and _select() in this file
// consult it when a "forceable" select runs on a stream other than the
// global one.
00047 WvStream *WvStream::globalstream = NULL;
00048
// Interface map for WvStream, listing IObject and IWvStream.
// NOTE(review): the UUID_MAP_* macros are declared elsewhere (presumably the
// component framework behind IObject) -- confirm their exact expansion there.
00049 UUID_MAP_BEGIN(WvStream)
00050 UUID_MAP_ENTRY(IObject)
00051 UUID_MAP_ENTRY(IWvStream)
00052 UUID_MAP_END
00053
00054
00055 WvStream::WvStream():
00056 read_requires_writable(NULL),
00057 write_requires_readable(NULL),
00058 uses_continue_select(false),
00059 personal_stack_size(65536),
00060 alarm_was_ticking(false),
00061 stop_read(false),
00062 stop_write(false),
00063 closed(false),
00064 userdata(NULL),
00065 readcb(this, &WvStream::legacy_callback),
00066 max_outbuf_size(0),
00067 outbuf_delayed_flush(false),
00068 is_auto_flush(true),
00069 want_to_flush(true),
00070 is_flushing(false),
00071 queue_min(0),
00072 autoclose_time(0),
00073 alarm_time(wvtime_zero),
00074 last_alarm_check(wvtime_zero)
00075 {
00076 TRACE("Creating wvstream %p\n", this);
00077
00078 #ifdef _WIN32
00079 WSAData wsaData;
00080 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00081 assert(result == 0);
00082 #endif
00083 }
00084
00085
00086
00087 IWvStream::IWvStream()
00088 {
00089 }
00090
00091
00092 IWvStream::~IWvStream()
00093 {
00094 }
00095
00096
00097 WvStream::~WvStream()
00098 {
00099 TRACE("destroying %p\n", this);
00100 close();
00101
00102
00103
00104
00105 assert(!uses_continue_select || !call_ctx);
00106
00107 call_ctx = 0;
00108 TRACE("done destroying %p\n", this);
00109 }
00110
00111
00112 void WvStream::close()
00113 {
00114 TRACE("flushing in wvstream...\n");
00115 flush(2000);
00116 TRACE("(flushed)\n");
00117
00118 closed = true;
00119
00120 if (!!closecb)
00121 {
00122 IWvStreamCallback cb = closecb;
00123 closecb = 0;
00124 cb(*this);
00125 }
00126
00127
00128
00129
00130 }
00131
00132
00133 void WvStream::autoforward(WvStream &s)
00134 {
00135 setcallback(autoforward_callback, &s);
00136 read_requires_writable = &s;
00137 }
00138
00139
00140 void WvStream::noautoforward()
00141 {
00142 setcallback(0, NULL);
00143 read_requires_writable = NULL;
00144 }
00145
00146
00147 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00148 {
00149 WvStream &s2 = *(WvStream *)userdata;
00150 char buf[1024];
00151 size_t len;
00152
00153 len = s.read(buf, sizeof(buf));
00154
00155 s2.write(buf, len);
00156 }
00157
00158
00159 void WvStream::_callback()
00160 {
00161 execute();
00162 if (!! callfunc)
00163 callfunc(*this, userdata);
00164 }
00165
00166
00167 void *WvStream::_callwrap(void *)
00168 {
00169 _callback();
00170 return NULL;
00171 }
00172
00173
00174 void WvStream::callback()
00175 {
00176 TRACE("(?)");
00177
00178
00179 if (alarm_remaining() == 0)
00180 {
00181 alarm_time = wvtime_zero;
00182 alarm_was_ticking = true;
00183 }
00184 else
00185 alarm_was_ticking = false;
00186
00187 assert(!uses_continue_select || personal_stack_size >= 1024);
00188
00189 #define TEST_CONTINUES_HARSHLY 0
00190 #if TEST_CONTINUES_HARSHLY
00191 #ifndef _WIN32
00192 # warning "Using WvCont for *all* streams for testing!"
00193 #endif
00194 if (1)
00195 #else
00196 if (uses_continue_select && personal_stack_size >= 1024)
00197 #endif
00198 {
00199 if (!call_ctx)
00200 {
00201 call_ctx = WvCont(WvCallback<void*,void*>
00202 (this, &WvStream::_callwrap),
00203 personal_stack_size);
00204 }
00205
00206 call_ctx(NULL);
00207 }
00208 else
00209 _callback();
00210
00211
00212
00213
00214
00215
00216 }
00217
00218
00219 bool WvStream::isok() const
00220 {
00221 return !closed && WvErrorBase::isok();
00222 }
00223
00224
00225 void WvStream::seterr(int _errnum)
00226 {
00227 if (!geterr())
00228 {
00229 WvErrorBase::seterr(_errnum);
00230 close();
00231 }
00232 }
00233
00234
00235 size_t WvStream::read(WvBuf &outbuf, size_t count)
00236 {
00237
00238 size_t free = outbuf.free();
00239 if (count > free)
00240 count = free;
00241
00242 WvDynBuf tmp;
00243 unsigned char *buf = tmp.alloc(count);
00244 size_t len = read(buf, count);
00245 tmp.unalloc(count - len);
00246 outbuf.merge(tmp);
00247 return len;
00248 }
00249
00250
00251 size_t WvStream::write(WvBuf &inbuf, size_t count)
00252 {
00253
00254 size_t avail = inbuf.used();
00255 if (count > avail)
00256 count = avail;
00257 const unsigned char *buf = inbuf.get(count);
00258 size_t len = write(buf, count);
00259 inbuf.unget(count - len);
00260 return len;
00261 }
00262
00263
00264 size_t WvStream::read(void *buf, size_t count)
00265 {
00266 assert(!count || buf);
00267
00268 size_t bufu, i;
00269 unsigned char *newbuf;
00270
00271 bufu = inbuf.used();
00272 if (bufu < queue_min)
00273 {
00274 newbuf = inbuf.alloc(queue_min - bufu);
00275 assert(newbuf);
00276 i = uread(newbuf, queue_min - bufu);
00277 inbuf.unalloc(queue_min - bufu - i);
00278
00279 bufu = inbuf.used();
00280 }
00281
00282 if (bufu < queue_min)
00283 {
00284 maybe_autoclose();
00285 return 0;
00286 }
00287
00288
00289 if (!bufu)
00290 bufu = uread(buf, count);
00291 else
00292 {
00293
00294 if (bufu > count)
00295 bufu = count;
00296
00297 memcpy(buf, inbuf.get(bufu), bufu);
00298 }
00299
00300 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00301 maybe_autoclose();
00302 return bufu;
00303 }
00304
00305
00306 size_t WvStream::write(const void *buf, size_t count)
00307 {
00308 assert(!count || buf);
00309 if (!isok() || !buf || !count || stop_write) return 0;
00310
00311 size_t wrote = 0;
00312 if (!outbuf_delayed_flush && !outbuf.used())
00313 {
00314 wrote = uwrite(buf, count);
00315 count -= wrote;
00316 buf = (const unsigned char *)buf + wrote;
00317
00318 }
00319 if (max_outbuf_size != 0)
00320 {
00321 size_t canbuffer = max_outbuf_size - outbuf.used();
00322 if (count > canbuffer)
00323 count = canbuffer;
00324 }
00325 if (count != 0)
00326 {
00327 outbuf.put(buf, count);
00328 wrote += count;
00329 }
00330
00331 if (should_flush())
00332 {
00333 if (is_auto_flush)
00334 flush(0);
00335 else
00336 flush_outbuf(0);
00337 }
00338
00339 return wrote;
00340 }
00341
00342
00343 void WvStream::noread()
00344 {
00345 stop_read = true;
00346 maybe_autoclose();
00347 }
00348
00349
00350 void WvStream::nowrite()
00351 {
00352 stop_write = true;
00353 maybe_autoclose();
00354 }
00355
00356
00357 void WvStream::maybe_autoclose()
00358 {
00359 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00360 close();
00361 }
00362
00363
00364 bool WvStream::isreadable()
00365 {
00366 return isok() && select(0, true, false, false);
00367 }
00368
00369
00370 bool WvStream::iswritable()
00371 {
00372 return !stop_write && isok() && select(0, false, true, false);
00373 }
00374
00375
00376 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00377 int readahead)
00378 {
00379 assert(separator >= 0);
00380 assert(separator <= 255);
00381
00382
00383
00384 struct timeval timeout_time;
00385 if (wait_msec > 0)
00386 timeout_time = msecadd(wvtime(), wait_msec);
00387
00388 maybe_autoclose();
00389
00390
00391
00392 while (isok())
00393 {
00394
00395 queuemin(0);
00396
00397
00398 if (inbuf.strchr(separator) > 0)
00399 break;
00400 else if (!isok() || stop_read)
00401 break;
00402
00403
00404 queuemin(inbuf.used() + 1);
00405
00406
00407 if (wait_msec > 0)
00408 {
00409 wait_msec = msecdiff(timeout_time, wvtime());
00410 if (wait_msec < 0)
00411 wait_msec = 0;
00412 }
00413
00414
00415
00416 bool hasdata;
00417 if (wait_msec != 0 && uses_continue_select)
00418 hasdata = continue_select(wait_msec);
00419 else
00420 hasdata = select(wait_msec, true, false);
00421
00422 if (!isok())
00423 break;
00424
00425 if (hasdata)
00426 {
00427
00428 WvDynBuf tmp;
00429 unsigned char *buf = tmp.alloc(readahead);
00430 assert(buf);
00431 size_t len = uread(buf, readahead);
00432 tmp.unalloc(readahead - len);
00433 size_t tmp_size = tmp.used();
00434 inbuf.put(tmp.get(tmp_size), tmp_size);
00435 hasdata = len > 0;
00436 }
00437
00438 if (!isok())
00439 break;
00440
00441 if (!hasdata && wait_msec == 0)
00442 return NULL;
00443 }
00444 if (!inbuf.used())
00445 return NULL;
00446
00447
00448 size_t i = 0;
00449 i = inbuf.strchr(separator);
00450 if (i > 0) {
00451 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00452 assert(eol);
00453 *eol = 0;
00454 return const_cast<char*>((const char *)inbuf.get(i));
00455 } else {
00456
00457
00458
00459 inbuf.alloc(1)[0] = 0;
00460 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00461 }
00462 }
00463
00464
00465 char *WvStream::continue_getline(time_t wait_msec, int separator,
00466 int readahead)
00467 {
00468 assert(false && "not implemented, come back later!");
00469 assert(uses_continue_select);
00470 return NULL;
00471 }
00472
00473
00474 void WvStream::drain()
00475 {
00476 char buf[1024];
00477 while (isreadable())
00478 read(buf, sizeof(buf));
00479 }
00480
00481
00482 bool WvStream::flush(time_t msec_timeout)
00483 {
00484 if (is_flushing) return false;
00485
00486 TRACE("%p flush starts\n", this);
00487
00488 is_flushing = true;
00489 want_to_flush = true;
00490 bool done = flush_internal(msec_timeout)
00491 && flush_outbuf(msec_timeout);
00492 is_flushing = false;
00493
00494 TRACE("flush stops (%d)\n", done);
00495 return done;
00496 }
00497
00498
00499 bool WvStream::should_flush()
00500 {
00501 return want_to_flush;
00502 }
00503
00504
00505 bool WvStream::flush_outbuf(time_t msec_timeout)
00506 {
00507 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00508 bool outbuf_was_used = outbuf.used();
00509
00510
00511
00512
00513 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00514 {
00515 maybe_autoclose();
00516 return true;
00517 }
00518
00519 WvTime stoptime = msecadd(wvtime(), msec_timeout);
00520
00521
00522 while (outbuf_was_used && isok())
00523 {
00524
00525
00526
00527 size_t attempt = outbuf.optgettable();
00528 size_t real = uwrite(outbuf.get(attempt), attempt);
00529
00530
00531
00532
00533 if (isok() && real < attempt)
00534 {
00535 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00536 assert(outbuf.ungettable() >= attempt - real);
00537 outbuf.unget(attempt - real);
00538 }
00539
00540
00541
00542
00543
00544 if (!msec_timeout)
00545 break;
00546 if (msec_timeout >= 0
00547 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00548 break;
00549
00550 outbuf_was_used = outbuf.used();
00551 }
00552
00553
00554 if (autoclose_time && isok())
00555 {
00556 time_t now = time(NULL);
00557 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00558 this, now - autoclose_time, outbuf.used());
00559 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00560 {
00561 autoclose_time = 0;
00562 close();
00563 }
00564 }
00565
00566 TRACE("flush_outbuf: after autoclose chunk\n");
00567 if (outbuf_delayed_flush && !outbuf_was_used)
00568 want_to_flush = false;
00569
00570 TRACE("flush_outbuf: now isok=%d\n", isok());
00571
00572
00573 if (outbuf_was_used && !isok())
00574 outbuf.zap();
00575
00576 maybe_autoclose();
00577 TRACE("flush_outbuf stops\n");
00578
00579 return !outbuf_was_used;
00580 }
00581
00582
00583 bool WvStream::flush_internal(time_t msec_timeout)
00584 {
00585
00586 return true;
00587 }
00588
00589
00590 int WvStream::getrfd() const
00591 {
00592 return -1;
00593 }
00594
00595
00596 int WvStream::getwfd() const
00597 {
00598 return -1;
00599 }
00600
00601
00602 void WvStream::flush_then_close(int msec_timeout)
00603 {
00604 time_t now = time(NULL);
00605 autoclose_time = now + (msec_timeout + 999) / 1000;
00606
00607 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00608 this, outbuf.used(), autoclose_time - now);
00609
00610
00611
00612
00613
00614
00615 flush(0);
00616 }
00617
00618
00619 bool WvStream::pre_select(SelectInfo &si)
00620 {
00621 maybe_autoclose();
00622
00623 time_t alarmleft = alarm_remaining();
00624
00625 if (!si.inherit_request && alarmleft == 0)
00626 return true;
00627
00628 if (!si.inherit_request)
00629 {
00630 si.wants.readable |= readcb;
00631 si.wants.writable |= writecb;
00632 si.wants.isexception |= exceptcb;
00633 }
00634
00635
00636 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00637 return true;
00638 if (alarmleft >= 0
00639 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00640 si.msec_timeout = alarmleft + 10;
00641 return false;
00642 }
00643
00644
00645 bool WvStream::post_select(SelectInfo &si)
00646 {
00647
00648
00649
00650
00651
00652
00653 if (should_flush())
00654 flush(0);
00655 if (!si.inherit_request && alarm_remaining() == 0)
00656 return true;
00657 return false;
00658 }
00659
00660
00661 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00662 bool readable, bool writable, bool isexcept, bool forceable)
00663 {
00664 FD_ZERO(&si.read);
00665 FD_ZERO(&si.write);
00666 FD_ZERO(&si.except);
00667
00668 if (forceable)
00669 {
00670 si.wants.readable = readcb;
00671 si.wants.writable = writecb;
00672 si.wants.isexception = exceptcb;
00673 }
00674 else
00675 {
00676 si.wants.readable = readable;
00677 si.wants.writable = writable;
00678 si.wants.isexception = isexcept;
00679 }
00680
00681 si.max_fd = -1;
00682 si.msec_timeout = msec_timeout;
00683 si.inherit_request = ! forceable;
00684 si.global_sure = false;
00685
00686 if (!isok()) return false;
00687
00688 bool sure = pre_select(si);
00689 if (globalstream && forceable && (globalstream != this))
00690 {
00691 WvStream *s = globalstream;
00692 globalstream = NULL;
00693 si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
00694 globalstream = s;
00695 }
00696 if (sure || si.global_sure)
00697 si.msec_timeout = 0;
00698 return sure;
00699 }
00700
00701
00702 int WvStream::_do_select(SelectInfo &si)
00703 {
00704
00705 timeval tv;
00706 tv.tv_sec = si.msec_timeout / 1000;
00707 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00708
00709 #ifdef _WIN32
00710
00711 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00712 FD_SET(fakefd, &si.except);
00713 #endif
00714
00715
00716 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00717 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00718
00719
00720
00721
00722
00723
00724 if (sel < 0
00725 && errno != EAGAIN && errno != EINTR
00726 && errno != EBADF
00727 && errno != ENOBUFS
00728 )
00729 {
00730 seterr(errno);
00731 }
00732 #ifdef _WIN32
00733 ::close(fakefd);
00734 #endif
00735 TRACE("select() returned %d\n", sel);
00736 return sel;
00737 }
00738
00739
00740 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00741 {
00742 if (!isok()) return false;
00743
00744 bool sure = post_select(si);
00745 if (globalstream && forceable && (globalstream != this))
00746 {
00747 WvStream *s = globalstream;
00748 globalstream = NULL;
00749 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00750 || si.global_sure;
00751 globalstream = s;
00752 }
00753 return sure;
00754 }
00755
00756
00757 bool WvStream::_select(time_t msec_timeout,
00758 bool readable, bool writable, bool isexcept, bool forceable)
00759 {
00760 SelectInfo si;
00761 bool sure = _build_selectinfo(si, msec_timeout,
00762 readable, writable, isexcept, forceable);
00763
00764 if (!isok())
00765 return false;
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775 int sel = _do_select(si);
00776 if (sel >= 0)
00777 sure = _process_selectinfo(si, forceable) || sure;
00778 if (si.global_sure && globalstream && forceable && (globalstream != this))
00779 globalstream->callback();
00780 return sure;
00781 }
00782
00783
00784 IWvStream::SelectRequest WvStream::get_select_request()
00785 {
00786 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00787 }
00788
00789
00790 void WvStream::force_select(bool readable, bool writable, bool isexception)
00791 {
00792 if (readable)
00793 readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00794 if (writable)
00795 writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00796 if (isexception)
00797 exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00798 }
00799
00800
00801 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00802 {
00803 if (readable)
00804 readcb = 0;
00805 if (writable)
00806 writecb = 0;
00807 if (isexception)
00808 exceptcb = 0;
00809 }
00810
00811
00812 void WvStream::alarm(time_t msec_timeout)
00813 {
00814 if (msec_timeout >= 0)
00815 alarm_time = msecadd(wvtime(), msec_timeout);
00816 else
00817 alarm_time = wvtime_zero;
00818 }
00819
00820
00821 time_t WvStream::alarm_remaining()
00822 {
00823 if (alarm_time.tv_sec)
00824 {
00825 WvTime now = wvtime();
00826
00827
00828 if (now < last_alarm_check)
00829 {
00830 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
00831
00832 if (msecdiff(last_alarm_check, now) > 200)
00833 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
00834 "(%ld:%ld %ld:%ld)\n",
00835 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
00836 now.tv_sec, now.tv_usec);
00837 #endif
00838 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
00839 }
00840
00841 last_alarm_check = now;
00842
00843 time_t remaining = msecdiff(alarm_time, now);
00844 if (remaining < 0)
00845 remaining = 0;
00846 return remaining;
00847 }
00848 return -1;
00849 }
00850
00851
00852 bool WvStream::continue_select(time_t msec_timeout)
00853 {
00854 assert(uses_continue_select);
00855
00856
00857
00858 assert(call_ctx);
00859
00860 if (msec_timeout >= 0)
00861 alarm(msec_timeout);
00862
00863 alarm(msec_timeout);
00864 WvCont::yield();
00865 alarm(-1);
00866
00867
00868
00869
00870
00871
00872
00873 TRACE("hello-%p\n", this);
00874 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
00875 }
00876
00877
00878 void WvStream::terminate_continue_select()
00879 {
00880 close();
00881 call_ctx = 0;
00882 }
00883
00884
00885 const WvAddr *WvStream::src() const
00886 {
00887 return NULL;
00888 }
00889
00890
00891 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
00892 {
00893 callfunc = _callfunc;
00894 userdata = _userdata;
00895 call_ctx = 0;
00896 }
00897
00898
00899 void WvStream::legacy_callback(IWvStream& s)
00900 {
00901 execute();
00902 if (!! callfunc)
00903 callfunc(*this, userdata);
00904 }
00905
00906
00907 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
00908 {
00909 IWvStreamCallback tmp = readcb;
00910
00911 readcb = _callback;
00912
00913 return tmp;
00914 }
00915
00916
00917 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
00918 {
00919 IWvStreamCallback tmp = writecb;
00920
00921 writecb = _callback;
00922
00923 return tmp;
00924 }
00925
00926
00927 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
00928 {
00929 IWvStreamCallback tmp = exceptcb;
00930
00931 exceptcb = _callback;
00932
00933 return tmp;
00934 }
00935
00936
00937 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
00938 {
00939 IWvStreamCallback tmp = closecb;
00940 if (isok())
00941 closecb = _callback;
00942 else
00943 {
00944
00945 closecb = 0;
00946 if (!!_callback)
00947 _callback(*this);
00948 }
00949 return tmp;
00950 }
00951
00952
00953 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00954 {
00955 WvDynBuf tmp;
00956 tmp.merge(unreadbuf, count);
00957 tmp.merge(inbuf);
00958 inbuf.zap();
00959 inbuf.merge(tmp);
00960 }