module hio.drivers.select;

import std.datetime;
import std.container;
import std.experimental.logger;

import std.experimental.allocator;
import std.experimental.allocator.mallocator;
import std.typecons;

import std.string;
import std.algorithm.comparison: min, max;
import std.exception: enforce;
import core.thread;

version(Windows) {
    import core.sys.windows.winsock2;
}
version(Posix) {
    import core.sys.posix.sys.select;
}

import core.stdc.string: strerror;
import core.stdc.errno;
import core.stdc.signal;

import hio.events;

//
// TODO add support for multiple select event loops
//
// Signals caught by sig_catcher are recorded here and drained by
// FallbackEventLoopImpl.run when select() is interrupted (EINTR).
private enum sig_array_length = 256;
private static int[sig_array_length] last_signal;
private static int last_signal_index;

/**
 * C signal handler installed by FallbackEventLoopImpl._add_kernel_signal.
 *
 * Records the signal number in `last_signal` so the event loop can dispatch
 * it later. When the buffer is already full the signal is dropped instead of
 * writing past the end of the array.
 */
extern(C) void sig_catcher(int signum) nothrow @nogc {
    if ( last_signal_index < sig_array_length ) {
        last_signal[last_signal_index++] = signum;
    }
}

/// Per-descriptor polling state: the set of events currently requested.
private struct FileDescriptor {
    package {
        AppEvent _polling = AppEvent.NONE;
    }
    /// Human readable form of the requested event mask.
    string toString() const @safe {
        import std.format: format;
        return appeventToString(_polling);
        //return "FileDescriptor: filehandle: %d, events: %s".format(_fileno, appeventToString(_polling));
    }
}

/**
 * Event loop implementation built on top of select().
 *
 * Supports timers, file descriptor readiness (IN/OUT) and signals.
 * Descriptors must be in the range [0, numberOfDescriptors).
 */
struct FallbackEventLoopImpl {
    immutable string _name = "select";
    immutable numberOfDescriptors = 1024;

    private {
        fd_set                  read_fds;
        fd_set                  write_fds;
        fd_set                  err_fds;

        bool                    stopped = false;
        RedBlackTree!Timer      timers;
        Timer[]                 overdue;    // timers added with expiration in past

        Signal[][int]           signals;

        FileDescriptor[numberOfDescriptors]    fileDescriptors;
        FileEventHandler[]      fileHandlers;
        //CircBuff!Notification   notificationsQueue;
    }

    @disable this(this);

    /// Allocate the timer tree and the handler table (one slot per descriptor).
    void initialize() @safe nothrow {
        timers = new RedBlackTree!Timer();
        // keep the handler table in sync with the descriptor table size
        fileHandlers = new FileEventHandler[](numberOfDescriptors);
    }
    /// Drop the timer tree.
    void deinit() @safe {
        timers = null;
    }
    /// Ask run() to return at the next check of the stop flag.
    void stop() @safe {
        debug trace("mark eventloop as stopped");
        stopped = true;
    }

    /// Clamp `d` to be non-negative and store it in `*tv` as seconds + microseconds.
    private static void _fill_timeval(Duration d, timeval* tv) {
        d = max(d, 0.seconds);
        auto converted = d.split!("seconds", "usecs");
        tv.tv_sec  = cast(typeof(tv.tv_sec))converted.seconds;
        tv.tv_usec = cast(typeof(tv.tv_usec))converted.usecs;
    }

    /**
     * Find shortest interval between now->deadline, now->earliest timer
     * If deadline expired or timer in past - set zero wait time
     */
    timeval* _calculate_timeval(SysTime deadline, timeval* tv) {
        SysTime now = Clock.currTime;
        Duration d = deadline - now;
        if ( ! timers.empty ) {
            d = min(d, timers.front._expires - now);
        }
        _fill_timeval(d, tv);
        return tv;
    }
    /**
     * Interval between now and the earliest timer (zero if it is in the past).
     * The timer tree must not be empty when this overload is called.
     */
    timeval* _calculate_timeval(timeval* tv) {
        SysTime now = Clock.currTime;
        _fill_timeval(timers.front._expires - now, tv);
        return tv;
    }

