module hio.scheduler;

import std.experimental.logger;

import core.thread;
import core.sync.mutex;
import std.concurrency;
import std.datetime;
import std.format;
import std.traits;
import std.exception;
import core.sync.condition;
import std.algorithm;
import std.typecons;
import std.range;

//import core.stdc.string;
//import core.stdc.errno;

//static import core.sys.posix.unistd;

private enum PAGESIZE = 4*1024;
/// stack size for new tasks
shared int TASK_STACK_SIZE = 16 * PAGESIZE;

import hio.events;
import hio.loop;
import hio.common;

import std.stdio;

struct TaskNotReady {
    string msg;
}

/// Thrown when a result is requested from a computation that has not finished.
class NotReadyException : Exception
{
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

///
/// Suspend the calling fiber for `d`.
///
/// Arms a `Timer` on the default loop and yields; the timer callback
/// resumes the fiber. Returns immediately when `d` is not positive.
///
/// Throws: `LoopShutdownException` if the timer event carried `AppEvent.SHUTDOWN`.
///
void hlSleep(Duration d) {
    if ( d <= 0.seconds) {
        return;
    }
    auto tid = Fiber.getThis();
    assert(tid !is null);
    bool shutdown;
    auto callback = delegate void (AppEvent e) @trusted
    {
        debug tracef("got event %s while sleeping", e);
        if ( e & AppEvent.SHUTDOWN)
        {
            shutdown = true;
        }
        // resume the sleeping fiber; it inspects `shutdown` after the yield
        tid.call(Fiber.Rethrow.no);
    };
    auto t = new Timer(d, callback);
    getDefaultLoop().startTimer(t);
    Fiber.yield();
    if (shutdown)
    {
        throw new LoopShutdownException("got loop shutdown");
    }
}

/// Holder for a computation outcome: optional value, a socket pair and a captured throwable.
struct Box(T) {

    enum Void = is(T == void);

    static if (!Void) {
        T _data;
    }
    SocketPair  _pair;
    Throwable   _exception;
    @disable this(this);
}

///
/// Run `f(args)` inside a new task and drive the default event loop until
/// the wrapper stops it.
///
/// Returns: the value returned by `f` (nothing for void `f`).
/// Throws: whatever `f` threw (captured as `Throwable` and rethrown here).
///
ReturnType!F App(F, A...) (F f, A args) {
    alias R = ReturnType!F;
    Box!R box;
    static if (!box.Void)
    {
        R r;
    }
    void _wrapper()
    {
        try
        {
            static if (!box.Void)
            {
                r = f(args);
                box._data = r;
            }
            else
            {
                f(args);
            }
        }
        catch (Throwable e)
        {
            version(unittest)
            {
                debug tracef("app throwed %s", e);
            }
            else
            {
                errorf("app throwed %s", e);
            }
            box._exception = e;
        }
        // the user function is done (or failed) - stop the loop
        getDefaultLoop().stop();
    }

    // shared void delegate() run = () {
    //     //
    //     // in the child thread:
    //     // 1. start new fiber (task over wrapper) with user supplied function
    //     // 2. start event loop forever
    //     // 3. when eventLoop done(stopped inside from wrapper) the task will finish
    //     // 4. store value in box and use socketpair to send signal to caller thread
    //     //
    //     auto t = task(&_wrapper);
    //     auto e = t.start(Fiber.Rethrow.no);
    //     if ( box._exception is null ) { // box.exception can be filled before Fiber start
    //         box._exception = e;
    //     }
    //     getDefaultLoop.run(Duration.max);
    //     //t.reset();
    // };
    // Thread child = new Thread(run);
    // child.start();
    // child.join();
    //run();
    auto t = task(&_wrapper);
    auto e = t.start(Fiber.Rethrow.no);
    if ( !box._exception )
    {
        // box.exception can be filled before Fiber start
        box._exception = e;
    }
    if ( !box._exception)
    {
        // if we started ok - run loop
        getDefaultLoop().run(Duration.max);
    }
    if (box._exception)
    {
        throw box._exception;
    }
    static if (!box.Void)
    {
        debug tracef("joined, value = %s", box._data);
        return box._data;
    }
    else
    {
        debug tracef("joined");
    }
}

/// Common interface of things that can be polled and waited for.
interface Computation {
    bool ready();
    bool wait(Duration t = Duration.max);
}

/// One-byte commands sent to the loop of a `Threaded` child over its command socket pair.
enum Commands
{
    StopLoop,
    WakeUpLoop,
    ShutdownLoop,
}
///
/// Computation executed in a separate `Thread`.
///
/// The child thread runs `App(_f, _args)`, stores the outcome in `_box` and
/// writes one byte to `_box._pair` to signal completion. `wait` polls the
/// other end of that pair from the calling fiber.
///
class Threaded(F, A...) : Computation if (isCallable!F) {
    alias start = run;
    private {
        alias R = ReturnType!F;

        F           _f;
        A           _args;
        bool        _ready = false;
        Thread      _child;
        bool        _child_joined; // did we call _child.join?
        Fiber       _parent;
        Box!R       _box;
        Timer       _t;
        enum        Void = _box.Void;
        bool        _isDaemon;
        SocketPair  _commands;
    }
    final this(F f, A args) {
        _f = f;
        _args = args;
    }

