module hio.drivers.epoll;

version(linux):

import std.datetime;
import std.string;
import std.container;
import std.exception;
import std.experimental.logger;
import std.typecons;

import std.experimental.allocator;
import std.experimental.allocator.mallocator;

import core.memory: GC;

import std.algorithm.comparison: max;
import core.stdc.string: strerror;
import core.stdc.errno: errno, EAGAIN, EINTR;

import core.sys.linux.epoll;
import core.sys.linux.timerfd;
import core.sys.linux.sys.signalfd;

import core.sys.posix.unistd: close, read;
import core.sys.posix.time : itimerspec, CLOCK_MONOTONIC , timespec;

import hio.events;
import hio.common;

/**
 * Event loop driver built on epoll, timerfd and signalfd.
 *
 * One timerfd is shared by all timers: it is always armed for the earliest
 * entry of `timers`. Timers that were already expired when started are kept
 * in `overdue` and executed at the top of the next loop iteration.
 * File handlers are looked up by file descriptor in `fileHandlers`.
 **/
struct NativeEventLoopImpl {
    immutable bool   native = true;
    immutable string _name = "epoll";
    private {
        bool                    stopped = false;
        enum                    MAXEVENTS = 1024;
        int                     epoll_fd = -1;
        int                     timer_fd = -1;
        int                     signal_fd = -1;
        sigset_t                mask;

        align(1)                epoll_event[MAXEVENTS] events;

        RedBlackTree!Timer      timers;
        Timer[]                 overdue;    // timers added with expiration in past
        Signal[][int]           signals;
        //FileHandlerFunction[int]    fileHandlers;
        FileEventHandler[]      fileHandlers;   // indexed by file descriptor

    }
    @disable this(this) {}

