00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "wvdbusconn.h"
00012 #include "wvmoniker.h"
00013 #include "wvstrutils.h"
00014 #undef interface // windows
00015 #include <dbus/dbus.h>
00016
00017
00018 static WvString translate(WvStringParm dbus_moniker)
00019 {
00020 WvStringList l;
00021 WvStringList::Iter i(l);
00022
00023 if (!strncasecmp(dbus_moniker, "unix:", 5))
00024 {
00025 WvString path, tmpdir;
00026 l.split(dbus_moniker+5, ",");
00027 for (i.rewind(); i.next(); )
00028 {
00029 if (!strncasecmp(*i, "path=", 5))
00030 path = *i + 5;
00031 else if (!strncasecmp(*i, "abstract=", 9))
00032 path = WvString("@%s", *i + 9);
00033 else if (!strncasecmp(*i, "tmpdir=", 7))
00034 tmpdir = *i + 7;
00035 }
00036 if (!!path)
00037 return WvString("unix:%s", path);
00038 else if (!!tmpdir)
00039 return WvString("unix:%s/dbus.sock", tmpdir);
00040 }
00041 else if (!strncasecmp(dbus_moniker, "tcp:", 4))
00042 {
00043 WvString host, port, family;
00044 l.split(dbus_moniker+4, ",");
00045 for (i.rewind(); i.next(); )
00046 {
00047 if (!strncasecmp(*i, "family=", 7))
00048 family = *i + 7;
00049 else if (!strncasecmp(*i, "host=", 5))
00050 host = *i + 5;
00051 else if (!strncasecmp(*i, "port=", 5))
00052 port = *i + 5;
00053 }
00054 if (!!host && !!port)
00055 return WvString("tcp:%s:%s", host, port);
00056 else if (!!host)
00057 return WvString("tcp:%s", host);
00058 else if (!!port)
00059 return WvString("tcp:0.0.0.0:%s", port);
00060 }
00061
00062 return dbus_moniker;
00063 }
00064
00065
00066 static IWvStream *stream_creator(WvStringParm _s, IObject *)
00067 {
00068 WvString s(_s);
00069
00070 if (!strcasecmp(s, "starter"))
00071 {
00072 WvString startbus(getenv("DBUS_STARTER_ADDRESS"));
00073 if (!!startbus)
00074 return IWvStream::create(translate(startbus));
00075 else
00076 {
00077 WvString starttype(getenv("DBUS_STARTER_BUS_TYPE"));
00078 if (!!starttype && !strcasecmp(starttype, "system"))
00079 s = "system";
00080 else if (!!starttype && !strcasecmp(starttype, "session"))
00081 s = "session";
00082 }
00083 }
00084
00085 if (!strcasecmp(s, "system"))
00086 {
00087
00088
00089
00090
00091
00092 WvString bus(getenv("DBUS_SYSTEM_BUS_ADDRESS"));
00093 if (!!bus)
00094 return IWvStream::create(translate(bus));
00095 }
00096
00097 if (!strcasecmp(s, "session"))
00098 {
00099 WvString bus(getenv("DBUS_SESSION_BUS_ADDRESS"));
00100 if (!!bus)
00101 return IWvStream::create(translate(bus));
00102 }
00103
00104 return IWvStream::create(translate(s));
00105 }
00106
00107 static WvMoniker<IWvStream> reg("dbus", stream_creator);
00108
00109
00110 static int conncount;
00111
00112 WvDBusConn::WvDBusConn(IWvStream *_cloned, IWvDBusAuth *_auth, bool _client)
00113 : WvStreamClone(_cloned),
00114 log(WvString("DBus %s%s",
00115 _client ? "" : "s",
00116 ++conncount), WvLog::Debug5),
00117 pending(10)
00118 {
00119 init(_auth, _client);
00120 }
00121
00122
00123 WvDBusConn::WvDBusConn(WvStringParm moniker, IWvDBusAuth *_auth, bool _client)
00124 : WvStreamClone(IWvStream::create(moniker)),
00125 log(WvString("DBus %s%s",
00126 _client ? "" : "s",
00127 ++conncount), WvLog::Debug5),
00128 pending(10)
00129 {
00130 log("Connecting to '%s'\n", moniker);
00131 init(_auth, _client);
00132 }
00133
00134
00135 void WvDBusConn::init(IWvDBusAuth *_auth, bool _client)
00136 {
00137 log("Initializing.\n");
00138 client = _client;
00139 auth = _auth ? _auth : new WvDBusClientAuth;
00140 authorized = in_post_select = false;
00141 if (!client) set_uniquename(WvString(":%s.0", conncount));
00142
00143 if (!isok()) return;
00144
00145 delay_output(true);
00146
00147
00148
00149
00150 if (client)
00151 send_hello();
00152
00153 try_auth();
00154 }
00155
00156 WvDBusConn::~WvDBusConn()
00157 {
00158 log("Shutting down.\n");
00159 if (geterr())
00160 log("Error was: %s\n", errstr());
00161
00162 close();
00163
00164 delete auth;
00165 }
00166
00167
00168 void WvDBusConn::close()
00169 {
00170 if (!closed)
00171 log("Closing.\n");
00172 WvStreamClone::close();
00173 }
00174
00175
00176 WvString WvDBusConn::uniquename() const
00177 {
00178 return _uniquename;
00179 }
00180
00181
00182 void WvDBusConn::request_name(WvStringParm name, const WvDBusCallback &onreply,
00183 time_t msec_timeout)
00184 {
00185 uint32_t flags = (DBUS_NAME_FLAG_ALLOW_REPLACEMENT |
00186 DBUS_NAME_FLAG_REPLACE_EXISTING);
00187 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
00188 "org.freedesktop.DBus", "RequestName");
00189 msg.append(name).append(flags);
00190 send(msg, onreply, msec_timeout);
00191 }
00192
00193
00194 uint32_t WvDBusConn::send(WvDBusMsg msg)
00195 {
00196 msg.marshal(out_queue);
00197 if (authorized)
00198 {
00199 log(" >> %s\n", msg);
00200 write(out_queue);
00201 }
00202 else
00203 log(" .> %s\n", msg);
00204 return msg.get_serial();
00205 }
00206
00207
00208 void WvDBusConn::send(WvDBusMsg msg, const WvDBusCallback &onreply,
00209 time_t msec_timeout)
00210 {
00211 send(msg);
00212 if (onreply)
00213 add_pending(msg, onreply, msec_timeout);
00214 }
00215
00216
00217 class xxReplyWaiter
00218 {
00219 public:
00220 WvDBusMsg *reply;
00221
00222 xxReplyWaiter()
00223 { reply = NULL; }
00224 ~xxReplyWaiter()
00225 { delete reply; }
00226 bool reply_wait(WvDBusMsg &msg)
00227 { reply = new WvDBusMsg(msg); return true; }
00228 };
00229
00230
00231 WvDBusMsg WvDBusConn::send_and_wait(WvDBusMsg msg, time_t msec_timeout,
00232 wv::function<void(uint32_t)> serial_cb)
00233 {
00234 xxReplyWaiter rw;
00235
00236 send(msg, wv::bind(&xxReplyWaiter::reply_wait, &rw, _1),
00237 msec_timeout);
00238 if (serial_cb)
00239 serial_cb(msg.get_serial());
00240 while (!rw.reply && isok())
00241 runonce();
00242 if (!rw.reply)
00243 return WvDBusError(msg, DBUS_ERROR_FAILED,
00244 WvString("Connection closed (%s) "
00245 "while waiting for reply.",
00246 errstr()));
00247 else
00248 return *rw.reply;
00249 }
00250
00251
00252 void WvDBusConn::out(WvStringParm s)
00253 {
00254 log(" >> %s", s);
00255 print(s);
00256 }
00257
00258
00259 const char *WvDBusConn::in()
00260 {
00261 const char *s = trim_string(getline(0));
00262 if (s)
00263 log("<< %s\n", s);
00264 return s;
00265 }
00266
00267
00268 void WvDBusConn::send_hello()
00269 {
00270 WvDBusMsg msg("org.freedesktop.DBus", "/org/freedesktop/DBus",
00271 "org.freedesktop.DBus", "Hello");
00272 send(msg, wv::bind(&WvDBusConn::_registered, this, _1));
00273 WvDBusMsg msg2("org.freedesktop.DBus", "/org/freedesktop/DBus",
00274 "org.freedesktop.DBus", "AddMatch");
00275 msg2.append("type='signal'");
00276 send(msg2);
00277 }
00278
00279
00280 void WvDBusConn::set_uniquename(WvStringParm s)
00281 {
00282
00283
00284 log("Assigned name '%s'\n", s);
00285 _uniquename = s;
00286 log.app = WvString("DBus %s%s", client ? "" : "s", uniquename());
00287 }
00288
00289
00290 void WvDBusConn::try_auth()
00291 {
00292 bool done = auth->authorize(*this);
00293 if (done)
00294 {
00295
00296 if (out_queue.used())
00297 {
00298 log(" >> (sending enqueued messages)\n");
00299 write(out_queue);
00300 }
00301
00302 authorized = true;
00303 }
00304 }
00305
00306
00307 void WvDBusConn::add_callback(CallbackPri pri, WvDBusCallback cb, void *cookie)
00308 {
00309 callbacks.append(new CallbackInfo(pri, cb, cookie), true);
00310 }
00311
00312
00313 void WvDBusConn::del_callback(void *cookie)
00314 {
00315
00316 CallbackInfoList::Iter i(callbacks);
00317 for (i.rewind(); i.next(); )
00318 if (i->cookie == cookie)
00319 i.xunlink();
00320 }
00321
00322
00323 int WvDBusConn::priority_order(const CallbackInfo *a, const CallbackInfo *b)
00324 {
00325 return a->pri - b->pri;
00326 }
00327
00328 bool WvDBusConn::filter_func(WvDBusMsg &msg)
00329 {
00330 log("<< %s\n", msg);
00331
00332
00333 uint32_t rserial = msg.get_replyserial();
00334 if (rserial)
00335 {
00336 Pending *p = pending[rserial];
00337 if (p)
00338 {
00339 p->cb(msg);
00340 pending.remove(p);
00341 return true;
00342 }
00343 }
00344
00345
00346 CallbackInfoList::Sorter i(callbacks, priority_order);
00347 for (i.rewind(); i.next(); )
00348 {
00349 bool handled = i->cb(msg);
00350 if (handled) return true;
00351 }
00352
00353 return false;
00354 }
00355
00356
00357 WvDBusClientAuth::WvDBusClientAuth()
00358 {
00359 sent_request = false;
00360 }
00361
00362
00363 wvuid_t WvDBusClientAuth::get_uid()
00364 {
00365 return wvgetuid();
00366 }
00367
00368
00369 bool WvDBusClientAuth::authorize(WvDBusConn &c)
00370 {
00371 if (!sent_request)
00372 {
00373 c.write("\0", 1);
00374 WvString uid = get_uid();
00375 c.out("AUTH EXTERNAL %s\r\n\0", WvHexEncoder().strflushstr(uid));
00376 sent_request = true;
00377 }
00378 else
00379 {
00380 const char *line = c.in();
00381 if (line)
00382 {
00383 if (!strncasecmp(line, "OK ", 3))
00384 {
00385 c.out("BEGIN\r\n");
00386 return true;
00387 }
00388 else if (!strncasecmp(line, "ERROR ", 6))
00389 c.seterr("Auth failed: %s", line);
00390 else
00391 c.seterr("Unknown AUTH response: '%s'", line);
00392 }
00393 }
00394
00395 return false;
00396 }
00397
00398
00399 time_t WvDBusConn::mintimeout_msec()
00400 {
00401 WvTime when = 0;
00402 PendingDict::Iter i(pending);
00403 for (i.rewind(); i.next(); )
00404 {
00405 if (!when || when > i->valid_until)
00406 when = i->valid_until;
00407 }
00408 if (!when)
00409 return -1;
00410 else if (when <= wvstime())
00411 return 0;
00412 else
00413 return msecdiff(when, wvstime());
00414 }
00415
00416
00417 bool WvDBusConn::post_select(SelectInfo &si)
00418 {
00419 bool ready = WvStreamClone::post_select(si);
00420 if (si.inherit_request) return ready;
00421
00422 if (in_post_select) return false;
00423 in_post_select = true;
00424
00425 if (!authorized && ready)
00426 try_auth();
00427
00428 if (!alarm_remaining())
00429 {
00430 WvTime now = wvstime();
00431 PendingDict::Iter i(pending);
00432 for (i.rewind(); i.next(); )
00433 {
00434 if (now > i->valid_until)
00435 {
00436 log("Expiring %s\n", i->msg);
00437 expire_pending(i.ptr());
00438 i.rewind();
00439 }
00440 }
00441 }
00442
00443 if (authorized && ready)
00444 {
00445
00446
00447
00448 bool ran;
00449 do
00450 {
00451 ran = false;
00452 size_t needed = WvDBusMsg::demarshal_bytes_needed(in_queue);
00453 size_t amt = needed - in_queue.used();
00454 if (amt < 4096)
00455 amt = 4096;
00456 read(in_queue, amt);
00457 WvDBusMsg *m;
00458 while ((m = WvDBusMsg::demarshal(in_queue)) != NULL)
00459 {
00460 ran = true;
00461 filter_func(*m);
00462 delete m;
00463 }
00464 } while (ran);
00465 }
00466
00467 alarm(mintimeout_msec());
00468 in_post_select = false;
00469 return false;
00470 }
00471
00472
00473 bool WvDBusConn::isidle()
00474 {
00475 return !out_queue.used() && pending.isempty();
00476 }
00477
00478
00479 void WvDBusConn::expire_pending(Pending *p)
00480 {
00481 if (p)
00482 {
00483 WvDBusCallback xcb(p->cb);
00484 pending.remove(p);
00485 WvDBusError e(p->msg, DBUS_ERROR_FAILED,
00486 "Timed out while waiting for reply");
00487 xcb(e);
00488 }
00489 }
00490
00491
00492 void WvDBusConn::cancel_pending(uint32_t serial)
00493 {
00494 Pending *p = pending[serial];
00495 if (p)
00496 {
00497 WvDBusCallback xcb(p->cb);
00498 WvDBusMsg msg(p->msg);
00499 pending.remove(p);
00500 WvDBusError e(msg, DBUS_ERROR_FAILED,
00501 "Canceled while waiting for reply");
00502 xcb(e);
00503 }
00504 }
00505
00506
00507 void WvDBusConn::add_pending(WvDBusMsg &msg, WvDBusCallback cb,
00508 time_t msec_timeout)
00509 {
00510 uint32_t serial = msg.get_serial();
00511 assert(serial);
00512 if (pending[serial])
00513 cancel_pending(serial);
00514 pending.add(new Pending(msg, cb, msec_timeout), true);
00515 alarm(mintimeout_msec());
00516 }
00517
00518
00519 bool WvDBusConn::_registered(WvDBusMsg &msg)
00520 {
00521 WvDBusMsg::Iter i(msg);
00522 _uniquename = i.getnext().get_str();
00523 set_uniquename(_uniquename);
00524 return true;
00525 }
00526