module hio.drivers.kqueue;

version(OSX):

import std.datetime;
import std.conv;
import std.string;
import std.container;
import std.stdio;
import std.exception;
import std.experimental.logger;
import std.typecons;
import std.experimental.allocator;
import std.experimental.allocator.mallocator;

import core.memory;

import std.algorithm.comparison: max;
import core.sys.posix.fcntl: open, O_RDONLY;
import core.sys.posix.unistd: close;

import core.sys.darwin.sys.event;

import core.sys.posix.signal;
import core.stdc.stdint : intptr_t, uintptr_t;
import core.stdc.string: strerror;
import core.stdc.errno: errno, EINTR;

import hio.events;
import hio.common;

//enum : short {
//    EVFILT_READ =      (-1),
//    EVFILT_WRITE =     (-2),
//    EVFILT_AIO =       (-3),    /* attached to aio requests */
//    EVFILT_VNODE =     (-4),    /* attached to vnodes */
//    EVFILT_PROC =      (-5),    /* attached to struct proc */
//    EVFILT_SIGNAL =    (-6),    /* attached to struct proc */
//    EVFILT_TIMER =     (-7),    /* timers */
//    EVFILT_MACHPORT =  (-8),    /* Mach portsets */
//    EVFILT_FS =        (-9),    /* Filesystem events */
//    EVFILT_USER =      (-10),   /* User events */
//            /* (-11) unused */
//    EVFILT_VM =        (-12)    /* Virtual memory events */
//}

//enum : ushort {
///* actions */
//    EV_ADD  =                0x0001,          /* add event to kq (implies enable) */
//    EV_DELETE =              0x0002,          /* delete event from kq */
//    EV_ENABLE =              0x0004,          /* enable event */
//    EV_DISABLE =             0x0008           /* disable event (not reported) */
//}

//struct kevent_t {
//    uintptr_t       ident;          /* identifier for this event */
//    short           filter;         /* filter for event */
//    ushort          flags;          /* general flags */
//    uint            fflags;         /* filter-specific flags */
//    intptr_t        data;           /* filter-specific data */
//    void*           udata;
//}

//extern(C) int kqueue() @safe @nogc nothrow;
//extern(C) int kevent(int kqueue_fd, const kevent_t *events, int ne, const kevent_t *events, int ne,timespec* timeout) @safe @nogc nothrow;

/// Thin @trusted forwarder to the C `kevent` call so that it can be used from @safe code.
auto s_kevent(A...)(A args) @trusted @nogc nothrow {
    return kevent(args);
}

/// Reinterpret the opaque `udata` value of a kernel event as the `Timer` it was registered with.
Timer udataToTimer(T)(T udata) @trusted {
    return cast(Timer)udata;
}

/**
 * kqueue based event loop implementation.
 *
 * Changes for the kernel are accumulated in `in_events` (up to MAXEVENTS
 * entries) and submitted either by `flush()` or by the next `kevent` call
 * in `run()`. Events received from the kernel are placed in `out_events`;
 * `ready` holds the number of valid entries from the last `kevent` call.
 */
struct NativeEventLoopImpl {
    immutable bool   native = true;
    immutable string _name = "kqueue";
    @disable this(this) {}
    private {
        bool stopped = false;
        enum MAXEVENTS = 512;

        int  kqueue_fd = -1;  // interface to kernel
        int  in_index;        // number of pending changes in in_events
        int  ready;           // number of valid events in out_events (never negative)

        timespec    ts;

        kevent_t[MAXEVENTS]     in_events;
        kevent_t[MAXEVENTS]     out_events;

        RedBlackTree!Timer      timers;   // this is the timers container
        Timer[]                 overdue;  // timers added with expiration in the past are placed here

        Signal[][int]           signals;  // this is the signals container

        FileEventHandler[]      fileHandlers; // indexed by file descriptor / user event id

        // CircBuff!NotificationDelivery
        //                         notificationsQueue;

        //HandlerDelegate[]       userEventHandlers;
    }

    /// Create the kernel queue (if not created yet) and allocate the timer and handler containers.
    void initialize() @trusted nothrow {
        if ( kqueue_fd == -1) {
            kqueue_fd = kqueue();
        }
        debug try{tracef("kqueue_fd=%d", kqueue_fd);}catch(Exception e){}
        timers = new RedBlackTree!Timer();
        fileHandlers = Mallocator.instance.makeArray!FileEventHandler(16*1024);
        // handlers are GC references stored in malloc'ed memory - make them visible to the GC
        GC.addRange(fileHandlers.ptr, fileHandlers.length*FileEventHandler.sizeof);
    }

