1 module hio.scheduler;
2 
3 import std.experimental.logger;
4 
5 import core.thread;
6 import core.sync.mutex;
7 import std.concurrency;
8 import std.datetime;
9 import std.format;
10 import std.traits;
11 import std.exception;
12 import core.sync.condition;
13 import std.algorithm;
14 import std.typecons;
15 import std.range;
16 
17 //import core.stdc.string;
18 //import core.stdc.errno;
19 
20 //static import core.sys.posix.unistd;
21 
22 private enum PAGESIZE = 4*1024;
23 /// stack size for new tasks
24 shared int TASK_STACK_SIZE = 16 * PAGESIZE;
25 
26 import hio.events;
27 import hio.loop;
28 import hio.common;
29 
30 import std.stdio;
31 
32 struct TaskNotReady {
33     string msg;
34 }
35 
36 class NotReadyException : Exception
37 {
38     this(string msg, string file = __FILE__, size_t line = __LINE__)
39     {
40         super(msg, file, line);
41     }
42 }
43 
44 void hlSleep(Duration d) {
45     if ( d <= 0.seconds) {
46         return;
47     }
48     auto tid = Fiber.getThis();
49     assert(tid !is null);
50     bool shutdown;
51     auto callback = delegate void (AppEvent e) @trusted
52     {
53         debug tracef("got event %s while sleeping", e);
54         if ( e & AppEvent.SHUTDOWN)
55         {
56             shutdown = true;
57         }
58         tid.call(Fiber.Rethrow.no);
59     };
60     auto t = new Timer(d, callback);
61     getDefaultLoop().startTimer(t);
62     Fiber.yield();
63     if (shutdown)
64     {
65         throw new LoopShutdownException("got loop shutdown");
66     }
67 }
68 
69 struct Box(T) {
70 
71     enum Void = is(T == void);
72 
73     static if (!Void) {
74         T   _data;
75     }
76     SocketPair  _pair;
77     Throwable   _exception;
78     @disable this(this);
79 }
80 
81 ReturnType!F App(F, A...) (F f, A args) {
82     alias R = ReturnType!F;
83     Box!R box;
84     static if (!box.Void)
85     {
86         R r;
87     }
88     void _wrapper()
89     {
90         try
91         {
92             static if (!box.Void)
93             {
94                 r = f(args);
95                 box._data = r;
96             }
97             else
98             {
99                 f(args);
100             }
101         }
102         catch (Throwable e)
103         {
104             version(unittest)
105             {
106                 debug tracef("app throwed %s", e);
107             }
108             else
109             {
110                 errorf("app throwed %s", e);
111             }
112             box._exception = e;
113         }
114         getDefaultLoop().stop();
115     }
116 
117     // shared void delegate() run = () {
118     //     //
119     //     // in the child thread:
120     //     // 1. start new fiber (task over wrapper) with user supplied function
121     //     // 2. start event loop forewer
122     //     // 3. when eventLoop done(stopped inside from wrapper) the task will finish
123     //     // 4. store value in box and use socketpair to send signal to caller thread
124     //     //
125     //     auto t = task(&_wrapper);
126     //     auto e = t.start(Fiber.Rethrow.no);
127     //     if ( box._exception is null ) { // box.exception can be filled before Fiber start
128     //         box._exception = e;
129     //     }
130     //     getDefaultLoop.run(Duration.max);
131     //     //t.reset();
132     // };
133     // Thread child = new Thread(run);
134     // child.start();
135     // child.join();
136     //run();
137     auto t = task(&_wrapper);
138     auto e = t.start(Fiber.Rethrow.no);
139     if ( !box._exception )
140     {
141         // box.exception can be filled before Fiber start
142         box._exception = e;
143     }
144     if ( !box._exception)
145     {
146         // if we started ok - run loop
147         getDefaultLoop().run(Duration.max);
148     }
149     if (box._exception)
150     {
151         throw box._exception;
152     }
153     static if (!box.Void)
154     {
155         debug tracef("joined, value = %s", box._data);
156         return box._data;
157     }
158     else
159     {
160         debug tracef("joined");
161     }
162 }
163 
164 interface Computation {
165     bool ready();
166     bool wait(Duration t = Duration.max);
167 }
168 
169 enum Commands
170 {
171     StopLoop,
172     WakeUpLoop,
173     ShutdownLoop,
174 }
175 ///
176 class Threaded(F, A...) : Computation if (isCallable!F) {
177     alias start = run;
178     private {
179         alias R = ReturnType!F;
180 
181         F           _f;
182         A           _args;
183         bool        _ready = false;
184         Thread      _child;
185         bool        _child_joined; // did we called _child.join?
186         Fiber       _parent;
187         Box!R       _box;
188         Timer       _t;
189         enum Void = _box.Void;
190         bool        _isDaemon;
191         SocketPair  _commands;
192     }
193     final this(F f, A args) {
194         _f = f;
195         _args = args;
196     }
197 
198     override bool ready() {
199         return _ready;
200     }
201     auto isDaemon(bool v)
202     in(_child is null) // you can't change this after thread started
203     {
204         _isDaemon = v;
205         return this;
206     }
207     auto isDaemon()
208     {
209         return _isDaemon;
210     }
211     static if (!Void) {
212         R value() {
213             if (_ready)
214                 return _box._data;
215             throw new NotReadyException("You can't call value for non-ready task");
216         }
217     }
218     void stopThreadLoop()
219     {
220         debug tracef("stopping loop in thread");
221         ubyte[1] cmd = [Commands.StopLoop];
222         auto s = _commands.write(1, cmd);
223         assert(s == 1);
224     }
225     void wakeUpThreadLoop()
226     {
227         debug tracef("waking up loop in thread");
228         ubyte[1] cmd = [Commands.WakeUpLoop];
229         auto s = _commands.write(1, cmd);
230         assert(s == 1);
231     }
232     void shutdownThreadLoop()
233     {
234         debug tracef("shutdown loop in thread");
235         ubyte[1] cmd = [Commands.ShutdownLoop];
236         auto s = _commands.write(1, cmd);
237         assert(s == 1);
238     }
239     override bool wait(Duration timeout = Duration.max)
240     in(!_isDaemon)      // this not works with daemons
241     in(_child !is null) // you can wait only for started tasks
242     {
243         if (_ready) {
244             if ( !_child_joined ) {
245                 _child.join();
246                 _child_joined = true;
247                 _commands.close();
248                 _box._pair.close();
249             }
250             if ( _box._exception ) {
251                 throw _box._exception;
252             }
253             return true;
254         }
255         if ( timeout <= 0.seconds )
256         {
257             // this is poll
258             return _ready;
259         }
260         if ( timeout < Duration.max )
261         {
262             // rize timer
263             _t = new Timer(timeout, (AppEvent e) @trusted {
264                 getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
265                 debug tracef("threaded timed out");
266                 auto throwable = _parent.call(Fiber.Rethrow.no);
267                 _t = null;
268             });
269             getDefaultLoop().startTimer(_t);
270         }
271         // wait on the pair
272         final class ThreadEventHandler : FileEventHandler
273         {
274             override string describe() @safe
275             {
276                 return
277                     "thread event handler: f(args): %s(%s)".format(_f, _args);
278             }
279             override void eventHandler(int fd, AppEvent e) @trusted
280             {
281                 _box._pair.read(0, 1);
282                 getDefaultLoop().stopPoll(_box._pair[0], AppEvent.IN);
283                 getDefaultLoop().detach(_box._pair[0]);
284                 debug tracef("threaded done");
285                 if ( _t ) {
286                     getDefaultLoop.stopTimer(_t);
287                     _t = null;
288                 }
289                 auto throwable = _parent.call(Fiber.Rethrow.no);
290             }
291         }
292         _parent = Fiber.getThis();
293         assert(_parent, "You can call this only trom fiber");
294         debug tracef("wait - start listen on socketpair");
295         //auto eh = new ThreadEventHandler();
296         getDefaultLoop().startPoll(_box._pair[0], AppEvent.IN, new ThreadEventHandler());
297         Fiber.yield();
298         debug tracef("wait done");
299         if ( _ready && !_child_joined ) {
300             _child.join();
301             _child_joined = true;
302             _commands.close();
303             _box._pair.close();
304         }
305         return _ready;
306     }
307 
308     final auto run()
309     in(_child is null)
310     {
311         class CommandsHandler : FileEventHandler
312         {
313             override string describe()
314             {
315                 return "CommandsHandler";
316             }
317             override void eventHandler(int fd, AppEvent e)
318             {
319                 assert(fd == _commands[0]);
320                 if ( e & AppEvent.SHUTDOWN)
321                 {
322                     // just ignore
323                     return;
324                 }
325                 auto b = _commands.read(0, 1);
326                 final switch(b[0])
327                 {
328                     case Commands.StopLoop:
329                         debug safe_tracef("got stopLoop command");
330                         getDefaultLoop.stop();
331                         break;
332                     case Commands.WakeUpLoop:
333                         debug safe_tracef("got WakeUpLoop command");
334                         break;
335                     case Commands.ShutdownLoop:
336                         debug tracef("got Shutdown command");
337                         getDefaultLoop.shutdown();
338                         break;
339                 }
340             }
341         }
342         this._child = new Thread(
343             {
344                 getDefaultLoop.startPoll(_commands[0], AppEvent.IN, new CommandsHandler());
345                 try {
346                     debug safe_tracef("calling");
347                     static if (!Void) {
348                         _box._data = App(_f, _args);
349                     }
350                     else {
351                         App(_f, _args);
352                     }
353                 }
354                 catch (Throwable e) {
355                     _box._exception = e;
356                 }
357                 ubyte[1] b = [0];
358                 _ready = true;
359                 auto s = _box._pair.write(1, b);
360                 // clean up everything and release memory
361                 uninitializeLoops(); // close fds, free memory
362             }
363         );
364         this._child.isDaemon = _isDaemon;
365         if ( !_isDaemon )
366         {
367             _box._pair = makeSocketPair();
368             _commands = makeSocketPair();
369         }
370         this._child.start();
371         return this;
372     }
373 }
374 
375 ///
376 /// Task. Exacute computation. Inherits from Fiber
377 /// you can start, wait, check for readiness.
378 ///
379 class Task(F, A...) : Computation if (isCallable!F) {
380     private enum  Void = is(ReturnType!F==void);
381     alias start = call;
382     debug private
383     {
384         static ulong task_id;
385         ulong        _task_id;
386     }
387     private {
388         alias R = ReturnType!F;
389 
390         F            _f;
391         A            _args;
392         bool         _ready;
393         bool         _daemon;
394         // Notification _done;
395         Fiber        _waitor;
396         Throwable    _exception;
397 
398         Fiber        _executor;
399         static if ( !Void ) {
400             R       _result;
401         }
402     }
403 
404     final this(F f, A args) @safe
405     {
406         _f = f;
407         _args = args;
408         _waitor = null;
409         _exception = null;
410         static if (!Void) {
411             _result = R.init;
412         }
413         if (fiberPoolSize>0)
414         {
415             _executor = fiberPool[--fiberPoolSize];
416             () @trusted {
417                 _executor.reset(&run);
418             }();
419         }
420         else
421         {
422             () @trusted {
423                 _executor = new Fiber(&run, TASK_STACK_SIZE);
424             }();
425         }
426         debug
427         {
428             _task_id = task_id++;
429             debug tracef("t:%0X task %s created", Thread.getThis.id, _task_id);
430         }
431         //super(&run);
432     }
433     Throwable call(Fiber.Rethrow r = Fiber.Rethrow.yes)
434     {
435         return _executor.call(r);
436     }
437     void daemon(bool v)
438     {
439         _daemon = v;
440     }
441     void start() @trusted
442     {
443         _executor.call();
444     }
445     void reset()
446     {
447         _executor.reset();
448     }
449     auto state()
450     {
451         return _executor.state;
452     }
453     static int fiberPoolSize;
454     enum   FiberPoolCapacity = 1024;
455     static Fiber[FiberPoolCapacity] fiberPool;
456     ///
457     /// wait() - wait forewer
458     /// wait(Duration) - wait with timeout
459     /// 
460     override bool wait(Duration timeout = Duration.max) {
461         debug trace("enter wait");
462         assert(!_daemon);
463         if ( _ready || timeout <= 0.msecs )
464         {
465             if ( _exception !is null ) {
466                 throw _exception;
467             }
468             return _ready;
469         }
470         assert(this._waitor is null, "You can't wait twice");
471         this._waitor = Fiber.getThis();
472         assert(_waitor !is null, "You can wait task only from another task or fiber");
473         Timer t;
474         if ( timeout < Duration.max) 
475         {
476 
477            t = new Timer(timeout, (AppEvent e) @trusted {
478                 debug tracef("Event %s on 'wait' timer", e);
479                 if (_waitor)
480                 {
481                     auto w = _waitor;
482                     _waitor = null;
483                     w.call(Fiber.Rethrow.no);
484                 }
485             });
486             try
487             {
488                 getDefaultLoop().startTimer(t);
489             }
490             catch(LoopShutdownException e)
491             {
492                 // this can happens if we are in shutdown process
493                 t = null;
494             }
495         }
496         debug tracef("yeilding task");
497         Fiber.yield();
498         debug tracef("wait continue, task state %s", _executor.state);
499         if ( t )
500         {
501             getDefaultLoop().stopTimer(t);
502         }
503         if (fiberPoolSize < FiberPoolCapacity )
504         {
505             fiberPool[fiberPoolSize++] = _executor;
506         }
507         if ( _exception !is null )
508         {
509             throw _exception;
510         }
511         return _ready;
512     }
513 
514     static if (!Void) {
515         auto waitResult() {
516             wait();
517             enforce(_ready);
518             return _result;
519         }
520     }
521 
522     override bool ready() const {
523         return _ready;
524     }
525     static if (!Void) {
526         @property
527         final auto result() const {
528             enforce!NotReadyException(_ready, "You can't get result from not ready task");
529             return _result;
530         }
531         alias value = result;
532     }
533     private final void run() {
534         static if ( Void )
535         {
536             try
537             {
538                 _f(_args);
539             }
540             catch (Throwable e)
541             {
542                 _exception = e;
543                 version(unittest)
544                 {
545                     debug tracef("got throwable %s", e);
546                 }
547                 else
548                 {
549                     errorf("got throwable %s", e);
550                 }
551             }
552         }
553         else 
554         {
555             try
556             {
557                 _result = _f(_args);
558             }
559             catch(Throwable e)
560             {
561                 _exception = e;
562                 version(unittest)
563                 {
564                     debug tracef("got throwable %s", e);
565                 }
566                 else
567                 {
568                     errorf("got throwable %s", e);
569                 }
570             }
571             //debug tracef("run finished, result: %s, waitor: %s", _result, this._waitor);
572         }
573         this._ready = true;
574         if ( !_daemon && this._waitor ) {
575             auto w = this._waitor;
576             this._waitor = null;
577             debug tracef("t:%0X task %s finished, wakeup waitor", Thread.getThis.id, _task_id);
578             w.call();
579             return;
580         }
581         else
582         {
583             debug tracef("t:%0X task %s finsihed, no one to wake up(isdaemon=%s)", Thread.getThis.id, _task_id, _daemon);
584         }
585         if ( _daemon )
586         {
587             Fiber.getThis.reset();
588             return;
589         }
590         if (fiberPoolSize < FiberPoolCapacity )
591         {
592             fiberPool[fiberPoolSize++] = _executor;
593         }
594     }
595 }
596 
597 auto task(F, A...)(F f, A a) {
598     return new Task!(F,A)(f, a);
599 }
600 
601 auto threaded(F, A...)(F f, A a) {
602     return new Threaded!(F, A)(f, a);
603 }
604 
605 unittest {
606     int i;
607     int f(int s) {
608         i+=s;
609         return(i);
610     }
611     auto t = task(&f, 1);
612     t.start();
613     t.wait();
614     assert(t.result == 1);
615     assert(i==1, "i=%d, expected 1".format(i));
616     assert(t.result == 1, "result: %d, expected 1".format(t.result));
617 }
618 
619 unittest {
620     auto v = App(function int() {
621         Duration f(Duration t)
622         {
623             hlSleep(t);
624             return t;
625         }
626 
627         auto t100 = task(&f, 100.msecs);
628         auto t200 = task(&f, 200.msecs);
629         t100.start;
630         t200.start;
631         t100.wait();
632         return 1;
633     });
634     assert(v == 1);
635 }
636 
637 unittest
638 {
639     // test wakeup thread loop and stop thread loop
640     globalLogLevel = LogLevel.info;
641     App({
642         bool canary = true;
643         auto t = threaded({
644             hlSleep(10.seconds);
645             canary = false;
646         }).start;
647         hlSleep(100.msecs);
648         t.wakeUpThreadLoop();
649         hlSleep(500.msecs);
650         t.stopThreadLoop();
651         assert(canary);
652         //t.wait();
653     });
654     globalLogLevel = LogLevel.info;
655 }
656 
657 unittest
658 {
659     globalLogLevel = LogLevel.info;
660     auto v = App(function int() {
661         Duration f(Duration t)
662         {
663             hlSleep(t);
664             return t;
665         }
666 
667         auto t100 = threaded(&f, 100.msecs).start;
668         auto t200 = threaded(&f, 200.msecs).start;
669         t200.wait(100.msecs);
670         assert(!t200.ready);
671         t100.wait(300.msecs);
672         assert(t100.ready);
673         assert(t100.value == 100.msecs);
674         t200.wait();
675         return 1;
676     });
677     assert(v == 1);
678 }
679 
680 unittest
681 {
682     import core.memory;
683     // create lot of "daemon" tasks to check how they will survive GC collection.
684     // 2019-12-01T22:38:53.301 [info] scheduler.d:783:__lambda2  create 10000 tasks
685     // 2019-12-01T22:38:55.731 [info] scheduler.d:856:__unittest_L840_C1 test1 ok in FALLBACK mode
686     enum tasks = 10_000;
687     int N;
688     void t0(int i) {
689         if (i%5==0)
690             hlSleep((i%1000).msecs);
691         N++;
692         // if (N==tasks)
693         // {
694         //     getDefaultLoop().stop();
695         // }
696     }
697     App({
698         infof(" create %d tasks", tasks);
699         iota(tasks).each!((int i){
700             auto t = task(&t0, i);
701             t.start();
702         });
703         GC.collect();
704         infof(" created, sleep and let all timers to expire");
705 //        globalLogLevel = LogLevel.trace;
706         hlSleep(2.seconds);
707     });
708     assert(N==tasks);
709     info("done");
710 }
711 
712 unittest {
713     //
714     // just to test that we received correct value at return
715     //
716     globalLogLevel = LogLevel.info;
717     auto mode = globalLoopMode;
718     scope(exit) {
719         globalLoopMode = mode;
720     }
721     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
722         globalLoopMode = m;
723         int f() {
724             return 1;
725         }
726         auto v = App(&f);
727         assert(v == 1, "expected v==1, but received v=%d".format(v));
728         infof("test1 ok in %s mode", m);
729     }
730 }
731 
732 unittest {
733     //
734     // call sleep in spawned thread
735     //
736     globalLogLevel = LogLevel.info;
737     auto mode = globalLoopMode;
738     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
739         globalLoopMode = m;
740         int f() {
741             hlSleep(200.msecs);
742             return 2;
743         }
744         auto v = App(&f);
745         assert(v == 2, "expected v==2, but received v=%d".format(v));
746         infof("test2 ok in %s mode", m);
747     }
748     globalLoopMode = mode;
749 }
750 
751 version(unittest) {
752     class TestException : Exception {
753         this(string msg, string file = __FILE__, size_t line = __LINE__) {
754             super(msg, file, line);
755         }
756     }
757 }
758 
759 unittest {
760     //
761     // test exception delivery when called from thread
762     //
763     globalLogLevel = LogLevel.info;
764     auto mode = globalLoopMode;
765     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
766         globalLoopMode = m;
767         int f() {
768             hlSleep(200.msecs);
769             throw new TestException("test exception");
770         }
771         assertThrown!TestException(App(&f));
772         infof("test3a ok in %s mode", m);
773     }
774     globalLoopMode = mode;
775 }
776 
777 unittest {
778     //
779     // test exception delivery when called from task
780     //
781     globalLogLevel = LogLevel.info;
782     auto mode = globalLoopMode;
783     scope(exit) {
784         globalLoopMode = mode;
785     }
786     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
787         globalLoopMode = m;
788         int f() {
789             auto t = task((){
790                 hlSleep(200.msecs);
791                 throw new TestException("test exception");
792             });
793             t.start();
794             t.wait(300.msecs);
795             return 0;
796         }
797         assertThrown!TestException(App(&f));
798         infof("test3b ok in %s mode", m);
799     }
800 }
801 
802 unittest {
803     //
804     // test wait with timeout
805     //
806     globalLogLevel = LogLevel.info;
807     auto mode = globalLoopMode;
808     scope(exit) {
809         globalLoopMode = mode;
810     }
811     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
812         globalLoopMode = m;
813         int f0() {
814             hlSleep(100.msecs);
815             return 4;
816         }
817         int f() {
818             auto t = task(&f0);
819             t.call();
820             t.wait();
821             return t.result;
822         }
823         auto r = App(&f);
824         assert(r == 4, "App returned %d, expected 4".format(r));
825         infof("test4 ok in %s mode", m);
826     }
827 }
828 
829 unittest {
830     //
831     // test calling void function
832     //
833     globalLogLevel = LogLevel.info;
834     auto mode = globalLoopMode;
835     scope(exit) {
836         globalLoopMode = mode;
837     }
838     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
839         globalLoopMode = m;
840         void f() {
841             hlSleep(200.msecs);
842         }
843         App(&f);
844         infof("test6 ok in %s mode", m);
845     }
846 }
847 
848 
849 unittest {
850     globalLogLevel = LogLevel.info;
851     auto mode = globalLoopMode;
852     scope(exit) {
853         globalLoopMode = mode;
854     }
855     foreach(m; [Mode.FALLBACK, Mode.NATIVE]) {
856         globalLoopMode = m;
857         int f0() {
858             hlSleep(100.msecs);
859             tracef("sleep done");
860             return 6;
861         }
862         int f() {
863             auto v = App(&f0);
864             tracef("got value %s", v);
865             return v+1;
866         }
867         auto r = App(&f);
868         assert(r == 7, "App returned %d, expected 7".format(r));
869         infof("test7 ok in %s mode", m);
870     }
871 }
872 
873 //
874 // split array on N balanced chunks
875 // (not on chunks with N members)
876 //
877 private auto splitn(T)(T a, size_t slices) {
878     T[] r;
879     if (a.length == 0) {
880         return r;
881     }
882     if (a.length % slices == 0) {
883         return chunks(a, a.length / slices).array;
884     }
885     int n;
886     while (n < a.length) {
887         auto rest = a.length - n;
888         auto done = slices - r.length;
889         auto size = rest % done ? (rest / done + 1) : rest / done;
890         r ~= a[n .. n + size];
891         n += size;
892     }
893     return r;
894 }
895 unittest {
896     for(int n=1; n<100; n++) {
897         for (int slices = 1; slices < n; slices++) {
898             auto r = splitn(iota(n).array, slices);
899             assert(r.length == slices);
900             assert(equal(iota(n), r.join));
901         }
902     }
903 }
904 
905 // Map array on M threads and N fibers
906 // Non lazy. Return void if f is void.
907 //            :
908 //           /|\
909 //          / | \
910 //       ->/  |  \<- M threads
911 //        /   |   \
912 //       N    N    N
913 //      /|\  /|\  /|\
914 //      |||  |||  |||
915 //      |||  |||->|||<- N fibers
916 //      fff  fff  fff
917 //      ...  ...  ...
918 //      ...  ...  ... <- r splitted over MxN fibers
919 //      ...  ...  .. 
920 //
921 auto mapMxN(F, R)(R r, F f, ulong m, ulong n) {
922     enum Void = is(ReturnType!F == void);
923     struct V
924     {
925         Throwable exception;
926         static if (!Void)
927         {
928             ReturnType!F    _value;
929             auto value() inout
930             {
931                 check();
932                 return _value;
933             }
934             alias value this;
935         }
936         void check() inout
937         {
938             if (exception)
939             {
940                 throw exception;
941             }
942         }
943     }
944     assert(m > 0 && n > 0 && r.length > 0, "should be m > 0 && n > 0 && r.length > 0, you have %d,%d,%d".format(m,n,r.length));
945 
946     m = min(m, r.length);
947 
948     auto fiberWorker(R fiber_chunk) {
949         V[] result;
950         foreach(ref c; fiber_chunk)
951         {
952             try
953             {
954                 static if (!Void)
955                 {
956                     result ~= V(null, f(c));
957                 }
958                 else
959                 {
960                     f(c);
961                     result ~= V();
962                 }
963             }
964             catch(Throwable e)
965             {
966                 result ~= V(e);
967             }
968         }
969         return result;
970     }
971 
972     auto threadWorker(R thread_chunk) {
973         auto fibers = thread_chunk.splitn(n). // split on N chunks
974             map!(fiber_chunk => task(&fiberWorker, fiber_chunk)). // start fiber over each chunk
975             array;
976         fibers.each!"a.start";
977         debug tracef("%d tasks started", fibers.length);
978         auto ready = fibers.map!"a.wait".array;
979         assert(ready.all);
980         return fibers.map!"a.value".join;
981     }
982     auto threads = r.splitn(m). // split on M chunks
983         map!(thread_chunk => threaded(&threadWorker, thread_chunk)). // start thread over each chunk
984         array;
985     threads.each!"a.start";
986     debug tracef("threads started");
987     threads.each!"a.wait";
988     debug tracef("threads finished");
989     return threads.map!"a.value".array.join;
990     // auto ready = threads.map!"a.wait".array;
991     // assert(ready.all);
992     // return threads.map!"a.value".join;
993 }
994 
995 // map array on M threads
996 // Non lazy. Return void if f is void.
997 //            :
998 //           /|\
999 //          / | \
1000 //       ->/  |  \<- M threads
1001 //        /   |   \
1002 //       f    f    f
1003 //       .    .    . 
1004 //       .    .    .  <- r splitted over M threads
1005 //       .    .      
1006 //
1007 auto mapM(R, F)(R r, F f, ulong m) if (isArray!R) {
1008     enum Void = is(ReturnType!F == void);
1009 
1010     assert(m > 0 && r.length > 0);
1011 
1012     m = min(m, r.length);
1013 
1014     static if (Void) {
1015         void threadWorker(R chunk) {
1016             chunk.each!f;
1017         }
1018     } else {
1019         auto threadWorker(R chunk) {
1020             return chunk.map!f.array;
1021         }
1022     }
1023 
1024     auto threads = r.splitn(m).map!(thread_chunk => threaded(&threadWorker, thread_chunk).start).array;
1025 
1026     threads.each!"a.wait";
1027 
1028     static if (!Void) {
1029         return threads.map!"a.value".array.join;
1030     }
1031 }
1032 
1033 unittest {
1034     import std.range;
1035     import std.stdio;
1036     import core.atomic;
1037 
1038     shared int cnt;
1039 
1040     void f0(int arg) {
1041         atomicOp!"+="(cnt,arg);
1042     }
1043 
1044     int f1(int i) {
1045         return i * i;
1046     }
1047 
1048     int[] f2(int i) {
1049         return [i,i+1];
1050     }
1051 
1052     App({
1053         auto r = iota(20).array.mapM(&f1, 5);
1054         assert(equal(r, iota(20).map!"a*a"));
1055     });
1056 
1057     App({
1058         auto r = iota(20).array.mapM(&f2, 5);
1059         assert(equal(r, iota(20).map!"[a, a+1]"));
1060     });
1061 
1062     App({
1063         // woid function, updates shared counter
1064         iota(20).array.mapM(&f0, 5);
1065         assert(cnt == 190);
1066     });
1067 
1068     cnt = 0;
1069     App({
1070         // woid function, updates shared counter
1071         iota(20).array.mapMxN(&f0, 2, 3);
1072         assert(cnt == 190);
1073     });
1074 
1075     App({
1076         auto r = iota(20).array.mapMxN(&f1, 1, 1);
1077         assert(equal(r.map!"a.value", iota(20).map!"a*a"));
1078     });
1079 }
1080 unittest
1081 {
1082     globalLogLevel = LogLevel.info;
1083     // test shutdown
1084     App({
1085         void a()
1086         {
1087             hlSleep(100.msecs);
1088             getDefaultLoop.shutdown();
1089         }
1090         void b()
1091         {
1092             try
1093             {
1094                 hlSleep(1.seconds);
1095                 assert(0, "have to be interrupted");
1096             }
1097             catch (LoopShutdownException e)
1098             {
1099                 debug tracef("got what expected");
1100             }
1101         }
1102         auto ta = task(&a);
1103         auto tb = task(&b);
1104         ta.start();
1105         tb.start();
1106         ta.wait();
1107         tb.wait();
1108     });
1109     uninitializeLoops();
1110     globalLogLevel = LogLevel.info;
1111 }