    override bool ready() {
        return _ready;
    }
    /// Set the daemon flag; returns `this` for chaining.
    auto isDaemon(bool v)
    in(_child is null) // you can't change this after thread started
    {
        _isDaemon = v;
        return this;
    }
    auto isDaemon()
    {
        return _isDaemon;
    }
    static if (!Void) {
        /// Result of the computation.
        /// Throws: `NotReadyException` if the thread has not finished yet.
        R value() {
            if (_ready)
                return _box._data;
            throw new NotReadyException("You can't call value for non-ready task");
        }
    }
    /// Send `Commands.StopLoop` to the child thread.
    void stopThreadLoop()
    {
        debug tracef("stopping loop in thread");
        ubyte[1] cmd = [Commands.StopLoop];
        auto s = _commands.write(1, cmd);
        assert(s == 1);
    }
    /// Send `Commands.WakeUpLoop` to the child thread.
    void wakeUpThreadLoop()
    {
        debug tracef("waking up loop in thread");
        ubyte[1] cmd = [Commands.WakeUpLoop];
        auto s = _commands.write(1, cmd);
        assert(s == 1);
    }
    /// Send `Commands.ShutdownLoop` to the child thread.
    void shutdownThreadLoop()
    {
        debug tracef("shutdown loop in thread");
        ubyte[1] cmd = [Commands.ShutdownLoop];
        auto s = _commands.write(1, cmd);
        assert(s == 1);
    }
    ///
    /// Wait for the thread to finish.
    ///
    /// A non-positive timeout only polls the ready flag. Otherwise the calling
    /// fiber yields until the completion byte arrives or the timer fires.
    ///
    /// Returns: `true` if the computation is finished.
    /// Throws: the throwable captured in the child, but only on the path where
    ///         the computation was already finished when `wait` was entered.
    ///
    override bool wait(Duration timeout = Duration.max)
    in(!_isDaemon)      // this not works with daemons
    in(_child !is null) // you can wait only for started tasks
    {
        if (_ready) {
            if ( !_child_joined ) {
                _child.join();
                _child_joined = true;
                _commands.close();
                _box._pair.close();
            }
            if ( _box._exception ) {
                throw _box._exception;
            }
            return true;
        }
        if ( timeout <= 0.seconds )
        {
            // this is poll
            return _ready;
        }
        if ( timeout < Duration.max )
        {
            // arm timer
            _t = new Timer(timeout, (AppEvent e) @trusted {
                getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
                debug tracef("threaded timed out");
                auto throwable = _parent.call(Fiber.Rethrow.no);
                _t = null;
            });
            getDefaultLoop().startTimer(_t);
        }
        // wait on the pair
        final class ThreadEventHandler : FileEventHandler
        {
            override string describe() @safe
            {
                return
                    "thread event handler: f(args): %s(%s)".format(_f, _args);
            }
            override void eventHandler(int fd, AppEvent e) @trusted
            {
                _box._pair.read(0, 1);
                getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
                getDefaultLoop().detach(_box._pair[0]);
                debug tracef("threaded done");
                if ( _t ) {
                    getDefaultLoop.stopTimer(_t);
                    _t = null;
                }
                auto throwable = _parent.call(Fiber.Rethrow.no);
            }
        }
        _parent = Fiber.getThis();
        assert(_parent, "You can call this only trom fiber");
        debug tracef("wait - start listen on socketpair");
        //auto eh = new ThreadEventHandler();
        getDefaultLoop().startPoll(_box._pair[0], AppEvent.IN, new ThreadEventHandler());
        Fiber.yield();
        debug tracef("wait done");
        if ( _ready && !_child_joined ) {
            _child.join();
            _child_joined = true;
            _commands.close();
            _box._pair.close();
        }
        return _ready;
    }