    /**
     * Create the epoll and timer descriptors (if not created yet), a fresh
     * timer tree and the handler table.
     *
     * The handler table is allocated with Mallocator and registered with the
     * GC as a range to scan. It is allocated only when no table exists, so a
     * repeated call does not leak the previous table.
     **/
    void initialize() @trusted nothrow {
        if ( epoll_fd == -1 ) {
            epoll_fd = (() @trusted => epoll_create(MAXEVENTS))();
        }
        if ( timer_fd == -1 ) {
            timer_fd = (() @trusted => timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK))();
        }
        timers = new RedBlackTree!Timer();
        if ( fileHandlers is null ) {
            fileHandlers = Mallocator.instance.makeArray!FileEventHandler(16*1024);
            GC.addRange(&fileHandlers[0], fileHandlers.length * FileEventHandler.sizeof);
        }
    }

    /**
     * Release what initialize() created: close the epoll and timer
     * descriptors, drop the timer tree and free the handler table.
     *
     * Safe to call on a loop that was never initialized, and safe to call
     * twice: every resource is released only if it is currently held.
     **/
    void deinit() @trusted {
        if ( epoll_fd != -1 ) {
            close(epoll_fd);
            epoll_fd = -1;
        }
        if ( timer_fd != -1 ) {
            close(timer_fd);
            timer_fd = -1;
        }
        timers = null;
        if ( fileHandlers !is null ) {
            GC.removeRange(&fileHandlers[0]);
            Mallocator.instance.dispose(fileHandlers);
            // do not keep a dangling slice to freed memory
            fileHandlers = null;
        }
    }

    /// Ask run() to leave its loop; run() clears the flag again on exit.
    void stop() @safe {
        stopped = true;
    }

    /**
     * Convert a deadline into an epoll_wait timeout.
     *
     * Returns: milliseconds left until `deadline`, never negative, clamped to
     *          int.max so that a far-away deadline can not wrap around.
     **/
    int _calculate_timeout(SysTime deadline) {
        Duration delta = deadline - Clock.currTime;
        delta = max(delta, 0.seconds);
        immutable long msecs = delta.total!"msecs";
        return msecs > int.max ? int.max : cast(int)msecs;
    }

    /**
     * Run the event loop.
     *
     * Params:
     *   d = how long to run; Duration.max means "until stop() is called".
     *
     * Returns when stop() was called, or when epoll_wait timed out without
     * any event. Exceptions thrown by handlers are logged and swallowed.
     * Throws: Exception if epoll_wait or reading the signalfd fails.
     **/
    void run(Duration d) {

        immutable bool runIndefinitely = (d == Duration.max);

        /**
         * eventloop will exit when we reach deadline
         * it is allowed to have d == 0.seconds,
         * which means we will run events once
         **/
        SysTime deadline = Clock.currTime + d;
        debug tracef("evl run %s",runIndefinitely? "indefinitely": "for %s".format(d));

        scope ( exit )
        {
            stopped = false;
        }

        while( !stopped ) {
            debug tracef("event loop iteration");

            //
            // handle user events(notifications)
            //
            // auto counter = notificationsQueue.Size * 10;
            // while(!notificationsQueue.empty){
            //     auto nd = notificationsQueue.get();
            //     Notification n = nd._n;
            //     Broadcast b = nd._broadcast;
            //     n.handler(b);
            //     counter--;
            //     enforce(counter > 0, "Can't clear notificatioinsQueue");
            // }
            //auto counter = notificationsQueue.Size * 10;
            //while(!notificationsQueue.empty){
            //    auto n = notificationsQueue.get();
            //    n.handler();
            //    counter--;
            //    enforce(counter > 0, "Can't clear notificatioinsQueue");
            //}

            while (overdue.length > 0) {
                // execute timers with requested negative delay
                Timer t = overdue[0];
                overdue = overdue[1..$];
                debug tracef("execute overdue %s", t);
                HandlerDelegate h = t._handler;
                try {
                    h(AppEvent.TMO);
                } catch (Exception ex) {
                    errorf("Uncaught exception: %s", ex);
                }
            }
            if (stopped) {
                break;
            }

            int timeout_ms = runIndefinitely ?
                                -1 :
                                _calculate_timeout(deadline);

            int ready = epoll_wait(epoll_fd, &events[0], MAXEVENTS, timeout_ms);
            debug tracef("got %d events", ready);
            if ( ready == 0 ) {
                debug trace("epoll timedout and no events to process");
                return;
            }
            if ( ready == -1 && errno == EINTR) {
                continue;
            }
            if ( ready < 0 ) {
                errorf("epoll_wait returned error %s", fromStringz(strerror(errno)));
            }
            enforce(ready >= 0);
            debug tracef("events: %s", events[0..ready]);
            foreach(i; 0..ready) {
                auto e = events[i];
                debug tracef("got event %s", e);
                int fd = e.data.fd;

                if ( fd == timer_fd ) {
                    // with EPOLLET flag I dont have to read from timerfd, otherwise I have to:
                    // ubyte[8] v;
                    // auto tfdr = read(timer_fd, &v[0], 8);
                    debug tracef("timer event");
                    auto now = Clock.currTime;
                    /*
                     * Invariants for timers
                     * ---------------------
                     * we have to receive event only on the earliest timer in list
                     **/
                    if ( timers.empty ) {
                        // Stale event: a handler processed earlier in this
                        // batch stopped the last timer after epoll_wait had
                        // already reported the timer descriptor.
                        debug trace("timer event with no timers, skipping");
                        continue;
                    }
                    assert(timers.front._expires <= now);

                    do {
                        debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
                        Timer t = timers.front;
                        HandlerDelegate h = t._handler;
                        timers.removeFront;
                        if (timers.empty) {
                            // remove from epoll first, so the handler can
                            // start a new timer through _add_kernel_timer
                            _del_kernel_timer();
                        }
                        try {
                            h(AppEvent.TMO);
                        } catch (Exception ex) {
                            errorf("Uncaught exception: %s", ex);
                        }
                        now = Clock.currTime;
                    } while (!timers.empty && timers.front._expires <= now );

                    if ( ! timers.empty ) {
                        // re-arm the kernel timer for the next pending timer
                        Duration kernel_delta = timers.front._expires - now;
                        assert(kernel_delta > 0.seconds);
                        _mod_kernel_timer(timers.front, kernel_delta);
                    } else {
                        // delete kernel timer so we can add it next time
                        //_del_kernel_timer();
                    }
                    continue;
                }
                if ( fd == signal_fd ) {
                    enum siginfo_items = 8;
                    signalfd_siginfo[siginfo_items] info;
                    debug trace("got signal");
                    assert(signal_fd != -1);
                    // the descriptor is edge-triggered: drain it until EAGAIN
                    while (true) {
                        auto rc = read(signal_fd, &info, info.sizeof);
                        if ( rc < 0 && errno == EINTR ) {
                            // interrupted before any data was read, retry
                            continue;
                        }
                        if ( rc < 0 && errno == EAGAIN ) {
                            break;
                        }
                        enforce(rc > 0);
                        auto got_signals = rc / signalfd_siginfo.sizeof;
                        debug tracef("read info %d, %s", got_signals, info[0..got_signals]);
                        foreach(si; 0..got_signals) {
                            auto signum = info[si].ssi_signo;
                            debug tracef("signum: %d", signum);
                            auto row = cast(int)signum in signals;
                            if ( row is null ) {
                                // nobody is registered for this signal
                                continue;
                            }
                            foreach(s; *row) {
                                debug tracef("processing signal handler %s", s);
                                try {
                                    SigHandlerDelegate h = s._handler;
                                    h(signum);
                                } catch (Exception ex) {
                                    errorf("Uncaught exception: %s", ex);
                                }
                            }
                        }
                    }
                    continue;
                }
                // regular file descriptor: translate epoll flags to AppEvent
                AppEvent ae;
                if ( e.events & EPOLLIN ) {
                    ae |= AppEvent.IN;
                }
                if (e.events & EPOLLOUT) {
                    ae |= AppEvent.OUT;
                }
                if (e.events & EPOLLERR) {
                    ae |= AppEvent.ERR;
                }
                if (e.events & EPOLLHUP) {
                    ae |= AppEvent.HUP;
                }
                debug tracef("process event %02x on fd: %s, handler: %s", e.events, e.data.fd, fileHandlers[fd]);
                if ( fileHandlers[fd] !is null ) {
                    try {
                        fileHandlers[fd].eventHandler(e.data.fd, ae);
                    }
                    catch (Exception ex) {
                        errorf("On file handler: %d, %s", fd, ex);
                    }
                }
                //HandlerDelegate h = cast(HandlerDelegate)e.data.ptr;
                //AppEvent appEvent = AppEvent(sysEventToAppEvent(e.events), -1);
                //h(appEvent);
            }
        }
    }

    /**
     * Schedule timer `t`.
     *
     * If `t` becomes the earliest timer and is already expired, it is put on
     * the overdue list and executed by the next loop iteration; otherwise it
     * is inserted into the timer tree, re-arming the kernel timer when `t`
     * is the new earliest one.
     **/
    void start_timer(Timer t) @safe {
        debug tracef("insert timer %s", t);
        if ( timers.empty || t < timers.front ) {
            auto d = t._expires - Clock.currTime;
            d = max(d, 0.seconds);
            if ( d == 0.seconds ) {
                overdue ~= t;
                return;
            }
            debug {
                tracef("timers: %s", timers);
            }
            if ( timers.empty ) {
                _add_kernel_timer(t, d);
            } else {
                _mod_kernel_timer(t, d);
            }
        }
        timers.insert(t);
    }

    /**
     * Cancel timer `t`.
     *
     * The timer is removed from the overdue list or from the timer tree,
     * wherever it currently is. If it was the earliest timer, the kernel
     * timer is re-armed for the next one or removed when none is left.
     * Stopping a timer that is not scheduled is a no-op.
     **/
    void stop_timer(Timer t) @safe {
        debug tracef("remove timer %s", t);

        // A timer started with an expiration in the past lives only in the
        // overdue list, so it has to be cancelled there.
        if ( overdue.length > 0 ) {
            bool wasOverdue = false;
            Timer[] kept;
            foreach(o; overdue) {
                if ( o is t ) {
                    wasOverdue = true;
                    continue;
                }
                kept ~= o;
            }
            if ( wasOverdue ) {
                overdue = kept;
                return;
            }
        }

        if ( timers.empty ) {
            // nothing scheduled; front() on an empty tree is illegal
            return;
        }

        if ( t !is timers.front ) {
            debug tracef("Non front timer: %s", timers);
            auto r = timers.equalRange(t);
            timers.remove(r);
            return;
        }

        timers.removeFront();
        debug trace("we have to del this timer from kernel or set to next");
        if ( !timers.empty ) {
            // we can change kernel timer to next,
            // If next timer expired - set delta = 0 to run on next loop invocation
            debug trace("set up next timer");
            auto next = timers.front;
            auto d = next._expires - Clock.currTime;
            d = max(d, 0.seconds);
            _mod_kernel_timer(timers.front, d);
            return;
        }
        debug trace("remove last timer");
        _del_kernel_timer();
    }

    /**
     * Arm the timerfd to fire once after `d` and register it with epoll
     * (edge-triggered).
     *
     * Params:
     *   t = timer being scheduled (not used for arming)
     *   d = delay until expiration, must be positive
     * Throws: Exception if timerfd_settime or epoll_ctl fails.
     **/
    void _add_kernel_timer(Timer t, in Duration d) @trusted {
        debug trace("add kernel timer");
        assert(d > 0.seconds);
        itimerspec itimer;
        auto ds = d.split!("seconds", "nsecs");
        itimer.it_value.tv_sec = cast(typeof(itimer.it_value.tv_sec)) ds.seconds;
        itimer.it_value.tv_nsec = cast(typeof(itimer.it_value.tv_nsec)) ds.nsecs;
        int rc = timerfd_settime(timer_fd, 0, &itimer, null);
        enforce(rc >= 0, "timerfd_settime(%s): %s".format(itimer, fromStringz(strerror(errno))));
        epoll_event e;
        e.events = EPOLLIN|EPOLLET;
        e.data.fd = timer_fd;
        rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_fd, &e);
        enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, fromStringz(strerror(errno))));
    }

    /**
     * Re-arm the already registered timerfd to fire once after `d`.
     *
     * Params:
     *   t = timer the kernel timer is moved to (used for tracing only)
     *   d = delay until expiration, must not be negative; zero means
     *       "fire as soon as possible"
     * Throws: Exception if timerfd_settime or epoll_ctl fails.
     **/
    void _mod_kernel_timer(Timer t, in Duration d) @trusted {
        debug tracef("mod kernel timer to %s", t);
        assert(d >= 0.seconds, "Illegal timer %s".format(d));
        itimerspec itimer;
        auto ds = d.split!("seconds", "nsecs");
        itimer.it_value.tv_sec = cast(typeof(itimer.it_value.tv_sec)) ds.seconds;
        itimer.it_value.tv_nsec = cast(typeof(itimer.it_value.tv_nsec)) ds.nsecs;
        if ( ds.seconds == 0 && ds.nsecs == 0 ) {
            // An all-zero it_value disarms the timer instead of firing it,
            // so ask for the shortest possible delay.
            itimer.it_value.tv_nsec = 1;
        }
        int rc = timerfd_settime(timer_fd, 0, &itimer, null);
        enforce(rc >= 0, "timerfd_settime(%s): %s".format(itimer, fromStringz(strerror(errno))));
        epoll_event e;
        e.events = EPOLLIN|EPOLLET;
        e.data.fd = timer_fd;
        rc = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, timer_fd, &e);
        enforce(rc >= 0, "epoll_ctl mod(%s): %s".format(e, fromStringz(strerror(errno))));
    }

    /**
     * Remove the timerfd from the epoll set.
     * Throws: Exception if epoll_ctl fails.
     **/
    void _del_kernel_timer() @trusted {
        debug trace("del kernel timer");
        epoll_event e;
        e.events = EPOLLIN;
        e.data.fd = timer_fd;
        int rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, timer_fd, &e);
        enforce(rc >= 0, "epoll_ctl del(%s): %s".format(e, fromStringz(strerror(errno))));
    }
    //
    // notifications
    //
    // pragma(inline)
    // void processNotification(Notification ue, Broadcast broadcast) @safe {
    //     ue.handler(broadcast);
    // }
    // void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
    //     debug trace("posting notification");
    //     if ( !notificationsQueue.full )
    //     {
    //         debug trace("put notification");
    //         notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //         debug trace("put notification done");
    //         return;
    //     }
    //     // now try to find space for next notification
    //     auto retries = 10 * notificationsQueue.Size;
    //     while(notificationsQueue.full && retries > 0)
    //     {
    //         retries--;
    //         auto nd = notificationsQueue.get();
    //         Notification _n = nd._n;
    //         Broadcast _b = nd._broadcast;
    //         processNotification(_n, _b);
    //     }
    //     enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
    //     notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //     debug trace("posting notification - done");
    // }


    //void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
    //    debug trace("posting notification");
    //    if ( !notificationsQueue.full )
    //    {
    //        notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //        return;
    //    }
    //    // now try to find space for next notification
    //    auto retries = 10 * notificationsQueue.Size;
    //    while(notificationsQueue.full && retries > 0)
    //    {
    //        retries--;
    //        auto nd = notificationsQueue.get();
    //        Notification _n = nd._n;
    //        Broadcast _b = nd._broadcast;
    //        processNotification(_n, _b);
    //    }
    //    enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
    //    notificationsQueue.put(NotificationDelivery(notification, broadcast));
    //}

    //
    // signals
    //

    /**
     * Register signal handler `s`. The first handler for a signal number
     * also routes that signal to the signalfd.
     **/
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through the signalfd
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /**
     * Unregister signal handler `s` (matched by `_id`). When the last handler
     * for the signal number is removed, the signal is taken off the signalfd.
     *
     * Throws: NotFoundException if no handler was ever started for the
     *         signal number of `s`.
     **/
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }

    /**
     * Block the signal of `s` for normal delivery and add it to the signalfd
     * mask. The signalfd is created and registered with epoll
     * (edge-triggered) on first use.
     *
     * Throws: Exception if signalfd creation or epoll_ctl fails.
     **/
    void _add_kernel_signal(Signal s) {
        debug tracef("add kernel signal %d, id: %d", s._signum, s._id);
        sigset_t m;
        sigemptyset(&m);
        sigaddset(&m, s._signum);
        pthread_sigmask(SIG_BLOCK, &m, null);

        sigaddset(&mask, s._signum);
        if ( signal_fd == -1 ) {
            signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
            debug tracef("signalfd %d", signal_fd);
            enforce(signal_fd >= 0, "signalfd: %s".format(fromStringz(strerror(errno))));
            epoll_event e;
            e.events = EPOLLIN|EPOLLET;
            e.data.fd = signal_fd;
            auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, signal_fd, &e);
            enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, fromStringz(strerror(errno))));
        } else {
            // descriptor already exists: only update its signal mask
            signalfd(signal_fd, &mask, 0);
        }

    }

    /**
     * Unblock the signal of `s` and remove it from the signalfd mask.
     **/
    void _del_kernel_signal(Signal s) {
        debug tracef("del kernel signal %d, id: %d", s._signum, s._id);
        sigset_t m;
        sigemptyset(&m);
        sigaddset(&m, s._signum);
        pthread_sigmask(SIG_UNBLOCK, &m, null);
        sigdelset(&mask, s._signum);
        assert(signal_fd != -1);
        signalfd(signal_fd, &mask, 0);
    }

    /**
     * Register descriptor `event_id` for input events and remember `handler`
     * for it.
     *
     * Throws: Exception if epoll_ctl fails.
     **/
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        epoll_event e;
        e.events = EPOLLIN;
        e.data.fd = event_id;
        auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_id, &e))();
        enforce(rc >= 0, "epoll_ctl add(%s): %s".format(e, s_strerror(errno)));
        fileHandlers[event_id] = handler;
    }

    /**
     * Remove descriptor `event_id` from the epoll set and forget its handler.
     * The result of epoll_ctl is ignored; `handler` is not used.
     **/
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {
        epoll_event e;
        e.events = EPOLLIN;
        e.data.fd = event_id;
        auto rc = (() @trusted => epoll_ctl(epoll_fd, EPOLL_CTL_DEL, event_id, &e))();
        fileHandlers[event_id] = null;
    }

    /// Returns: the epoll file descriptor (-1 when not initialized).
    int get_kernel_id() pure @safe nothrow @nogc {
        return epoll_fd;
    }

    //
    // files/sockets
    //

    /// Forget the handler for `fd`; the epoll registration is left untouched.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    /**
     * Register `fd` with epoll for event `ev` and remember `f` as its handler.
     *
     * Throws: Exception if `ev` can not be waited for or epoll_ctl fails.
     **/
    void start_poll(int fd, AppEvent ev, FileEventHandler f) @trusted {
        epoll_event e;
        e.events = appEventToSysEvent(ev);
        e.data.fd = fd;
        auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &e);
        enforce(rc >= 0, "epoll_ctl add(%d, %s): %s".format(fd, e, fromStringz(strerror(errno))));
        fileHandlers[fd] = f;
    }

    /**
     * Remove `fd` from the epoll set. The result of epoll_ctl is ignored and
     * the stored handler is kept (see detach()).
     *
     * Throws: Exception if `ev` is not an event that can be waited for.
     **/
    void stop_poll(int fd, AppEvent ev) @trusted {
        epoll_event e;
        e.events = appEventToSysEvent(ev);
        e.data.fd = fd;
        auto rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &e);
    }

    /**
     * Map a single AppEvent flag to the matching epoll event mask.
     *
     * Returns: EPOLLIN for AppEvent.IN, EPOLLOUT for AppEvent.OUT.
     * Throws: Exception for any other event.
     **/
    auto appEventToSysEvent(AppEvent ae) pure @safe {
        import core.bitop;
        assert( popcnt(ae) == 1, "Set one event at a time, you tried %x, %s".format(ae, appeventToString(ae)));
        assert( ae <= AppEvent.CONN, "You can ask for IN,OUT,CONN events");
        switch ( ae ) {
            case AppEvent.IN:
                return EPOLLIN;
            case AppEvent.OUT:
                return EPOLLOUT;
            //case AppEvent.CONN:
            //    return EVFILT_READ;
            default:
                throw new Exception("You can't wait for event %X".format(ae));
        }
    }
}