    /// Release the kernel queue and the handler table. Safe to call more than once.
    void deinit() @trusted {
        debug tracef("deinit");
        if ( kqueue_fd != -1 )
        {
            close(kqueue_fd);
            kqueue_fd = -1;
        }
        in_index = 0;
        timers = null;
        // guard: indexing an empty table (double deinit, or deinit without
        // initialize) would be out of range
        if ( fileHandlers.length > 0 ) {
            GC.removeRange(fileHandlers.ptr);
            Mallocator.instance.dispose(fileHandlers);
            fileHandlers = null;
        }
        //Mallocator.instance.dispose(userEventHandlers);
    }

    /// Returns: the kqueue descriptor (-1 if the loop is not initialized).
    int get_kernel_id() pure @safe nothrow @nogc {
        return kqueue_fd;
    }

    /// Ask the running loop to return as soon as possible.
    void stop() @safe pure {
        debug trace("mark eventloop as stopped");
        stopped = true;
    }

    /// Convert the time left until `deadline` (clamped to zero) to a `timespec`.
    timespec _calculate_timespec(SysTime deadline) @safe {
        timespec result;
        Duration delta = deadline - Clock.currTime;
        delta = max(delta, 0.seconds);
        debug tracef("delta = %s", delta);
        auto parts = delta.split!("seconds", "nsecs");
        result.tv_sec  = cast(typeof(timespec.tv_sec))parts.seconds;
        result.tv_nsec = cast(typeof(timespec.tv_nsec))parts.nsecs;
        return result;
    }