    /**
     * Run the loop for at most `d`, or until stop() is called.
     *
     * Params:
     *   d = how long to run; Duration.max means "run until stopped".
     *
     * Throws: the exception raised by `enforce` when select() fails with
     *         anything other than EINTR.
     */
    void run(Duration d) {

        immutable bool runIndefinitely = (d == Duration.max);
        SysTime now = Clock.currTime;
        SysTime deadline;
        timeval tv;
        timeval* wait;

        if ( ! runIndefinitely ) {
            deadline = now + d;
        }

        debug tracef("evl run %s",runIndefinitely? "indefinitely": "for %s".format(d));

        scope(exit) {
            stopped = false;
        }

        while (!stopped) {

            int fdmax = -1;

            //
            // handle user events(notifications)
            //
            //auto counter = notificationsQueue.Size * 10;
            //while(!notificationsQueue.empty){
            //    auto n = notificationsQueue.get();
            //    n.handler();
            //    counter--;
            //    enforce(counter > 0, "Can't clear notificatioinsQueue");
            //}

            while (overdue.length > 0) {
                // execute timers which user requested with negative delay
                Timer t = overdue[0];
                overdue = overdue[1..$];
                debug tracef("execute overdue %s", t);
                HandlerDelegate h = t._handler;
                try {
                    h(AppEvent.TMO);
                } catch (Exception e) {
                    errorf("Uncaught exception: %s", e);
                }
            }
            if (stopped) {
                break;
            }

            while ( !timers.empty && timers.front._expires <= now) {
                debug tracef("processing overdue  %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
                Timer t = timers.front;
                HandlerDelegate h = t._handler;
                timers.removeFront;
                try {
                    h(AppEvent.TMO);
                } catch (Exception e) {
                    errorf("Uncaught exception: %s", e);
                }
                now = Clock.currTime;
            }

            FD_ZERO(&read_fds);
            FD_ZERO(&write_fds);
            FD_ZERO(&err_fds);

            foreach(int fd; 0..numberOfDescriptors) {
                AppEvent e = fileDescriptors[fd]._polling;
                if ( e == AppEvent.NONE ) {
                    continue;
                }
                debug tracef("poll %d for %s", fd, fileDescriptors[fd]);
                if ( e & AppEvent.IN ) {
                    FD_SET(fd, &read_fds);
                }
                if ( e & AppEvent.OUT ) {
                    FD_SET(fd, &write_fds);
                }
                fdmax = max(fdmax, fd);
            }

            // Pick the select() timeout:
            //  - no deadline and no timers: block until an fd event or a signal
            //  - no deadline but timers:    wake up for the earliest timer
            //  - deadline:                  wake up for min(deadline, earliest timer)
            if ( runIndefinitely && timers.empty ) {
                wait = null;
            } else if ( runIndefinitely ) {
                wait = _calculate_timeval(&tv);
            } else {
                wait = _calculate_timeval(deadline, &tv);
            }

            //debug tracef("waiting for events %s", wait is null?"forever":"%s".format(*wait));
            auto ready = select(fdmax+1, &read_fds, &write_fds, &err_fds, wait);
            // capture errno right away: logging below may overwrite it
            immutable int select_errno = errno;
            //debug tracef("returned %d events", ready);
            if ( ready < 0 && select_errno == EINTR ) {
                // The bound is re-read on every iteration on purpose: a signal
                // delivered while a handler runs is appended and handled too.
                for (int s_ind = 0; s_ind < last_signal_index; s_ind++) {
                    immutable int signum = last_signal[s_ind];
                    auto row = signum in signals;
                    if ( row is null || (*row).length == 0 ) {
                        // handlers were removed after the signal had been recorded
                        debug tracef("no handlers registered for signal %d", signum);
                        continue;
                    }
                    foreach(s; *row) {
                        debug tracef("processing signal handler %s", s);
                        try {
                            SigHandlerDelegate h = s._handler;
                            h(signum);
                        } catch (Exception e) {
                            errorf("Uncaught exception: %s", e);
                        }
                    }
                }
                last_signal_index = 0;
                continue;
            }
            if ( ready < 0 ) {
                errorf("on call: (%s, %s, %s, %s)", fdmax+1, read_fds, write_fds, tv);
                errorf("select returned error %s", fromStringz(strerror(select_errno)));
            }
            enforce(ready >= 0, "select failed");
            if ( ready == 0 ) {
                // Timedout
                //
                // For select there can be two reasons for ready == 0:
                // 1. we reached deadline
                // 2. we have timer event
                //
                if ( timers.empty ) {
                    // there were no timers, so this can be only timeout
                    debug trace("select timedout and no timers active");
                    assert(Clock.currTime >= deadline);
                    return;
                }
                now = Clock.currTime;
                if ( !runIndefinitely && now >= deadline ) {
                    debug trace("deadline reached");
                    return;
                }

                /*
                 * Invariants for timers
                 * ---------------------
                 * timer list must not be empty at event.
                 * we have to receive event only on the earliest timer in list
                */
                assert(!timers.empty, "timers empty on timer event");
                /* */

                if ( timers.front._expires <= now) do {
                    debug tracef("processing %s, lag: %s", timers.front, Clock.currTime - timers.front._expires);
                    Timer t = timers.front;
                    HandlerDelegate h = t._handler;
                    try {
                        h(AppEvent.TMO);
                    } catch (Exception e) {
                        errorf("Uncaught exception: %s", e);
                    }
                    // timer event handler can try to stop exactly this timer,
                    // so when we returned from handler we can have different front
                    // and we do not have to remove it.
                    if ( !timers.empty && timers.front == t ) {
                        timers.removeFront;
                    }
                    now = Clock.currTime;
                } while (!timers.empty && timers.front._expires <= now );
            }
            if ( ready > 0 ) {
                foreach(int fd; 0..numberOfDescriptors) {
                    AppEvent e = fileDescriptors[fd]._polling;
                    if ( e == AppEvent.NONE ) {
                        continue;
                    }
                    debug tracef("check %d for %s", fd, fileDescriptors[fd]);
                    auto handler = fileHandlers[fd];
                    if ( (e & AppEvent.IN) && FD_ISSET(fd, &read_fds) && handler !is null ) {
                        debug tracef("got IN event on file %d", fd);
                        handler.eventHandler(fd, AppEvent.IN);
                    }
                    // The IN handler may have called stop_poll()/detach() for this
                    // descriptor, so re-read the state before delivering OUT.
                    e = fileDescriptors[fd]._polling;
                    handler = fileHandlers[fd];
                    if ( (e & AppEvent.OUT) && FD_ISSET(fd, &write_fds) && handler !is null ) {
                        debug tracef("got OUT event on file %d", fd);
                        handler.eventHandler(fd, AppEvent.OUT);
                    }
                }
            }
        }
    }

    /**
     * Register a timer. Timers that are already expired are queued in
     * `overdue` and fired at the start of the next loop iteration.
     */
    void start_timer(Timer t) @trusted {
        debug tracef("insert timer: %s", t);
        auto d = t._expires - Clock.currTime;
        d = max(d, 0.seconds);
        if ( d == 0.seconds ) {
            overdue ~= t;
            return;
        }
        timers.insert(t);
    }

    /// Remove a previously started timer from the timer tree.
    void stop_timer(Timer t) @trusted {
        assert(!timers.empty, "You are trying to remove timer %s, but timer list is empty".format(t));
        debug tracef("remove timer %s", t);
        auto r = timers.equalRange(t);
        timers.remove(r);
    }

    /**
     * Start polling `fd` for events `ev` (added to the already requested
     * events) and make `h` the handler for this descriptor.
     */
    void start_poll(int fd, AppEvent ev, FileEventHandler h) pure @safe {
        enforce(fd >= 0, "fileno can't be negative");
        enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
        debug tracef("start poll on fd %d for events %s", fd, appeventToString(ev));
        fileDescriptors[fd]._polling |= ev;
        fileHandlers[fd] = h;
    }
    /// Stop polling `fd` for events `ev`; other requested events stay active.
    void stop_poll(int fd, AppEvent ev) @safe {
        enforce(fd >= 0, "fileno can't be negative");
        enforce(fd < numberOfDescriptors, "Can't use such big fd, recompile with larger numberOfDescriptors");
        debug tracef("stop poll on fd %d for events %s", fd, appeventToString(ev));
        // clear only the bits present in ev
        fileDescriptors[fd]._polling &= ev ^ AppEvent.ALL;
    }

    /// select() has no kernel object behind it, so there is no id to expose.
    int get_kernel_id() @safe @nogc {
        return -1;
    }

    /// Not implemented for the select driver.
    void wait_for_user_event(int event_id, FileEventHandler handler) @safe {

    }
    /// Not implemented for the select driver.
    void stop_wait_for_user_event(int event_id, FileEventHandler handler) @safe {

    }

    /// Forget the handler registered for `fd`.
    void detach(int fd) @safe {
        fileHandlers[fd] = null;
    }

    // pragma(inline)
    // void processNotification(Notification ue) @safe {
    //     ue.handler();
    // }

    // void postNotification(Notification notification, Broadcast broadcast = No.broadcast) @safe {
    //     debug trace("posting notification");
    //     if ( !notificationsQueue.full )
    //     {
    //         notificationsQueue.put(notification);
    //         return;
    //     }
    //     // now try to find space for next notification
    //     auto retries = 10 * notificationsQueue.Size;
    //     while(notificationsQueue.full && retries > 0)
    //     {
    //         retries--;
    //         auto _n = notificationsQueue.get();
    //         processNotification(_n);
    //     }
    //     enforce(!notificationsQueue.full, "Can't clear space for next notification in notificatioinsQueue");
    //     notificationsQueue.put(notification);
    // }

    /// Nothing is buffered by this driver.
    void flush() {
    }

    /**
     * Register a signal handler. The C-level catcher is installed only when
     * the first handler for this signal number is added.
     */
    void start_signal(Signal s) {
        debug tracef("start signal %s", s);
        debug tracef("signals: %s", signals);
        auto r = s._signum in signals;
        if ( r is null || r.length == 0 ) {
            // enable signal only through kevent
            _add_kernel_signal(s);
        }
        signals[s._signum] ~= s;
    }

    /**
     * Unregister a signal handler (matched by `_id`). When the last handler
     * for the signal number goes away the default disposition is restored.
     *
     * Throws: NotFoundException if no handler was ever started for this signal.
     */
    void stop_signal(Signal s) {
        debug trace("stop signal");
        auto r = s._signum in signals;
        if ( r is null ) {
            throw new NotFoundException("You tried to stop signal that was not started");
        }
        Signal[] new_row;
        foreach(a; *r) {
            if (a._id == s._id) {
                continue;
            }
            new_row ~= a;
        }
        if ( new_row.length == 0 ) {
            *r = null;
            _del_kernel_signal(s);
            // reenable old signal behaviour
        } else {
            *r = new_row;
        }
        debug tracef("new signals %d row %s", s._signum, new_row);
    }
    /// Route the signal to sig_catcher.
    void _add_kernel_signal(Signal s) {
        signal(s._signum, &sig_catcher);
        debug tracef("adding handler for signum %d: %x", s._signum, &this);
    }
    /// Restore the default disposition for the signal.
    void _del_kernel_signal(Signal s) {
        signal(s._signum, SIG_DFL);
        debug tracef("deleted handler for signum %d: %x", s._signum, &this);
    }
}