    ///
    /// Create and start the child thread. Returns `this`.
    ///
    /// NOTE(review): the socket pairs are created only for non-daemon threads,
    /// but the child body polls `_commands` and writes to `_box._pair`
    /// unconditionally - confirm this is intended for daemon threads.
    ///
    final auto run()
    in(_child is null)
    {
        class CommandsHandler : FileEventHandler
        {
            override string describe()
            {
                return "CommandsHandler";
            }
            override void eventHandler(int fd, AppEvent e)
            {
                assert(fd == _commands[0]);
                if ( e & AppEvent.SHUTDOWN)
                {
                    // just ignore
                    return;
                }
                auto b = _commands.read(0, 1);
                final switch(b[0])
                {
                    case Commands.StopLoop:
                        debug safe_tracef("got stopLoop command");
                        getDefaultLoop.stop();
                        break;
                    case Commands.WakeUpLoop:
                        debug safe_tracef("got WakeUpLoop command");
                        break;
                    case Commands.ShutdownLoop:
                        debug tracef("got Shutdown command");
                        getDefaultLoop.shutdown();
                        break;
                }
            }
        }
        this._child = new Thread(
            {
                getDefaultLoop.startPoll(_commands[0], AppEvent.IN, new CommandsHandler());
                try {
                    debug safe_tracef("calling");
                    static if (!Void) {
                        _box._data = App(_f, _args);
                    }
                    else {
                        App(_f, _args);
                    }
                }
                catch (Throwable e) {
                    _box._exception = e;
                }
                ubyte[1] b = [0];
                _ready = true;
                // one byte on the pair tells the waiting side we are done
                auto s = _box._pair.write(1, b);
                // clean up everything and release memory
                uninitializeLoops(); // close fds, free memory
            }
        );
        this._child.isDaemon = _isDaemon;
        if ( !_isDaemon )
        {
            _box._pair = makeSocketPair();
            _commands = makeSocketPair();
        }
        this._child.start();
        return this;
    }
}

///
/// Task. Execute computation on a Fiber (taken from a per-thread pool when possible).
/// You can start, wait, check for readiness.
///
class Task(F, A...) : Computation if (isCallable!F) {
    private enum  Void = is(ReturnType!F==void);
    alias start = call;
    debug private
    {
        static ulong task_id;
        ulong        _task_id;
    }
    private {
        alias R = ReturnType!F;

        F            _f;
        A            _args;
        bool         _ready;
        bool         _daemon;
        // Notification _done;
        Fiber        _waitor;
        Throwable    _exception;

        Fiber        _executor;
        static if ( !Void ) {
            R       _result;
        }
    }

