wvstream.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 
00019 #ifdef _WIN32
00020 #define ENOBUFS WSAENOBUFS
00021 #undef errno
00022 #define errno GetLastError()
00023 #ifdef __GNUC__
00024 #include <sys/socket.h>
00025 #endif
00026 #include "streams.h"
00027 #else
00028 #include <errno.h>
00029 #endif
00030 
00031 // enable this to add some read/write trace messages (this can be VERY
00032 // verbose)
00033 #if 0
00034 # ifndef _MSC_VER
00035 #  define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
00036 # else
00037 #  define TRACE printf
00038 # endif
00039 #else
00040 # ifndef _MSC_VER
00041 #  define TRACE(x, y...)
00042 # else
00043 #  define TRACE
00044 # endif
00045 #endif
00046 
00047 WvStream *WvStream::globalstream = NULL;
00048 
00049 UUID_MAP_BEGIN(WvStream)
00050   UUID_MAP_ENTRY(IObject)
00051   UUID_MAP_ENTRY(IWvStream)
00052   UUID_MAP_END
00053 
00054 
00055 WvStream::WvStream():
00056     read_requires_writable(NULL),
00057     write_requires_readable(NULL),
00058     uses_continue_select(false),
00059     personal_stack_size(65536),
00060     alarm_was_ticking(false),
00061     stop_read(false),
00062     stop_write(false),
00063     closed(false),
00064     userdata(NULL),
00065     readcb(this, &WvStream::legacy_callback),
00066     max_outbuf_size(0),
00067     outbuf_delayed_flush(false),
00068     is_auto_flush(true),
00069     want_to_flush(true),
00070     is_flushing(false),
00071     queue_min(0),
00072     autoclose_time(0),
00073     alarm_time(wvtime_zero),
00074     last_alarm_check(wvtime_zero)
00075 {
00076     TRACE("Creating wvstream %p\n", this);
00077     
00078 #ifdef _WIN32
00079     WSAData wsaData;
00080     int result = WSAStartup(MAKEWORD(2,0), &wsaData); 
00081     assert(result == 0);
00082 #endif
00083 }
00084 
00085 
00086 // FIXME: interfaces (IWvStream) shouldn't have implementations!
00087 IWvStream::IWvStream()
00088 {
00089 }
00090 
00091 
00092 IWvStream::~IWvStream()
00093 {
00094 }
00095 
00096 
00097 WvStream::~WvStream()
00098 {
00099     TRACE("destroying %p\n", this);
00100     close();
00101     
00102     // if this assertion fails, then uses_continue_select is true, but you
00103     // didn't call terminate_continue_select() or close() before destroying
00104     // your object.  Shame on you!
00105     assert(!uses_continue_select || !call_ctx);
00106     
00107     call_ctx = 0; // finish running the suspended callback, if any
00108     TRACE("done destroying %p\n", this);
00109 }
00110 
00111 
00112 void WvStream::close()
00113 {
00114     TRACE("flushing in wvstream...\n");
00115     flush(2000); // fixme: should not hardcode this stuff
00116     TRACE("(flushed)\n");
00117 
00118     closed = true;
00119     
00120     if (!!closecb)
00121     {
00122         IWvStreamCallback cb = closecb;
00123         closecb = 0; // ensure callback is only called once
00124         cb(*this);
00125     }
00126     
00127     // I would like to delete call_ctx here, but then if someone calls
00128     // close() from *inside* a continuable callback, we explode.  Oops!
00129     //call_ctx = 0; // destroy the context, if necessary
00130 }
00131 
00132 
00133 void WvStream::autoforward(WvStream &s)
00134 {
00135     setcallback(autoforward_callback, &s);
00136     read_requires_writable = &s;
00137 }
00138 
00139 
00140 void WvStream::noautoforward()
00141 {
00142     setcallback(0, NULL);
00143     read_requires_writable = NULL;
00144 }
00145 
00146 
00147 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00148 {
00149     WvStream &s2 = *(WvStream *)userdata;
00150     char buf[1024];
00151     size_t len;
00152     
00153     len = s.read(buf, sizeof(buf));
00154     // fprintf(stderr, "autoforward read %d bytes\n", (int)len);
00155     s2.write(buf, len);
00156 }
00157 
00158 
00159 void WvStream::_callback()
00160 {
00161     execute();
00162     if (!! callfunc)
00163         callfunc(*this, userdata);
00164 }
00165 
00166 
00167 void *WvStream::_callwrap(void *)
00168 {
00169     _callback();
00170     return NULL;
00171 }
00172 
00173 
00174 void WvStream::callback()
00175 {
00176     TRACE("(?)");
00177     
00178     // if the alarm has gone off and we're calling callback... good!
00179     if (alarm_remaining() == 0)
00180     {
00181         alarm_time = wvtime_zero;
00182         alarm_was_ticking = true;
00183     }
00184     else
00185         alarm_was_ticking = false;
00186     
00187     assert(!uses_continue_select || personal_stack_size >= 1024);
00188 
00189 #define TEST_CONTINUES_HARSHLY 0
00190 #if TEST_CONTINUES_HARSHLY
00191 #ifndef _WIN32
00192 # warning "Using WvCont for *all* streams for testing!"
00193 #endif
00194     if (1)
00195 #else
00196     if (uses_continue_select && personal_stack_size >= 1024)
00197 #endif
00198     {
00199         if (!call_ctx) // no context exists yet!
00200         {
00201             call_ctx = WvCont(WvCallback<void*,void*>
00202                               (this, &WvStream::_callwrap),
00203                               personal_stack_size);
00204         }
00205         
00206         call_ctx(NULL);
00207     }
00208     else
00209         _callback();
00210 
00211     // if this assertion fails, a derived class's virtual execute() function
00212     // didn't call its parent's execute() function, and we didn't make it
00213     // all the way back up to WvStream::execute().  This doesn't always
00214     // matter right now, but it could lead to obscure bugs later, so we'll
00215     // enforce it.
00216 }
00217 
00218 
00219 bool WvStream::isok() const
00220 {
00221     return !closed && WvErrorBase::isok();
00222 }
00223 
00224 
00225 void WvStream::seterr(int _errnum)
00226 {
00227     if (!geterr()) // no pre-existing error
00228     {
00229         WvErrorBase::seterr(_errnum);
00230         close();
00231     }
00232 }
00233 
00234 
00235 size_t WvStream::read(WvBuf &outbuf, size_t count)
00236 {
00237     // for now, just wrap the older read function
00238     size_t free = outbuf.free();
00239     if (count > free)
00240         count = free;
00241 
00242     WvDynBuf tmp;
00243     unsigned char *buf = tmp.alloc(count);
00244     size_t len = read(buf, count);
00245     tmp.unalloc(count - len);
00246     outbuf.merge(tmp);
00247     return len;
00248 }
00249 
00250 
00251 size_t WvStream::write(WvBuf &inbuf, size_t count)
00252 {
00253     // for now, just wrap the older write function
00254     size_t avail = inbuf.used();
00255     if (count > avail)
00256         count = avail;
00257     const unsigned char *buf = inbuf.get(count);
00258     size_t len = write(buf, count);
00259     inbuf.unget(count - len);
00260     return len;
00261 }
00262 
00263 
00264 size_t WvStream::read(void *buf, size_t count)
00265 {
00266     assert(!count || buf);
00267     
00268     size_t bufu, i;
00269     unsigned char *newbuf;
00270 
00271     bufu = inbuf.used();
00272     if (bufu < queue_min)
00273     {
00274         newbuf = inbuf.alloc(queue_min - bufu);
00275         assert(newbuf);
00276         i = uread(newbuf, queue_min - bufu);
00277         inbuf.unalloc(queue_min - bufu - i);
00278         
00279         bufu = inbuf.used();
00280     }
00281     
00282     if (bufu < queue_min)
00283     {
00284         maybe_autoclose();
00285         return 0;
00286     }
00287         
00288     // if buffer is empty, do a hard read
00289     if (!bufu)
00290         bufu = uread(buf, count);
00291     else
00292     {
00293         // otherwise just read from the buffer
00294         if (bufu > count)
00295             bufu = count;
00296     
00297         memcpy(buf, inbuf.get(bufu), bufu);
00298     }
00299     
00300     TRACE("read  obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00301     maybe_autoclose();
00302     return bufu;
00303 }
00304 
00305 
00306 size_t WvStream::write(const void *buf, size_t count)
00307 {
00308     assert(!count || buf);
00309     if (!isok() || !buf || !count || stop_write) return 0;
00310     
00311     size_t wrote = 0;
00312     if (!outbuf_delayed_flush && !outbuf.used())
00313     {
00314         wrote = uwrite(buf, count);
00315         count -= wrote;
00316         buf = (const unsigned char *)buf + wrote;
00317         // if (!count) return wrote; // short circuit if no buffering needed
00318     }
00319     if (max_outbuf_size != 0)
00320     {
00321         size_t canbuffer = max_outbuf_size - outbuf.used();
00322         if (count > canbuffer)
00323             count = canbuffer; // can't write the whole amount
00324     }
00325     if (count != 0)
00326     {
00327         outbuf.put(buf, count);
00328         wrote += count;
00329     }
00330 
00331     if (should_flush())
00332     {
00333         if (is_auto_flush)
00334             flush(0);
00335         else 
00336             flush_outbuf(0);
00337     }
00338 
00339     return wrote;
00340 }
00341 
00342 
00343 void WvStream::noread()
00344 {
00345     stop_read = true;
00346     maybe_autoclose();
00347 }
00348 
00349 
00350 void WvStream::nowrite()
00351 {
00352     stop_write = true;
00353     maybe_autoclose();
00354 }
00355 
00356 
00357 void WvStream::maybe_autoclose()
00358 {
00359     if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00360         close();
00361 }
00362 
00363 
00364 bool WvStream::isreadable()
00365 {
00366     return isok() && select(0, true, false, false);
00367 }
00368 
00369 
00370 bool WvStream::iswritable()
00371 {
00372     return !stop_write && isok() && select(0, false, true, false);
00373 }
00374 
00375 
00376 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00377                                  int readahead)
00378 {
00379     assert(separator >= 0);
00380     assert(separator <= 255);
00381     
00382     //assert(uses_continue_select || wait_msec == 0);
00383 
00384     struct timeval timeout_time;
00385     if (wait_msec > 0)
00386         timeout_time = msecadd(wvtime(), wait_msec);
00387     
00388     maybe_autoclose();
00389 
00390     // if we get here, we either want to wait a bit or there is data
00391     // available.
00392     while (isok())
00393     {
00394         // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
00395         queuemin(0);
00396     
00397         // if there is a newline already, we have enough data.
00398         if (inbuf.strchr(separator) > 0)
00399             break;
00400         else if (!isok() || stop_read)    // uh oh, stream is in trouble.
00401             break;
00402 
00403         // make select not return true until more data is available
00404         queuemin(inbuf.used() + 1);
00405 
00406         // compute remaining timeout
00407         if (wait_msec > 0)
00408         {
00409             wait_msec = msecdiff(timeout_time, wvtime());
00410             if (wait_msec < 0)
00411                 wait_msec = 0;
00412         }
00413         
00414         // FIXME: this is blocking_getline.  It shouldn't
00415         // call continue_select()!
00416         bool hasdata;
00417         if (wait_msec != 0 && uses_continue_select)
00418             hasdata = continue_select(wait_msec);
00419         else
00420             hasdata = select(wait_msec, true, false);
00421         
00422         if (!isok())
00423             break;
00424 
00425         if (hasdata)
00426         {
00427             // read a few bytes
00428             WvDynBuf tmp;
00429             unsigned char *buf = tmp.alloc(readahead);
00430             assert(buf);
00431             size_t len = uread(buf, readahead);
00432             tmp.unalloc(readahead - len);
00433             size_t tmp_size = tmp.used();
00434             inbuf.put(tmp.get(tmp_size), tmp_size);
00435             hasdata = len > 0; // enough?
00436         }
00437 
00438         if (!isok())
00439             break;
00440         
00441         if (!hasdata && wait_msec == 0)
00442             return NULL; // handle timeout
00443     }
00444     if (!inbuf.used())
00445         return NULL;
00446 
00447     // return the appropriate data
00448     size_t i = 0;
00449     i = inbuf.strchr(separator);
00450     if (i > 0) {
00451         char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00452         assert(eol);
00453         *eol = 0;
00454         return const_cast<char*>((const char *)inbuf.get(i));
00455     } else {
00456         // handle "EOF without newline" condition
00457         // FIXME: it's very silly that buffers can't return editable
00458         // char* arrays.
00459         inbuf.alloc(1)[0] = 0; // null-terminate it
00460         return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00461     }
00462 }
00463 
00464 
00465 char *WvStream::continue_getline(time_t wait_msec, int separator,
00466                                  int readahead)
00467 {
00468     assert(false && "not implemented, come back later!");
00469     assert(uses_continue_select);
00470     return NULL;
00471 }
00472 
00473 
00474 void WvStream::drain()
00475 {
00476     char buf[1024];
00477     while (isreadable())
00478         read(buf, sizeof(buf));
00479 }
00480 
00481 
00482 bool WvStream::flush(time_t msec_timeout)
00483 {
00484     if (is_flushing) return false;
00485     
00486     TRACE("%p flush starts\n", this);
00487 
00488     is_flushing = true;
00489     want_to_flush = true;
00490     bool done = flush_internal(msec_timeout) // any other internal buffers
00491         && flush_outbuf(msec_timeout);  // our own outbuf
00492     is_flushing = false;
00493 
00494     TRACE("flush stops (%d)\n", done);
00495     return done;
00496 }
00497 
00498 
00499 bool WvStream::should_flush()
00500 {
00501     return want_to_flush;
00502 }
00503 
00504 
00505 bool WvStream::flush_outbuf(time_t msec_timeout)
00506 {
00507     TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00508     bool outbuf_was_used = outbuf.used();
00509     
00510     // do-nothing shortcut for speed
00511     // FIXME: definitely makes a "measurable" difference...
00512     //   but is it worth the risk?
00513     if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00514     {
00515         maybe_autoclose();
00516         return true;
00517     }
00518     
00519     WvTime stoptime = msecadd(wvtime(), msec_timeout);
00520     
00521     // flush outbuf
00522     while (outbuf_was_used && isok())
00523     {
00524 //      fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 
00525 //              this, getrfd(), getwfd(), outbuf.used());
00526         
00527         size_t attempt = outbuf.optgettable();
00528         size_t real = uwrite(outbuf.get(attempt), attempt);
00529         
00530         // WARNING: uwrite() may have messed up our outbuf!
00531         // This probably only happens if uwrite() closed the stream because
00532         // of an error, so we'll check isok().
00533         if (isok() && real < attempt)
00534         {
00535             TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00536             assert(outbuf.ungettable() >= attempt - real);
00537             outbuf.unget(attempt - real);
00538         }
00539         
00540         // since post_select() can call us, and select() calls post_select(),
00541         // we need to be careful not to call select() if we don't need to!
00542         // post_select() will only call us with msec_timeout==0, and we don't
00543         // need to do select() in that case anyway.
00544         if (!msec_timeout)
00545             break;
00546         if (msec_timeout >= 0 
00547           && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00548             break;
00549         
00550         outbuf_was_used = outbuf.used();
00551     }
00552 
00553     // handle autoclose
00554     if (autoclose_time && isok())
00555     {
00556         time_t now = time(NULL);
00557         TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 
00558               this, now - autoclose_time, outbuf.used());
00559         if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00560         {
00561             autoclose_time = 0; // avoid infinite recursion!
00562             close();
00563         }
00564     }
00565 
00566     TRACE("flush_outbuf: after autoclose chunk\n");
00567     if (outbuf_delayed_flush && !outbuf_was_used)
00568         want_to_flush = false;
00569     
00570     TRACE("flush_outbuf: now isok=%d\n", isok());
00571 
00572     // if we can't flush the outbuf, at least empty it!
00573     if (outbuf_was_used && !isok())
00574         outbuf.zap();
00575 
00576     maybe_autoclose();
00577     TRACE("flush_outbuf stops\n");
00578     
00579     return !outbuf_was_used;
00580 }
00581 
00582 
00583 bool WvStream::flush_internal(time_t msec_timeout)
00584 {
00585     // once outbuf emptied, that's it for most streams
00586     return true;
00587 }
00588 
00589 
00590 int WvStream::getrfd() const
00591 {
00592     return -1;
00593 }
00594 
00595 
00596 int WvStream::getwfd() const
00597 {
00598     return -1;
00599 }
00600 
00601 
00602 void WvStream::flush_then_close(int msec_timeout)
00603 {
00604     time_t now = time(NULL);
00605     autoclose_time = now + (msec_timeout + 999) / 1000;
00606     
00607     TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 
00608             this, outbuf.used(), autoclose_time - now);
00609 
00610     // as a fast track, we _could_ close here: but that's not a good idea,
00611     // since flush_then_close() deals with obscure situations, and we don't
00612     // want the caller to use it incorrectly.  So we make things _always_
00613     // break when the caller forgets to call select() later.
00614     
00615     flush(0);
00616 }
00617 
00618 
00619 bool WvStream::pre_select(SelectInfo &si)
00620 {
00621     maybe_autoclose();
00622     
00623     time_t alarmleft = alarm_remaining();
00624     
00625     if (!si.inherit_request && alarmleft == 0)
00626         return true; // alarm has rung
00627 
00628     if (!si.inherit_request)
00629     {
00630         si.wants.readable |= readcb;
00631         si.wants.writable |= writecb;
00632         si.wants.isexception |= exceptcb;
00633     }
00634     
00635     // handle read-ahead buffering
00636     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00637         return true; // already ready
00638     if (alarmleft >= 0
00639       && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00640         si.msec_timeout = alarmleft + 10;
00641     return false;
00642 }
00643 
00644 
00645 bool WvStream::post_select(SelectInfo &si)
00646 {
00647     // FIXME: need sane buffer flush support for non FD-based streams
00648     // FIXME: need read_requires_writable and write_requires_readable
00649     //        support for non FD-based streams
00650     
00651     // note: flush(nonzero) might call select(), but flush(0) never does,
00652     // so this is safe.
00653     if (should_flush())
00654         flush(0);
00655     if (!si.inherit_request && alarm_remaining() == 0)
00656         return true; // alarm ticked
00657     return false;
00658 }
00659 
00660 
00661 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00662     bool readable, bool writable, bool isexcept, bool forceable)
00663 {
00664     FD_ZERO(&si.read);
00665     FD_ZERO(&si.write);
00666     FD_ZERO(&si.except);
00667     
00668     if (forceable)
00669     {
00670         si.wants.readable = readcb;
00671         si.wants.writable = writecb;
00672         si.wants.isexception = exceptcb;
00673     }
00674     else
00675     {
00676         si.wants.readable = readable;
00677         si.wants.writable = writable;
00678         si.wants.isexception = isexcept;
00679     }
00680     
00681     si.max_fd = -1;
00682     si.msec_timeout = msec_timeout;
00683     si.inherit_request = ! forceable;
00684     si.global_sure = false;
00685 
00686     if (!isok()) return false;
00687 
00688     bool sure = pre_select(si);
00689     if (globalstream && forceable && (globalstream != this))
00690     {
00691         WvStream *s = globalstream;
00692         globalstream = NULL; // prevent recursion
00693         si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
00694         globalstream = s;
00695     }
00696     if (sure || si.global_sure)
00697         si.msec_timeout = 0;
00698     return sure;
00699 }
00700 
00701 
00702 int WvStream::_do_select(SelectInfo &si)
00703 {
00704     // prepare timeout
00705     timeval tv;
00706     tv.tv_sec = si.msec_timeout / 1000;
00707     tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00708     
00709 #ifdef _WIN32
00710     // selecting on an empty set of sockets doesn't cause a delay in win32.
00711     SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00712     FD_SET(fakefd, &si.except);
00713 #endif    
00714     
00715     // block
00716     int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00717         si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00718 
00719     // handle errors.
00720     //   EAGAIN and EINTR don't matter because they're totally normal.
00721     //   ENOBUFS is hopefully transient.
00722     //   EBADF is kind of gross and might imply that something is wrong,
00723     //      but it happens sometimes...
00724     if (sel < 0 
00725       && errno != EAGAIN && errno != EINTR 
00726       && errno != EBADF
00727       && errno != ENOBUFS
00728       )
00729     {
00730         seterr(errno);
00731     }
00732 #ifdef _WIN32
00733     ::close(fakefd);
00734 #endif
00735     TRACE("select() returned %d\n", sel);
00736     return sel;
00737 }
00738 
00739 
00740 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00741 {
00742     if (!isok()) return false;
00743     
00744     bool sure = post_select(si);
00745     if (globalstream && forceable && (globalstream != this))
00746     {
00747         WvStream *s = globalstream;
00748         globalstream = NULL; // prevent recursion
00749         si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00750                                          || si.global_sure;
00751         globalstream = s;
00752     }
00753     return sure;
00754 }
00755 
00756 
00757 bool WvStream::_select(time_t msec_timeout,
00758     bool readable, bool writable, bool isexcept, bool forceable)
00759 {
00760     SelectInfo si;
00761     bool sure = _build_selectinfo(si, msec_timeout,
00762                                   readable, writable, isexcept, forceable);
00763     
00764     if (!isok())
00765         return false;
00766     
00767     // the eternal question: if 'sure' is true already, do we need to do the
00768     // rest of this stuff?  If we do, it might increase fairness a bit, but
00769     // it encourages select()ing when we know something fishy has happened -
00770     // when a stream is !isok() in a list, for example, pre_select() returns
00771     // true.  If that's the case, our SelectInfo structure might not be
00772     // quite right (eg. it might be selecting on invalid fds).  That doesn't
00773     // sound *too* bad, so let's go for the fairness.
00774 
00775     int sel = _do_select(si);
00776     if (sel >= 0)
00777         sure = _process_selectinfo(si, forceable) || sure; // note the order
00778     if (si.global_sure && globalstream && forceable && (globalstream != this))
00779         globalstream->callback();
00780     return sure;
00781 }
00782 
00783 
00784 IWvStream::SelectRequest WvStream::get_select_request()
00785 {
00786     return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00787 }
00788 
00789 
00790 void WvStream::force_select(bool readable, bool writable, bool isexception)
00791 {
00792     if (readable)
00793         readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00794     if (writable)
00795         writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00796     if (isexception)
00797         exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00798 }
00799 
00800 
00801 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00802 {
00803     if (readable)
00804         readcb = 0;
00805     if (writable)
00806         writecb = 0;
00807     if (isexception)
00808         exceptcb = 0;
00809 }
00810 
00811 
00812 void WvStream::alarm(time_t msec_timeout)
00813 {
00814     if (msec_timeout >= 0)
00815         alarm_time = msecadd(wvtime(), msec_timeout);
00816     else
00817         alarm_time = wvtime_zero;
00818 }
00819 
00820 
00821 time_t WvStream::alarm_remaining()
00822 {
00823     if (alarm_time.tv_sec)
00824     {
00825         WvTime now = wvtime();
00826 
00827         // Time is going backward!
00828         if (now < last_alarm_check)
00829         {
00830 #if 0 // okay, I give up.  Time just plain goes backwards on some systems.
00831             // warn only if it's a "big" difference (sigh...)
00832             if (msecdiff(last_alarm_check, now) > 200)
00833                 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
00834                         "(%ld:%ld %ld:%ld)\n",
00835                         last_alarm_check.tv_sec, last_alarm_check.tv_usec,
00836                         now.tv_sec, now.tv_usec);
00837 #endif
00838             alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
00839         }
00840 
00841         last_alarm_check = now;
00842 
00843         time_t remaining = msecdiff(alarm_time, now);
00844         if (remaining < 0)
00845             remaining = 0;
00846         return remaining;
00847     }
00848     return -1;
00849 }
00850 
00851 
00852 bool WvStream::continue_select(time_t msec_timeout)
00853 {
00854     assert(uses_continue_select);
00855     
00856     // if this assertion triggers, you probably tried to do continue_select()
00857     // while inside terminate_continue_select().
00858     assert(call_ctx);
00859     
00860     if (msec_timeout >= 0)
00861         alarm(msec_timeout);
00862 
00863     alarm(msec_timeout);
00864     WvCont::yield();
00865     alarm(-1); // cancel the still-pending alarm, or it might go off later!
00866     
00867     // when we get here, someone has jumped back into our task.
00868     // We have to select(0) here because it's possible that the alarm was 
00869     // ticking _and_ data was available.  This is aggravated especially if
00870     // msec_delay was zero.  Note that running select() here isn't
00871     // inefficient, because if the alarm was expired then pre_select()
00872     // returned true anyway and short-circuited the previous select().
00873     TRACE("hello-%p\n", this);
00874     return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
00875 }
00876 
00877 
00878 void WvStream::terminate_continue_select()
00879 {
00880     close();
00881     call_ctx = 0; // destroy the context, if necessary
00882 }
00883 
00884 
00885 const WvAddr *WvStream::src() const
00886 {
00887     return NULL;
00888 }
00889 
00890 
00891 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
00892 { 
00893     callfunc = _callfunc;
00894     userdata = _userdata;
00895     call_ctx = 0; // delete any in-progress WvCont
00896 }
00897 
00898 
00899 void WvStream::legacy_callback(IWvStream& s)
00900 {
00901     execute();
00902     if (!! callfunc)
00903         callfunc(*this, userdata);
00904 }
00905 
00906 
00907 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
00908 {
00909     IWvStreamCallback tmp = readcb;
00910 
00911     readcb = _callback;
00912 
00913     return tmp;
00914 }
00915 
00916 
00917 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
00918 {
00919     IWvStreamCallback tmp = writecb;
00920 
00921     writecb = _callback;
00922 
00923     return tmp;
00924 }
00925 
00926 
00927 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
00928 {
00929     IWvStreamCallback tmp = exceptcb;
00930 
00931     exceptcb = _callback;
00932 
00933     return tmp;
00934 }
00935 
00936 
00937 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
00938 {
00939     IWvStreamCallback tmp = closecb;
00940     if (isok())
00941         closecb = _callback;
00942     else
00943     {
00944         // already closed?  notify immediately!
00945         closecb = 0;
00946         if (!!_callback)
00947             _callback(*this);
00948     }
00949     return tmp;
00950 }
00951 
00952 
00953 void WvStream::unread(WvBuf &unreadbuf, size_t count)
00954 {
00955     WvDynBuf tmp;
00956     tmp.merge(unreadbuf, count);
00957     tmp.merge(inbuf);
00958     inbuf.zap();
00959     inbuf.merge(tmp);
00960 }

Generated on Sun Sep 24 20:10:52 2006 for WvStreams by  doxygen 1.4.7