    /**
     * Run the loop for at most `d` (forever when `d == Duration.max`).
     *
     * Returns when the deadline expires without events, or when `stop()`
     * was called. The `stopped` flag is cleared on exit.
     *
     * Throws: Exception when `kevent` fails with anything other than EINTR.
     */
    void run(Duration d) @safe {

        immutable bool runInfinitely = (d == Duration.max);
        SysTime     deadline;
        timespec*   wait;

        if ( !runInfinitely ) {
            deadline = Clock.currTime + d;
        }

        debug tracef("evl run for %s", d);

        scope(exit) {
            stopped = false;
        }

        while(!stopped) {
            //
            // handle user events(notifications)
            //
            //auto counter = notificationsQueue.Size * 10;
            //while(!notificationsQueue.empty){
            //    auto nd = notificationsQueue.get();
            //    Notification n = nd._n;
            //    Broadcast b = nd._broadcast;
            //    n.handler(b);
            //    counter--;
            //    enforce(counter > 0, "Can't clear notificatioinsQueue");
            // }
            //
            // handle overdue timers
            //
            while (overdue.length > 0) {
                // execute timers which user requested with negative delay
                Timer t = overdue[0];
                overdue = overdue[1..$];
                debug tracef("execute overdue %s", t);
                HandlerDelegate h = t._handler;
                try {
                    h(AppEvent.TMO);
                } catch (Exception ex) {
                    errorf("Uncaught exception: %s", ex.msg);
                }
            }
            if (stopped) {
                break;
            }

            if ( runInfinitely ) {
                wait = null;
            } else {
                // the remaining time shrinks on every iteration
                ts = _calculate_timespec(deadline);
                wait = &ts;
            }

            debug tracef("waiting for %s", wait is null?"forever":"%s".format(*wait));
            debug tracef("waiting events %s", in_events[0..in_index]);
            ready = s_kevent(kqueue_fd,
                                cast(kevent_t*)&in_events[0], in_index,
                                cast(kevent_t*)&out_events[0], MAXEVENTS,
                                wait);
            // capture errno before any other call has a chance to overwrite it
            immutable int kevent_errno = errno;
            in_index = 0;
            debug tracef("kevent returned %d events", ready);
            debug tracef("");

            if ( ready < 0 ) {
                // never leave a negative count behind: stop_timer/stop_poll
                // slice out_events[0..ready]
                ready = 0;
                if ( kevent_errno == EINTR ) {
                    // interrupted by a signal; submitted changes were already
                    // applied, so just wait again
                    debug trace("kevent interrupted, retrying");
                    continue;
                }
                error("kevent returned error %s".format(s_strerror(kevent_errno)));
                throw new Exception("kevent returned error %s".format(s_strerror(kevent_errno)));
            }
            if ( ready == 0 ) {
                debug trace("kevent timedout and no events to process");
                return;
            }
            //
            // handle kernel events
            //
            foreach(i; 0..ready) {
                if ( stopped ) {
                    break;
                }
                auto e = out_events[i];
                debug tracef("got kevent[%d] %s, data: %d, udata: %0x", i, e, e.data, e.udata);

                switch (e.filter) {
                    case EVFILT_READ:
                        debug tracef("Read on fd %d", e.ident);
                        _deliver_fd_event(e, AppEvent.IN);
                        continue;
                    case EVFILT_WRITE:
                        debug tracef("Write on fd %d", e.ident);
                        _deliver_fd_event(e, AppEvent.OUT);
                        continue;
                    case EVFILT_TIMER:
                        /*
                         * Invariants for timers
                         * ---------------------
                         * timer list must not be empty at event.
                         * we have to receive event only on the earliest timer in list
                        */
                        assert(!timers.empty, "timers empty on timer event: %s".format(out_events[0..ready]));
                        enforce((e.flags & EV_ERROR) == 0, "Timer errno: %d".format(e.data));

                        if ( udataToTimer(e.udata) !is timers.front) {
                            errorf("timer event: %s != timers.front: %s", udataToTimer(e.udata), timers.front);
                            //errorf("timers=%s", to!string(timers));
                            errorf("events=%s", out_events[0..ready]);
                            assert(0);
                        }
                        /* */

                        auto now = Clock.currTime;

                        do {
                            debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
                            Timer t = timers.front;
                            HandlerDelegate h = t._handler;
                            try {
                                h(AppEvent.TMO);
                            } catch (Exception ex) {
                                errorf("Uncaught exception: %s", ex.msg);
                            }
                            // timer event handler can try to stop exactly this timer,
                            // so when we returned from handler we can have different front
                            // and we do not have to remove it.
                            if ( !timers.empty && timers.front is t ) {
                                timers.removeFront;
                            }
                            now = Clock.currTime;
                        } while (!timers.empty && timers.front._expires <= now );

                        if ( ! timers.empty ) {
                            Duration kernel_delta = timers.front._expires - now;
                            assert(kernel_delta > 0.seconds);
                            _mod_kernel_timer(timers.front, kernel_delta);
                        } else {
                            // kqueue do not require deletion here
                        }

                        continue;
                    case EVFILT_SIGNAL:
                        assert(signals.length != 0);
                        auto signum = cast(int)e.ident;
                        debug tracef("received signal %d", signum);
                        assert(signals[signum].length > 0);
                        foreach(s; signals[signum]) {
                            debug tracef("processing signal handler %s", s);
                            try {
                                SigHandlerDelegate h = s._handler;
                                h(signum);
                            } catch (Exception ex) {
                                errorf("Uncaught exception: %s", ex.msg);
                            }
                        }
                        continue;
                    case EVFILT_USER:
                        handle_user_event(e);
                        continue;
                    default:
                        // filter == 0 marks an event cleared by stop_timer/stop_poll
                        break;
                }
            }
        }
    }

    /// Build the AppEvent for a read/write kernel event and pass it to the handler registered for the descriptor.
    private void _deliver_fd_event(kevent_t e, AppEvent base) @safe {
        AppEvent ae = base;
        if ( e.flags & EV_ERROR) {
            ae |= AppEvent.ERR;
        }
        if ( e.flags & EV_EOF) {
            ae |= AppEvent.HUP;
        }
        int fd = cast(int)e.ident;
        auto handler = fileHandlers[fd];
        // the descriptor may have been detached by a handler that ran earlier
        // in the same batch of events
        if ( handler !is null ) {
            handler.eventHandler(fd, ae);
        }
    }

    /**
     * Register timer `t`.
     *
     * A timer that is already expired is queued on `overdue` and executed at
     * the start of the next loop iteration. The kernel timer is (re)armed only
     * when `t` becomes the earliest timer.
     */
    void start_timer(Timer t) @trusted {
        debug tracef("insert timer %s - %X", t, cast(void*)t);
        if ( timers.empty || t < timers.front ) {
            auto d = t._expires - Clock.currTime;
            d = max(d, 0.seconds);
            if ( d == 0.seconds ) {
                overdue ~= t;
                return;
            }
            if ( timers.empty ) {
                _add_kernel_timer(t, d);
            } else {
                _mod_kernel_timer(t, d);
            }
        }
        timers.insert(t);
    }