    final this(F f, A args) @safe
    {
        _f = f;
        _args = args;
        _waitor = null;
        _exception = null;
        static if (!Void) {
            _result = R.init;
        }
        if (fiberPoolSize>0)
        {
            // reuse a pooled fiber
            _executor = fiberPool[--fiberPoolSize];
            () @trusted {
                _executor.reset(&run);
            }();
        }
        else
        {
            () @trusted {
                _executor = new Fiber(&run, TASK_STACK_SIZE);
            }();
        }
        debug
        {
            _task_id = task_id++;
            debug tracef("t:%0X task %s created", Thread.getThis.id, _task_id);
        }
        //super(&run);
    }
    /// Switch into the executor fiber; returns what `Fiber.call` returns.
    Throwable call(Fiber.Rethrow r = Fiber.Rethrow.yes)
    {
        return _executor.call(r);
    }
    void daemon(bool v)
    {
        _daemon = v;
    }
    void start() @trusted
    {
        _executor.call();
    }
    void reset()
    {
        _executor.reset();
    }
    auto state()
    {
        return _executor.state;
    }
    static int  fiberPoolSize;
    enum   FiberPoolCapacity = 1024;
    static Fiber[FiberPoolCapacity] fiberPool;
    ///
    /// wait() - wait forever
    /// wait(Duration) - wait with timeout
    ///
    /// Returns: `true` if the task has finished, `false` on timeout.
    /// Throws: the throwable captured from the task body, if any.
    ///
    override bool wait(Duration timeout = Duration.max) {
        debug trace("enter wait");
        assert(!_daemon);
        if ( _ready || timeout <= 0.msecs )
        {
            if ( _exception !is null ) {
                throw _exception;
            }
            return _ready;
        }
        assert(this._waitor is null, "You can't wait twice");
        this._waitor = Fiber.getThis();
        assert(_waitor !is null, "You can wait task only from another task or fiber");
        Timer t;
        if ( timeout < Duration.max)
        {

            t = new Timer(timeout, (AppEvent e) @trusted {
                debug tracef("Event %s on 'wait' timer", e);
                if (_waitor)
                {
                    auto w = _waitor;
                    _waitor = null;
                    w.call(Fiber.Rethrow.no);
                }
            });
            try
            {
                getDefaultLoop().startTimer(t);
            }
            catch(LoopShutdownException e)
            {
                // this can happen if we are in shutdown process
                t = null;
            }
        }
        debug tracef("yeilding task");
        Fiber.yield();
        debug tracef("wait continue, task state %s", _executor.state);
        if ( t )
        {
            getDefaultLoop().stopTimer(t);
        }
        // Recycle the executor only when the task has really finished.
        // After a timeout the fiber is still suspended inside the task body;
        // pooling it would let another Task reset() a live fiber, and run()
        // would later push the same fiber into the pool a second time.
        if ( _ready && fiberPoolSize < FiberPoolCapacity )
        {
            fiberPool[fiberPoolSize++] = _executor;
        }
        if ( _exception !is null )
        {
            throw _exception;
        }
        return _ready;
    }

    static if (!Void) {
        /// Wait without timeout and return the result.
        auto waitResult() {
            wait();
            enforce(_ready);
            return _result;
        }
    }

    override bool ready() const {
        return _ready;
    }
    static if (!Void) {
        /// Result of a finished task.
        /// Throws: `NotReadyException` if the task is not finished.
        @property
        final auto result() const {
            enforce!NotReadyException(_ready, "You can't get result from not ready task");
            return _result;
        }
        alias value = result;
    }
    /// Fiber entry point: call the user function, record the outcome, wake the waiter if any.
    private final void run() {
        static if ( Void )
        {
            try
            {
                _f(_args);
            }
            catch (Throwable e)
            {
                _exception = e;
                version(unittest)
                {
                    debug tracef("got throwable %s", e);
                }
                else
                {
                    errorf("got throwable %s", e);
                }
            }
        }
        else
        {
            try
            {
                _result = _f(_args);
            }
            catch(Throwable e)
            {
                _exception = e;
                version(unittest)
                {
                    debug tracef("got throwable %s", e);
                }
                else
                {
                    errorf("got throwable %s", e);
                }
            }
            //debug tracef("run finished, result: %s, waitor: %s", _result, this._waitor);
        }
        this._ready = true;
        if ( !_daemon && this._waitor ) {
            auto w = this._waitor;
            this._waitor = null;
            debug tracef("t:%0X task %s finished, wakeup waitor", Thread.getThis.id, _task_id);
            // the waiter itself returns the executor to the pool (see wait())
            w.call();
            return;
        }
        else
        {
            debug tracef("t:%0X task %s finsihed, no one to wake up(isdaemon=%s)", Thread.getThis.id, _task_id, _daemon);
        }
        if ( _daemon )
        {
            Fiber.getThis.reset();
            return;
        }
        if (fiberPoolSize < FiberPoolCapacity )
        {
            fiberPool[fiberPoolSize++] = _executor;
        }
    }
}

/// Convenience constructor for `Task`.
auto task(F, A...)(F f, A a) {
    return new Task!(F,A)(f, a);
}

/// Convenience constructor for `Threaded`.
auto threaded(F, A...)(F f, A a) {
    return new Threaded!(F, A)(f, a);
}

unittest {
    int i;
    int f(int s) {
        i+=s;
        return(i);
    }
    auto t = task(&f, 1);
    t.start();
    t.wait();
    assert(t.result == 1);
    assert(i==1, "i=%d, expected 1".format(i));
    assert(t.result == 1, "result: %d, expected 1".format(t.result));
}

unittest {
    auto v = App(function int() {
        Duration f(Duration t)
        {
            hlSleep(t);
            return t;
        }

        auto t100 = task(&f, 100.msecs);
        auto t200 = task(&f, 200.msecs);
        t100.start;
        t200.start;
        t100.wait();
        return 1;
    });
    assert(v == 1);
}

unittest
{
    // test wakeup thread loop and stop thread loop
    globalLogLevel = LogLevel.info;
    App({
        bool canary = true;
        auto t = threaded({
            hlSleep(10.seconds);
            canary = false;
        }).start;
        hlSleep(100.msecs);
        t.wakeUpThreadLoop();
        hlSleep(500.msecs);
        t.stopThreadLoop();
        assert(canary);
        //t.wait();
    });
    globalLogLevel = LogLevel.info;
}

unittest
{
    globalLogLevel = LogLevel.info;
    auto v = App(function int() {
        Duration f(Duration t)
        {
            hlSleep(t);
            return t;
        }

        auto t100 = threaded(&f, 100.msecs).start;
        auto t200 = threaded(&f, 200.msecs).start;
        t200.wait(100.msecs);
        assert(!t200.ready);
        t100.wait(300.msecs);
        assert(t100.ready);
        assert(t100.value == 100.msecs);
        t200.wait();
        return 1;
    });
    assert(v == 1);
}

unittest
{
    import core.memory;
    // create lot of "daemon" tasks to check how they will survive GC collection.
    // 2019-12-01T22:38:53.301 [info] scheduler.d:783:__lambda2  create 10000 tasks
    // 2019-12-01T22:38:55.731 [info] scheduler.d:856:__unittest_L840_C1 test1 ok in FALLBACK mode
    enum tasks = 10_000;
    int N;
    void t0(int i) {
        if (i%5==0)
            hlSleep((i%1000).msecs);
        N++;
        // if (N==tasks)
        // {
        //     getDefaultLoop().stop();
        // }
    }
    App({
        infof(" create %d tasks", tasks);
        iota(tasks).each!((int i){
            auto t = task(&t0, i);
            t.start();
        });
        GC.collect();
        infof(" created, sleep and let all timers to expire");
        // globalLogLevel = LogLevel.trace;
        hlSleep(2.seconds);
    });
    assert(N==tasks);
    info("done");
}

unittest {
    //
    // just to test that we received correct value at return
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            return 1;
        }
        auto v = App(&f);
        assert(v == 1, "expected v==1, but received v=%d".format(v));
        infof("test1 ok in %s mode", m);
    }
}

unittest {
    //
    // call sleep in spawned thread
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    // restore the loop mode even if an assertion below fails
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            hlSleep(200.msecs);
            return 2;
        }
        auto v = App(&f);
        assert(v == 2, "expected v==2, but received v=%d".format(v));
        infof("test2 ok in %s mode", m);
    }
}