    /// Wipe a not yet processed timer event matching `e` from `out_events`.
    /// Returns: true if such an event was found.
    bool timer_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter && o.udata == e.udata ) {
                o.ident = 0;
                o.filter = 0;
                o.udata = null;
                return true;
            }
        }
        return false;
    }

    /**
     * Unregister timer `t`.
     *
     * Handles three cases: `t` is waiting on the `overdue` list, `t` is not
     * the earliest timer (only the container is touched), or `t` is the
     * earliest timer (the kernel timer is deleted or re-armed for the next one).
     */
    void stop_timer(Timer t) @trusted {

        // a timer started with an expiration in the past lives only in `overdue`
        foreach(i, o; overdue) {
            if ( o is t ) {
                debug tracef("remove overdue %s", t);
                overdue = overdue[0..i] ~ overdue[i+1..$];
                return;
            }
        }

        assert(!timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t));

        debug tracef("timers: %s", timers);
        if ( t != timers.front ) {
            debug tracef("remove non-front %s", t);
            auto r = timers.equalRange(t);
            timers.remove(r);
            return;
        }

        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.udata = cast(void*)t;
        auto cleared = timer_cleared_from_out_events(e);

        timers.removeFront();
        if ( timers.empty ) {
            if ( cleared ) {
                debug tracef("return because it is cleared");
                return;
            }
            debug tracef("we have to del this timer from kernel");
            _del_kernel_timer();
            return;
        }
        debug tracef("we have to set timer to next: %s, %s", out_events[0..ready], timers);
        // we can change kernel timer to next,
        // If next timer expired - set delta = 0 to run on next loop invocation
        auto next = timers.front;
        auto d = next._expires - Clock.currTime;
        d = max(d, 0.seconds);
        _mod_kernel_timer(timers.front, d);
        return;
    }

    // pragma(inline, true)
    // void processNotification(Notification ue, Broadcast broadcast) @safe {
    //     ue.handler();
    // }

    // void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
    //     debug trace("posting notification");
    //     if ( !notificationsQueue.full )
    //     {
    //         debug trace("put notification");
    //         notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //         debug trace("put notification done");
    //         return;
    //     }
    //     // now try to find space for next notification
    //     auto retries = 10 * notificationsQueue.Size;
    //     while(notificationsQueue.full && retries > 0)
    //     {
    //         retries--;
    //         auto nd = notificationsQueue.get();
    //         Notification _n = nd._n;
    //         Broadcast _b = nd._broadcast;
    //         processNotification(_n, _b);
    //     }
    //     enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
    //     notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //     debug trace("posting notification - done");
    // }
    //

    /// Submit all pending changes to the kernel without waiting for events.
    /// Throws: Exception if `kevent` fails.
    void flush() @trusted {
        if ( in_index == 0 ) {
            return;
        }
        // flush
        int rc = kevent(kqueue_fd, &in_events[0], in_index, null, 0, null);
        enforce(rc>=0, "flush: kevent %s, %s".format(fromStringz(strerror(errno)), in_events[0..in_index]));
        in_index = 0;
    }

    /// Wipe a not yet processed event for the same ident and filter as `e` from `out_events`.
    /// Returns: true if such an event was found.
    bool fd_cleared_from_out_events(kevent_t e) @safe pure nothrow @nogc {
        foreach(ref o; out_events[0..ready]) {
            if ( o.ident == e.ident && o.filter == e.filter ) {
                o.ident = 0;
                o.filter = 0;
                return true;
            }
        }
        return false;
    }

    /// Forget the handler registered for `fd`.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    /// Start polling `fd` for the single event `ev`; events are delivered to `h`.
    void start_poll(int fd, AppEvent ev, FileEventHandler h) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = filter;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        fileHandlers[fd] = h;
    }

    /// Stop polling `fd` for the single event `ev`. The change is submitted to the kernel immediately.
    void stop_poll(int fd, AppEvent ev) @safe {
        assert(fd>=0);
        immutable filter = appEventToSysEvent(ev);
        debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
        kevent_t e;
        e.ident = fd;
        e.filter = filter;
        e.flags = EV_DELETE|EV_DISABLE;
        // drop an already received but not yet dispatched event for this fd
        fd_cleared_from_out_events(e);
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
        flush();
    }

    /// Disable the received user event and call the handler registered for its id.
    pragma(inline, true)
    void handle_user_event(kevent_t e) @safe {
        import core.thread;
        debug tracef("Got user event thread.id:%s event.id:%d", Thread.getThis().id(), e.ident);
        disable_user_event(e);
        auto h = fileHandlers[e.ident];
        // handler may have been removed by stop_wait_for_user_event while
        // the event was already pending
        if ( h !is null ) {
            h.eventHandler(kqueue_fd, AppEvent.USER);
        }
    }

    /// Register `handler` for the user event `event_id`.
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("start waiting for user_event %s", event_id);
        fileHandlers[event_id] = handler;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Remove the registration for the user event `event_id`.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        debug tracef("stop waiting for user_event %s", event_id);
        fileHandlers[event_id] = null;
        kevent_t e;
        e.ident = event_id;
        e.filter = EVFILT_USER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue a change that disables the user event described by `e`.
    void disable_user_event(kevent_t e) @safe {
        e.flags = EV_DISABLE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Queue a change that arms the (single) one-shot kernel timer to fire for `t` after `d`.
    void _add_kernel_timer(in Timer t, in Duration d) @trusted {
        debug tracef("add kernel timer %s, delta %s", t, d);
        assert(d >= 0.seconds);
        intptr_t delay_ms;
        if ( d < 36500.days)
        {
            delay_ms = d.split!"msecs".msecs;
        }
        else
        {
            // https://github.com/opensource-apple/xnu/blob/master/bsd/kern/kern_event.c#L1188
            // OSX kernel refuses to set too large timer interval with errno ERANGE
            delay_ms = 36500.days.split!"msecs".msecs;
        }
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_ADD | EV_ONESHOT;
        e.data = delay_ms;
        e.udata = cast(void*)t;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    // EV_ADD on an existing ident/filter pair modifies it, so "mod" is the same operation
    alias _mod_kernel_timer = _add_kernel_timer;

    /// Queue a change that deletes the kernel timer.
    void _del_kernel_timer() @safe {
        debug trace("del kernel timer");
        kevent_t e;
        e.ident = 0;
        e.filter = EVFILT_TIMER;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /*
     * signal functions
     */

    /// Register signal handler `s`; the kernel filter is added with the first handler for a signal number.
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through kevent
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /// Unregister signal handler `s`; the kernel filter is removed with the last handler for a signal number.
    /// Throws: NotFoundException if no handler was ever started for this signal number.
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }

    /// Ignore the default disposition of the signal and queue a change that adds the kernel signal filter.
    void _add_kernel_signal(in Signal s) {
        debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
        signal(s._signum, SIG_IGN);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_ADD;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }

    /// Restore the default disposition of the signal and queue a change that deletes the kernel signal filter.
    void _del_kernel_signal(in Signal s) {
        debug tracef("del kernel signal %d, id: %d", s._signum, s._id);

        signal(s._signum, SIG_DFL);

        kevent_t e;
        e.ident = s._signum;
        e.filter = EVFILT_SIGNAL;
        e.flags = EV_DELETE;
        if ( in_index == MAXEVENTS ) {
            flush();
        }
        in_events[in_index++] = e;
    }
}

/// Map a single application event (IN, OUT or CONN) to the kqueue filter used to wait for it.
/// Throws: Exception for any other event.
auto appEventToSysEvent(AppEvent ae) {
    import core.bitop;
    assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
    assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
    switch ( ae ) {
        case AppEvent.IN:
            return EVFILT_READ;
        case AppEvent.OUT:
            return EVFILT_WRITE;
        case AppEvent.CONN:
            return EVFILT_READ;
        default:
            throw new Exception("You can't wait for event %X".format(ae));
    }
}

/// Map a kqueue read/write filter back to the application event.
AppEvent sysEventToAppEvent(short se) {
    final switch ( se ) {
        case EVFILT_READ:
            return AppEvent.IN;
        case EVFILT_WRITE:
            return AppEvent.OUT;
        // default:
        //     throw new Exception("Unexpected event %d".format(se));
    }
}

unittest {
    import std.exception;
    import core.exception;

    assert(appEventToSysEvent(AppEvent.IN)==EVFILT_READ);
    assert(appEventToSysEvent(AppEvent.OUT)==EVFILT_WRITE);
    assert(appEventToSysEvent(AppEvent.CONN)==EVFILT_READ);
    //assertThrown!AssertError(appEventToSysEvent(AppEvent.IN | AppEvent.OUT));
    assert(sysEventToAppEvent(EVFILT_READ) == AppEvent.IN);
    assert(sysEventToAppEvent(EVFILT_WRITE) == AppEvent.OUT);
}