version(unittest) {
    class TestException : Exception {
        this(string msg, string file = __FILE__, size_t line = __LINE__) {
            super(msg, file, line);
        }
    }
}

unittest {
    //
    // test exception delivery when called from thread
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    // restore the loop mode even if an assertion below fails
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            hlSleep(200.msecs);
            throw new TestException("test exception");
        }
        assertThrown!TestException(App(&f));
        infof("test3a ok in %s mode", m);
    }
}

unittest {
    //
    // test exception delivery when called from task
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f() {
            auto t = task((){
                hlSleep(200.msecs);
                throw new TestException("test exception");
            });
            t.start();
            t.wait(300.msecs);
            return 0;
        }
        assertThrown!TestException(App(&f));
        infof("test3b ok in %s mode", m);
    }
}

unittest {
    //
    // test wait with timeout
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f0() {
            hlSleep(100.msecs);
            return 4;
        }
        int f() {
            auto t = task(&f0);
            t.call();
            t.wait();
            return t.result;
        }
        auto r = App(&f);
        assert(r == 4, "App returned %d, expected 4".format(r));
        infof("test4 ok in %s mode", m);
    }
}

unittest {
    //
    // test calling void function
    //
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        void f() {
            hlSleep(200.msecs);
        }
        App(&f);
        infof("test6 ok in %s mode", m);
    }
}


unittest {
    globalLogLevel = LogLevel.info;
    auto mode = globalLoopMode;
    scope(exit) {
        globalLoopMode = mode;
    }
    foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
        globalLoopMode = m;
        int f0() {
            hlSleep(100.msecs);
            tracef("sleep done");
            return 6;
        }
        int f() {
            auto v = App(&f0);
            tracef("got value %s", v);
            return v+1;
        }
        auto r = App(&f);
        assert(r == 7, "App returned %d, expected 7".format(r));
        infof("test7 ok in %s mode", m);
    }
}

//
// split array on N balanced chunks
// (not on chunks with N members)
//
// Returns an empty array for empty input. `slices` must be positive for
// non-empty input. When slices > a.length the loop below yields a.length
// one-element chunks.
//
private auto splitn(T)(T a, size_t slices) {
    T[] r;
    if (a.length == 0) {
        return r;
    }
    // both the modulo and the division below would fault on zero
    assert(slices > 0, "splitn: slices must be > 0");
    if (a.length % slices == 0) {
        return chunks(a, a.length / slices).array;
    }
    // size_t (not int): n indexes `a` and is compared against a.length
    size_t n;
    while (n < a.length) {
        auto rest = a.length - n;
        auto done = slices - r.length;
        // round up so that the remainder is spread over the first chunks
        auto size = rest % done ? (rest / done + 1) : rest / done;
        r ~= a[n .. n + size];
        n += size;
    }
    return r;
}
unittest {
    for(int n=1; n<100; n++) {
        for (int slices = 1; slices < n; slices++) {
            auto r = splitn(iota(n).array, slices);
            assert(r.length == slices);
            assert(equal(iota(n), r.join));
        }
    }
}

// Map array on M threads and N fibers
// Non lazy. Return void if f is void.
//            :
//           /|\
//          / | \
//       ->/  |  \<- M threads
//        /   |   \
//       N    N    N
//      /|\  /|\  /|\
//      |||  |||  |||
//      |||  |||->|||<- N fibers
//      fff  fff  fff
//      ...  ...  ...
//      ...  ...  ...  <- r splitted over MxN fibers
//      ...  ...  ..
//
auto mapMxN(F, R)(R r, F f, ulong m, ulong n) {
    enum Void = is(ReturnType!F == void);
    // per-element outcome: either a value or the throwable raised by f
    struct V
    {
        Throwable exception;
        static if (!Void)
        {
            ReturnType!F    _value;
            auto value() inout
            {
                check();
                return _value;
            }
            alias value this;
        }
        void check() inout
        {
            if (exception)
            {
                throw exception;
            }
        }
    }
    assert(m > 0 && n > 0 && r.length > 0, "should be m > 0 && n > 0 && r.length > 0, you have %d,%d,%d".format(m,n,r.length));

    m = min(m, r.length);

    auto fiberWorker(R fiber_chunk) {
        V[] result;
        foreach(ref c; fiber_chunk)
        {
            try
            {
                static if (!Void)
                {
                    result ~= V(null, f(c));
                }
                else
                {
                    f(c);
                    result ~= V();
                }
            }
            catch(Throwable e)
            {
                result ~= V(e);
            }
        }
        return result;
    }

    auto threadWorker(R thread_chunk) {
        auto fibers = thread_chunk.splitn(n). // split on N chunks
            map!(fiber_chunk => task(&fiberWorker, fiber_chunk)). // start fiber over each chunk
            array;
        fibers.each!"a.start";
        debug tracef("%d tasks started", fibers.length);
        auto ready = fibers.map!"a.wait".array;
        assert(ready.all);
        return fibers.map!"a.value".join;
    }
    auto threads = r.splitn(m). // split on M chunks
        map!(thread_chunk => threaded(&threadWorker, thread_chunk)). // start thread over each chunk
        array;
    threads.each!"a.start";
    debug tracef("threads started");
    threads.each!"a.wait";
    debug tracef("threads finished");
    return threads.map!"a.value".array.join;
    // auto ready = threads.map!"a.wait".array;
    // assert(ready.all);
    // return threads.map!"a.value".join;
}

// map array on M threads
// Non lazy. Return void if f is void.
//            :
//           /|\
//          / | \
//       ->/  |  \<- M threads
//        /   |   \
//       f    f    f
//       .    .    .
//       .    .    .  <- r splitted over M threads
//       .    .
//
auto mapM(R, F)(R r, F f, ulong m) if (isArray!R) {
    enum Void = is(ReturnType!F == void);

    assert(m > 0 && r.length > 0);

    m = min(m, r.length);

    static if (Void) {
        void threadWorker(R chunk) {
            chunk.each!f;
        }
    } else {
        auto threadWorker(R chunk) {
            return chunk.map!f.array;
        }
    }

    auto threads = r.splitn(m).map!(thread_chunk => threaded(&threadWorker, thread_chunk).start).array;

    threads.each!"a.wait";

    static if (!Void) {
        return threads.map!"a.value".array.join;
    }
}

unittest {
    import std.range;
    import std.stdio;
    import core.atomic;

    shared int cnt;

    void f0(int arg) {
        atomicOp!"+="(cnt,arg);
    }

    int f1(int i) {
        return i * i;
    }

    int[] f2(int i) {
        return [i,i+1];
    }

    App({
        auto r = iota(20).array.mapM(&f1, 5);
        assert(equal(r, iota(20).map!"a*a"));
    });

    App({
        auto r = iota(20).array.mapM(&f2, 5);
        assert(equal(r, iota(20).map!"[a, a+1]"));
    });

    App({
        // void function, updates shared counter
        iota(20).array.mapM(&f0, 5);
        assert(cnt == 190);
    });

    cnt = 0;
    App({
        // void function, updates shared counter
        iota(20).array.mapMxN(&f0, 2, 3);
        assert(cnt == 190);
    });

    App({
        auto r = iota(20).array.mapMxN(&f1, 1, 1);
        assert(equal(r.map!"a.value", iota(20).map!"a*a"));
    });
}
unittest
{
    globalLogLevel = LogLevel.info;
    // test shutdown
    App({
        void a()
        {
            hlSleep(100.msecs);
            getDefaultLoop.shutdown();
        }
        void b()
        {
            try
            {
                hlSleep(1.seconds);
                assert(0, "have to be interrupted");
            }
            catch (LoopShutdownException e)
            {
                debug tracef("got what expected");
            }
        }
        auto ta = task(&a);
        auto tb = task(&b);
        ta.start();
        tb.start();
        ta.wait();
        tb.wait();
    });
    uninitializeLoops();
    globalLogLevel = LogLevel.info;
}