1 module hio.socket;
2 
3 import std.typecons;
4 import std.string;
5 import std.conv;
6 import std.traits;
7 import std.datetime;
8 import std.exception;
9 import std.algorithm;
10 
11 import std.algorithm.comparison: min;
12 
13 import std.experimental.logger;
14 
15 import core.memory: pureMalloc, GC;
16 import core.exception : onOutOfMemoryError;
17 
18 import std.experimental.allocator;
19 import std.experimental.allocator.building_blocks;
20 
21 //static import std.socket;
22 
23 import std.socket;
24 
25 import core.sys.posix.sys.socket;
26 import core.sys.posix.unistd;
27 import core.sys.posix.arpa.inet;
28 import core.sys.posix.netinet.tcp;
29 import core.sys.posix.netinet.in_;
30 import core.sys.posix.sys.time : timeval;
31 
32 import core.sys.posix.fcntl;
33 
34 import core.stdc.string;
35 import core.stdc.errno;
36 
37 public import hio.events;
38 import hio.common;
39 //import nbuff;
40 
41 import hio;
42 
43 //alias Socket = RefCounted!SocketImpl;
44 
45 alias AcceptFunction = void function(int fileno);
46 alias AcceptDelegate = void delegate(hlSocket);
47 
48 // static ~this() {
49 //     trace("deinit");
50 // }
51 
52 //hlSocket[int] fd2so;
53 //
54 //void loopCallback(int fd, AppEvent ev) @safe {
55 //    debug tracef("loopCallback for %d", fd);
56 //    hlSocket s = fd2so[fd];
57 //    if ( s && s._fileno >= 0) {
58 //        debug tracef("calling handler(%s) for %s", appeventToString(ev), s);
59 //        s._handler(ev);
60 //    } else {
61 //        infof("impossible event %s on fd: %d", appeventToString(ev), fd);
62 //    }
63 //}
64 
65 class SocketException : Exception {
66     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
67         super(msg, file, line);
68     }
69 }
70 
71 class ConnectionRefused : Exception {
72     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
73         super(msg, file, line);
74     }
75 }
76 
77 class Timeout : Exception {
78     this(string msg, string file = __FILE__, size_t line = __LINE__) @safe {
79         super(msg, file, line);
80     }
81 }
82 
83 bool isLinux() pure nothrow @nogc @safe {
84     version(linux) {
85         return true;
86     } else {
87         return false;
88     }
89 }
90 
91 class hlSocket : FileEventHandler {
92     private {
93         enum State {
94             NEW = 0,
95             IDLE,
96             CONNECTING,
97             ACCEPTING,
98             IO,
99         }
100 
101         immutable ubyte      _af = AF_INET;
102         immutable int        _sock_type = SOCK_STREAM;
103         int                  _fileno = -1;
104         int                  _errno;
105         HandlerDelegate      _handler;
106         AppEvent             _polling = AppEvent.NONE;
107         size_t               _buffer_size = 16*1024;
108         hlEvLoop             _loop;
109         immutable string     _file;
110         immutable int        _line;
111         State                _state = State.NEW;
112         HandlerDelegate      _callback;
113         // accept related fields
114         void delegate(int) @safe _accept_callback;
115         // io related fields
116         IORequest            _iorq;
117         IOResult             _result;
118         Timer                _connect_timer;
119         Timer                _io_timer;
120         AppEvent             _pollingFor = AppEvent.NONE;
121         ubyte[]              _input;
122         bool                 _connected;
123         uint                 _accepts_in_a_row = 10;
124     }
125 
126     this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l =  __LINE__) @safe {
127         debug tracef("create socket");
128         _af = af;
129         _sock_type = sock_type;
130         _file = f;
131         _line = l;
132     }
133 
134     this(int s, ubyte af = AF_INET, int sock_type = 0, string f = __FILE__, int l =  __LINE__) @safe
135     in {assert(s>=0);}
136     body {
137         _af = af;
138         _sock_type = sock_type;
139         _fileno = s;
140         _file = f;
141         _line = l;
142         auto flags = (() @trusted => fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK)();
143         (() @trusted => fcntl(_fileno, F_SETFL, flags))();
144     }
145 
146 
147     ~this() {
148         // if ( _fileno != -1 ) {
149         //     if ( _loop && _polling != AppEvent.NONE ) {
150         //         _loop.stopPoll(_fileno, _polling);
151         //         _loop.detach(_fileno);
152         //     }
153         //     //fd2so[_fileno] = null;
154         //     .close(_fileno);
155         //     _fileno = -1;
156         // }
157         // close();
158     }
159     
160     override string toString() const @safe {
161         import std.format: format;
162         return "socket: fileno: %d, (%s:%d)".format(_fileno, _file, _line);
163     }
164     public bool connected() const pure @safe nothrow {
165         return _connected;
166     }
167 
168     public auto fileno() const pure @safe nothrow {
169         return _fileno;
170     }
171 
172     public auto socket_errno() const pure @safe nothrow {
173         return _errno;
174     }
175 
176     void blocking(bool blocking) @property {
177         auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}();
178         if ( blocking ) {
179             (() @trusted => fcntl(_fileno, F_SETFL, flags & ~O_NONBLOCK))();
180         } else {
181             (() @trusted => fcntl(_fileno, F_SETFL, flags | O_NONBLOCK))();
182         }
183     }
184     void timeoutHandler(AppEvent e) @safe {
185         debug
186         {
187             tracef("Timeout handler: %s", appeventToString(e));
188         }
189         final switch (_state) {
190         case State.NEW:
191             assert(0);
192         case State.IDLE:
193             assert(0);
194         case State.CONNECTING:
195             debug tracef("connection timed out");
196             _connected = false;
197             _errno = ETIMEDOUT;
198             _polling = AppEvent.NONE;
199             _loop.stopPoll(_fileno, AppEvent.OUT);
200             _loop.detach(_fileno);
201             _state = State.IDLE;
202             _callback(e);
203             return;
204         case State.ACCEPTING:
205             debug tracef("accept timed out");
206             _connected = false;
207             _errno = ETIMEDOUT;
208             _polling = AppEvent.NONE;
209             _loop.stopPoll(_fileno, AppEvent.IN);
210             _loop.detach(_fileno);
211             _state = State.IDLE;
212             _accept_callback(-1);
213             return;
214         case State.IO:
215             assert(0);
216         }
217     }
218     override void eventHandler(int fd, AppEvent e) @safe {
219         debug tracef("event %s in state %s", appeventToString(e), _state);
220         final switch ( _state ) {
221         case State.NEW:
222             assert(0);
223         case State.IDLE:
224             assert(0);
225         case State.CONNECTING:
226             debug tracef("connection event: %s", appeventToString(e));
227             assert(e & (AppEvent.OUT|AppEvent.HUP), "We can handle only OUT event in connectiong state");
228             if ( e & AppEvent.OUT ) {
229                 _connected = true;
230             }
231             if ( (e & AppEvent.HUP) ) {
232                 int err;
233                 uint err_s = err.sizeof;
234                 auto rc = (() @trusted => .getsockopt(_fileno, SOL_SOCKET, SO_ERROR, &err, &err_s))();
235                 if ( rc == 0 ) {
236                     _errno = err;
237                     debug tracef("error connecting: %s", s_strerror(err));
238                 }
239                 _connected = false;
240             }
241             _polling = AppEvent.NONE;
242             _state = State.IDLE;
243             if ( _connect_timer ) {
244                 _loop.stopTimer(_connect_timer);
245                 _connect_timer = null;
246             }
247             _loop.stopPoll(_fileno, AppEvent.OUT);
248             _callback(e);
249             return;
250             //_handler(e);
251         case State.ACCEPTING:
252             assert(e == AppEvent.IN, "We can handle only IN event in accepting state");
253             foreach(_; 0.._accepts_in_a_row) {
254                 if ( _fileno == -1 ) {
255                     break; // socket can be closed in handler
256                 }
257                 sockaddr sa;
258                 uint sa_len = sa.sizeof;
259                 int new_s = (() @trusted => .accept(_fileno, &sa, &sa_len))();
260                 if ( new_s == -1 ) {
261                     auto err = errno();
262                     if ( err == EWOULDBLOCK || err == EAGAIN ) {
263                         // POSIX.1-2001 and POSIX.1-2008 allow
264                         // either error to be returned for this case, and do not require
265                         // these constants to have the same value, so a portable
266                         // application should check for both possibilities.
267                         break;
268                     }
269                     throw new Exception(s_strerror(err));
270                 }
271                 debug tracef("New socket fd: %d", new_s);
272                 immutable int flag = 1;
273                 auto rc = (() @trusted => .setsockopt(new_s, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof))();
274                 if ( rc != 0 ) {
275                      throw new Exception(s_strerror(errno()));
276                 }
277                 version(OSX) {
278                     rc = (() @trusted => .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof))();
279                     if ( rc != 0 ) {
280                          throw new Exception(s_strerror(errno()));
281                     }
282                 }
283                 auto flags = (() @trusted => fcntl(new_s, F_GETFL, 0) | O_NONBLOCK)();
284                 (() @trusted => fcntl(new_s, F_SETFL, flags))();
285                 //hlSocket ns = new hlSocket(_af, _sock_type, new_s);
286                 //fd2so[new_s] = ns;
287                 //_accept_callback(ns);
288                 _accept_callback(new_s);
289                 debug tracef("accept_callback for fd: %d - done", new_s);
290             }
291             //_handler(e);
292             return;
293         case State.IO:
294             io_handler(e);
295             return;
296         }
297     }
298 
299     public int open() @trusted {
300         immutable flag = 1;
301         if (_fileno != -1) {
302             throw new SocketException("You can't open already opened socket: fileno(%d)".format(_fileno));
303         }
304         _fileno = socket(_af, _sock_type, 0);
305         if ( _fileno < 0 )
306             return _fileno;
307         _polling = AppEvent.NONE;
308         //fd2so[_fileno] = this;
309         auto rc = .setsockopt(_fileno, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof);
310         if ( rc != 0 ) {
311              throw new Exception(to!string(strerror(errno())));
312         }
313         version(OSX) {
314             rc = .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof);
315             if ( rc != 0 ) {
316                  throw new Exception(to!string(strerror(errno())));
317             }
318         }
319         auto flags = fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK;
320         fcntl(_fileno, F_SETFL, flags);
321         return _fileno;
322     }
323 
324     public void close() @safe {
325         if ( _fileno != -1 ) {
326             debug tracef("closing %d, polling: %x", _fileno, _polling);
327             if ( _loop && _polling != AppEvent.NONE ) {
328                 debug tracef("detach from polling for %s", appeventToString(_polling));
329                 _loop.stopPoll(_fileno, _polling);
330                 _loop.detach(_fileno);
331             }
332             //fd2so[_fileno] = null;
333             .close(_fileno);
334             _fileno = -1;
335         }
336     }
337     
338     public void bind(string addr) @trusted {
339          debug {
340              tracef("binding to %s", addr);
341          }
342          switch (_af) {
343              case AF_INET:
344                 {
345                     import core.sys.posix.netinet.in_;
346                     // addr must be "host:port"
347                     auto internet_addr = str2inetaddr(addr);
348                     sockaddr_in sin;
349                     sin.sin_family = _af;
350                     sin.sin_port = internet_addr[1];
351                     sin.sin_addr = in_addr(internet_addr[0]);
352                     int flag = 1;
353                     auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof);
354                     debug tracef("setsockopt for bind result: %d", rc);
355                     if ( rc != 0 ) {
356                     throw new Exception(to!string(strerror(errno())));
357                     }
358                     rc = .bind(_fileno, cast(sockaddr*)&sin, cast(uint)sin.sizeof);
359                     debug {
360                         tracef("bind result: %d", rc);
361                     }
362                     if ( rc != 0 ) {
363                         throw new SocketException(to!string(strerror(errno())));
364                     }
365                 }
366                 break;
367              case AF_UNIX:
368              default:
369                  throw new SocketException("unsupported address family");
370          }
371     }
372 
373     public void listen(int backlog = 10) @trusted {
374         int rc = .listen(_fileno, backlog);
375         if ( rc != 0 ) {
376             throw new SocketException(to!string(strerror(errno())));
377         }
378         debug tracef("listen on %d ok", _fileno);
379     }
380 
381     public void stopPolling(L)(L loop) @safe {
382         debug tracef("Stop polling on %d", _fileno);
383         loop.stopPoll(_fileno, _polling);
384     }
385 
386     private auto getSndRcvTimeouts() @safe {
387 
388         timeval sndtmo, rcvtmo;
389         socklen_t tmolen = sndtmo.sizeof;
390         auto rc = () @trusted {
391             return .getsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, &tmolen);
392         }();
393         enforce(rc==0, "Failed to get sndtmo");
394         debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo);
395         rc = () @trusted {
396             return .getsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, &tmolen);
397         }();
398         enforce(rc == 0, "Failed to get rcvtmo");
399         debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo);
400         return Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo")(sndtmo, rcvtmo);
401     }
402 
403     private void setSndRcvTimeouts(Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo") timeouts) @safe {
404 
405         timeval sndtmo = timeouts.sndtimeo, rcvtmo = timeouts.rcvtimeo;
406         socklen_t tmolen = sndtmo.sizeof;
407 
408         auto rc = () @trusted {
409             return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, tmolen);
410         }();
411         debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo);
412         rc = () @trusted {
413             return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, tmolen);
414         }();
415         debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo);
416     }
417 
418     private void setSndRcvTimeouts(Duration timeout) @safe {
419         timeval     ntmo;
420         socklen_t   stmo;
421         auto vals = timeout.split!("seconds", "usecs")();
422         ntmo.tv_sec = cast(typeof(timeval.tv_sec)) vals.seconds;
423         ntmo.tv_usec = cast(typeof(timeval.tv_usec)) vals.usecs;
424         auto rc = () @trusted {
425             return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &ntmo, ntmo.sizeof);
426         }();
427         debug tracef("got setsockopt sndtimeo: %d: %s", rc, ntmo);
428         rc = () @trusted {
429             return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &ntmo, ntmo.sizeof);
430         }();
431         debug tracef("got setsockopt rcvtimeo: %d: %s", rc, ntmo);
432     }
433 
434     ///
435     /// connect synchronously (no loop, no fibers)
436     /// in blocked mode
437     ///
438     public bool connect(string addr, Duration timeout) @safe {
439         switch (_af) {
440         case AF_INET: {
441                 import core.sys.posix.netinet.in_;
442                 import core.sys.posix.sys.time: timeval;
443                 // addr must be "host:port"
444                 auto internet_addr = str2inetaddr(addr);
445 
446                 // save old timeout values and set new
447                 auto old_timeouts = getSndRcvTimeouts();
448                 setSndRcvTimeouts(timeout);
449 
450                 sockaddr_in sin;
451                 sin.sin_family = _af;
452                 sin.sin_port = internet_addr[1];
453                 sin.sin_addr = in_addr(internet_addr[0]);
454                 uint sa_len = sin.sizeof;
455                 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
456                 auto connerrno = errno();
457 
458                 // restore timeouts
459                 setSndRcvTimeouts(old_timeouts);
460                 if (rc == -1 ) {
461                     debug tracef("connect errno: %s %s", s_strerror(connerrno), sin);
462                     _connected = false;
463                     _state = State.IDLE;
464                     return false;
465                 }
466                 _state = State.IDLE;
467                 _connected = true;
468                 return true;
469             }
470         default:
471             throw new SocketException("unsupported address family");
472         }
473     }
474     ///
475     /// Return true if connect delayed
476     ///
477     public bool connect(string addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
478         assert(timeout > 0.seconds);
479         switch (_af) {
480             case AF_INET:
481                 {
482                     import core.sys.posix.netinet.in_;
483                     // addr must be "host:port"
484                     auto internet_addr = str2inetaddr(addr);
485                     sockaddr_in sin;
486                     sin.sin_family = _af;
487                     sin.sin_port = internet_addr[1];
488                     sin.sin_addr = in_addr(internet_addr[0]);
489                     uint sa_len = sin.sizeof;
490                     auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))();
491                     if ( rc == -1 && errno() != EINPROGRESS ) {
492                         debug tracef("connect errno: %s", s_strerror(errno()));
493                         _connected = false;
494                         _state = State.IDLE;
495                         f(AppEvent.ERR|AppEvent.IMMED);
496                         return false;
497                     }
498                     _loop = loop;
499                     _state = State.CONNECTING;
500                     _callback = f;
501                     _polling |= AppEvent.OUT;
502                     loop.startPoll(_fileno, AppEvent.OUT, this);
503                 }
504                 break;
505             default:
506                 throw new SocketException("unsupported address family");
507         }
508         _connect_timer = new Timer(timeout, &timeoutHandler);
509         _loop.startTimer(_connect_timer);
510         return true;
511     }
512 
513     public bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe {
514         assert(timeout > 0.seconds);
515         switch (_af) {
516         case AF_INET: {
517                 import core.sys.posix.netinet.in_;
518                 sockaddr_in *sin = cast(sockaddr_in*)(addr.name);
519                 uint sa_len = sockaddr.sizeof;
520                 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)sin, sa_len))();
521                 if (rc == -1 && errno() != EINPROGRESS) {
522                     debug tracef("connect errno: %s", s_strerror(errno()));
523                     _connected = false;
524                     _state = State.IDLE;
525                     f(AppEvent.ERR | AppEvent.IMMED);
526                     return false;
527                 }
528                 _loop = loop;
529                 _state = State.CONNECTING;
530                 _callback = f;
531                 _polling |= AppEvent.OUT;
532                 loop.startPoll(_fileno, AppEvent.OUT, this);
533             }
534             break;
535         default:
536             throw new SocketException("unsupported address family");
537         }
538         _connect_timer = new Timer(timeout, &timeoutHandler);
539         _loop.startTimer(_connect_timer);
540         return true;
541     }
542 
543     public void accept(T)(hlEvLoop loop, Duration timeout, T f) {
544         _loop = loop;
545         _accept_callback = f;
546 //        if ( _state != State.ACCEPTING ) {
547             _state = State.ACCEPTING;
548             _polling |= AppEvent.IN;
549         _connect_timer = new Timer(timeout, &timeoutHandler);
550         _loop.startTimer(_connect_timer);
551             loop.startPoll(_fileno, AppEvent.IN, this);
552 //        }
553     }
554 
555     void io_handler(AppEvent ev) @safe {
556         debug tracef("event %s on fd %d", appeventToString(ev), _fileno);
557         if ( ev == AppEvent.TMO ) {
558             debug tracef("io timedout");
559             _loop.stopPoll(_fileno, _pollingFor);
560             _polling = AppEvent.NONE;
561             delegate void() @trusted {
562                 _result.input = assumeUnique(_input);
563             }();
564             // return what we collected
565             _result.input = (() @trusted => assumeUnique(_input))();
566             // return timeout flag
567             _result.timedout = true;
568             // make callback
569             _iorq.callback(_result);
570             return;
571         }
572         if ( ev & AppEvent.IN )
573         {
574             size_t _will_read = min(_buffer_size, _iorq.to_read);
575             ubyte[] b = new ubyte[_will_read];
576             auto rc = (() @trusted => recv(_fileno, &b[0], _will_read, 0))();
577             debug tracef("recv on fd %d returned %d", _fileno, rc);
578             if ( rc < 0 )
579             {
580                 _result.error = true;
581                 _polling &= _pollingFor ^ AppEvent.ALL;
582                 _loop.stopPoll(_fileno, _pollingFor);
583                 if ( _io_timer ) {
584                     _loop.stopTimer(_io_timer);
585                     _io_timer = null;
586                 }
587                 _result.input = (() @trusted => assumeUnique(_input))();
588                 //_result.output = output;
589                 _iorq.callback(_result);
590                 return;
591             }
592             if ( rc > 0 )
593             {
594                 b.length = rc;
595                 debug tracef("adding data %s", b);
596                 _input ~= b;
597                 b = null;
598                 _iorq.to_read -= rc;
599                 debug tracef("after adding data %s, %s", _iorq.to_read, _iorq.allowPartialInput);
600                 if ( _iorq.to_read == 0 || _iorq.allowPartialInput ) {
601                     _loop.stopPoll(_fileno, _pollingFor);
602                     _polling = AppEvent.NONE;
603                     if ( _io_timer ) {
604                         _loop.stopTimer(_io_timer);
605                         _io_timer = null;
606                     }
607                     _result.input = (() @trusted => assumeUnique(_input))();
608                     _iorq.callback(_result);
609                     return;
610                 }
611                 return;
612             }
613             if ( rc == 0 )
614             {
615                 // socket closed
616                 _loop.stopPoll(_fileno, _pollingFor);
617                 if ( _io_timer ) {
618                     _loop.stopTimer(_io_timer);
619                     _io_timer = null;
620                 }
621                 _result.input = (() @trusted => assumeUnique(_input))();
622                 _polling = AppEvent.NONE;
623                 _iorq.callback(_result);
624                 return;
625             }
626         }
627         if ( ev & AppEvent.OUT ) {
628             debug tracef("sending %s", _iorq.output);
629             assert(_iorq.output.length>0);
630             uint flags = 0;
631             version(linux) {
632                 flags = MSG_NOSIGNAL;
633             }
634             auto rc = (() @trusted => .send(_fileno, &_iorq.output[0], _iorq.output.length, flags))();
635             if ( rc < 0 ) {
636                 // error sending XXX
637                 assert(0);
638             }
639             _iorq.output = _iorq.output[rc..$];
640             if ( _iorq.output.length == 0 ) {
641                 _loop.stopPoll(_fileno, _pollingFor);
642                 if ( _io_timer ) {
643                     _loop.stopTimer(_io_timer);
644                     _io_timer = null;
645                 }
646                 _result.input = (() @trusted => assumeUnique(_input))();
647                 //_result.output = output;
648                 _polling = AppEvent.NONE;
649                 _iorq.callback(_result);
650                 return;
651             }
652         }
653         else
654         {
655             debug tracef("Unhandled event on %d", _fileno);
656         }
657     
658     }
659     ///
660     /// Make blocking IO without evelnt loop.
661     /// Can be called from non-fiber context
662     ///
663     /// return IOResult
664     ///
665     auto io(in IORequest iorq, in Duration timeout) @safe {
666         IOResult result;
667 
668         version (linux) {
669             immutable uint flags = MSG_NOSIGNAL;
670         } else {
671             immutable uint flags = 0;
672 
673         }
674 
675         auto old_timeouts = getSndRcvTimeouts();
676         setSndRcvTimeouts(timeout);
677 
678         scope(exit) {
679             setSndRcvTimeouts(old_timeouts);
680         }
681 
682         debug tracef("Blocked io request %s", iorq);
683 
684         // handle requested output
685         result.output = iorq.output;
686         while(result.output.length > 0) {
687             auto rc = () @trusted {
688                 return .send(_fileno, cast(void*)result.output.ptr, result.output.length, flags);
689             }();
690 
691             if ( rc > 0 ) {
692                 result.output = result.output[rc..$];
693             } else {
694                 result.error = true;
695                 return result;
696             }
697         }
698         // handle requested input
699         size_t to_read = iorq.to_read;
700         if ( to_read > 0 ) {
701             ubyte[] buffer = new ubyte[](to_read);
702             size_t ptr, l;
703 
704             while(to_read>0) {
705                 auto rc = () @trusted {
706                     return .recv(_fileno, cast(void*)&buffer[ptr], to_read, 0);
707                 }();
708                 debug tracef("got %d bytes to: %s", rc, buffer);
709                 if ( rc == 0 || (rc > 0 && iorq.allowPartialInput) ) {
710                     // client closed connection
711                     l += rc;
712                     buffer.length = l;
713                     result.input = () @trusted {return assumeUnique(buffer);}();
714                     debug tracef("Blocked io returned %s", result);
715                     return result;
716                 }
717                 if ( rc < 0 ) {
718                     buffer.length = l;
719                     result.error = true;
720                     result.input = () @trusted { return assumeUnique(buffer); }();
721                     debug tracef("Blocked io returned %s", result);
722                     return result;
723                 }
724                 to_read -= rc;
725                 ptr += rc;
726                 l += rc;
727             }
728             buffer.length = l;
729             result.input = () @trusted { return assumeUnique(buffer); }();
730             debug tracef("Blocked io returned %s", result);
731             return result;
732         }
733         debug tracef("Blocked io returned %s", result);
734         return result;
735     }
736     ///
737     /// Make unblocked IO using loop
738     ///
739     auto io(hlEvLoop loop, in IORequest iorq, in Duration timeout) @safe {
740 
741         AppEvent ev = AppEvent.NONE;
742         if ( iorq.output && iorq.output.length ) {
743             ev |= AppEvent.OUT;
744         }
745         if ( iorq.to_read > 0 ) {
746             ev |= AppEvent.IN;
747             _input.reserve(iorq.to_read);
748         }
749         _pollingFor = ev;
750         assert(_pollingFor != AppEvent.NONE);
751 
752         if (_io_timer) {
753             debug tracef("closing prev timer: %s", _io_timer);
754             _loop.stopTimer(_io_timer);
755         }
756 
757         _loop = loop;
758         _iorq = iorq;
759         _state = State.IO;
760         if ( timeout > 0.seconds ) {
761             _io_timer = new Timer(timeout, &io_handler);
762             _loop.startTimer(_io_timer);
763         }
764         _loop.startPoll(_fileno, _pollingFor, this);
765         return 0;
766     }
767     /**
768     * just send, no callbacks, no timeouts, nothing
769     * returns what os-level send returns
770     **/
771     long send(immutable(ubyte)[] data) @trusted {
772         return .send(_fileno, data.ptr, data.length, 0);
773     }
774     /**************************************************************************
775      * Send data from data buffer
776      * input: data     - data to send
777      *        timeout  - how long to wait until timedout
778      *        callback - callback which accept IOResult
779      * 1. try to send as much as possible. If complete data sent, then return
780      *    IOresult with empty output and clean timeout and error fileds.
781      * 2. If we can't send complete buffer, then prepare io call and return 
782      *    nonempty result output.
783      * So at return user have to check:
784      * a) if result.error == true - send failed
785      * b) if result.data.empty - data send completed
786      * c) otherwise io call were issued, user will receive callback
787      **************************************************************************/
788     IOResult send(hlEvLoop loop, immutable(ubyte)[] data, Duration timeout, void delegate(IOResult) @safe callback) @safe {
789 
790         enforce!SocketException(data.length > 0, "You must have non-empty 'data' when calling 'send'");
791 
792         IOResult result;
793         result.output = data;
794 
795         uint flags = 0;
796         version(linux) {
797             flags = MSG_NOSIGNAL;
798         }
799         auto rc = (() @trusted => .send(_fileno, &data[0], data.length, flags))();
800         if ( rc < 0 ) {
801             auto err = errno();
802             if ( err != EWOULDBLOCK && err != EAGAIN ) {
803                 // case a.
804                 result.error = true;
805                 return result;
806             }
807             rc = 0; // like we didn't sent anything
808         }
809         data = data[rc..$];
810         result.output = data;
811         if ( result.output.empty ) {
812             // case b. send comleted
813             debug tracef("fast send to %d completed", _fileno);
814             return result;
815         }
816         // case c. - we have to use event loop
817         IORequest iorq;
818         iorq.output = data;
819         iorq.callback = callback;
820         io(loop, iorq, timeout);
821         return result;
822     }
823 }
824 
825 
826 private auto str2inetaddr(string addr) @safe pure {
827     auto pos = indexOf(addr, ':');
828     if ( pos == -1 ) {
829         throw new Exception("incorrect addr %s, expect host:port", addr);
830     }
831     auto host = addr[0..pos].split('.');
832     auto port = addr[pos+1..$];
833     // auto s = addr.split(":");
834     // if ( s.length != 2 ) {
835     //     throw new Exception("incorrect addr %s, expect host:port", addr);
836     // }
837     // host = s[0].split(".");
838     if ( host.length != 4 ) {
839         throw new Exception("addr must be in form a.b.c.d:p, got: " ~ addr);
840     }
841     uint   a = to!ubyte(host[0]) << 24 | to!ubyte(host[1]) << 16 | to!ubyte(host[2]) << 8 | to!ubyte(host[3]);
842     ushort p = to!ushort(port);
843     return tuple(core.sys.posix.arpa.inet.htonl(a), core.sys.posix.arpa.inet.htons(p));
844 }
845 
846 @safe unittest {
847     import core.sys.posix.arpa.inet;
848     assert(str2inetaddr("0.0.0.0:1") == tuple(0, htons(1)));
849     assert(str2inetaddr("1.0.0.0:0") == tuple(htonl(0x01000000),0 ));
850     assert(str2inetaddr("255.255.255.255:0") == tuple(0xffffffff, 0));
851 }
852 
853 @safe unittest {
854     globalLogLevel = LogLevel.info;
855 
856     hlSocket s0 = new hlSocket();
857     s0.open();
858     hlSocket s1 = s0;
859     s0.close();
860     s1.close();
861 }
862 
863 @safe unittest {
864     globalLogLevel = LogLevel.info;
865 
866     hlSocket s = new hlSocket();
867     s.open();
868     s.close();
869 }
870 
871 //unittest {
872 //    globalLogLevel = LogLevel.trace;
873 //
874 //    hlSocket s = new hlSocket();
875 //    s.open();
876 //
877 //
878 //    auto mockLoop = new hlEvLoop();
879 //    mockLoop.run = (Duration d) {
880 //        s._handler(AppEvent.IN);
881 //    };
882 //
883 //    mockLoop.startPoll = (int fd, AppEvent ev, FileHandlerFunction f) @safe {
884 //        tracef("called mock startPoll: %d, %s", fd, appeventToString(ev));
885 //    };
886 //
887 //    mockLoop.stopPoll = (int fd, AppEvent ev) @safe {
888 //        tracef("called mock stopPoll: %d, %s", fd, appeventToString(ev));
889 //    };
890 //
891 //    IORequest iorq;
892 //    iorq.to_read = 1;
893 //    iorq.output = "abc".representation();
894 //    iorq.callback = (IOResult r) {
895 //        tracef("called mock callback: %s", r);
896 //    };
897 //    auto result = s.io(mockLoop, iorq, 1.seconds);
898 //    mockLoop.run(1.seconds);
899 //    assert(s._polling == AppEvent.NONE);
900 //    iorq.to_read = 0;
901 //    result = s.io(mockLoop, iorq, 1.seconds);
902 //    mockLoop.run(1.seconds);
903 //    assert(s._polling == AppEvent.NONE);
904 //    s.close();
905 //}
906 
907 class HioSocket
908 {
909     import core.thread;
910 
911     struct InputStream {
912         private {
913             size_t      _buffer_size = 16*1024;
914             Duration    _timeout = 1.seconds;
915             HioSocket   _socket;
916             bool        _started;
917             bool        _done;
918             immutable(ubyte)[]     _data;
919         }
920         this(HioSocket s, Duration t = 10.seconds) @safe {
921             _socket = s;
922             _timeout = t;
923             _buffer_size = s._socket._buffer_size;
924         }
925         bool empty() {
926             return _started && _done;
927         }
928         auto front() {
929             if ( _done ) {
930                 _data.length = 0;
931                 return _data;
932             }
933             if (!_started ) {
934                 _started = true;
935                 auto r = _socket.recv(_buffer_size, _timeout);
936                 if (r.timedout || r.error || r.input.length == 0) {
937                     _done = true;
938                 } else {
939                     _data = r.input;
940                 }
941             }
942             debug tracef("InputStream front: %s", _data);
943             return _data;
944         }
945         void popFront() {
946             auto r = _socket.recv(_buffer_size, _timeout);
947             if (r.timedout || r.error || r.input.length == 0) {
948                 _done = true;
949             }
950             else {
951                 _data = r.input;
952             }
953         }
954     }
955     private {
956         hlSocket _socket;
957         Fiber    _fiber;
958     }
959     this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe {
960         _socket = new hlSocket(af, sock_type, f, l);
961         _socket.open();
962     }
963 
964     this(int fileno, ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) {
965         _socket = new hlSocket(fileno, af, sock_type, f, l);
966     }
967 
968     ~this() {
969         // if ( _socket ) {
970         //     _socket.close();
971         // }
972     }
973     override string toString() {
974         return _socket.toString();
975     }
976 
977     void bind(string addr) {
978         _socket.bind(addr);
979     }
980     void handler(AppEvent e) @safe {
981         debug
982         {
983             tracef("HioSocket handler enter");
984         }
985         (()@trusted{_fiber.call();})();
986     }
987     void connect(Address addr, Duration timeout) @trusted {
988         auto loop = getDefaultLoop();
989         _fiber = Fiber.getThis();
990         void callback(AppEvent e) {
991             if (!(e & AppEvent.IMMED)) {
992                 (() @trusted { _fiber.call(); })();
993             }
994         }
995 
996         if (_socket.connect(addr, loop, &callback, timeout)) {
997             Fiber.yield();
998         }
999     }
1000     ///
1001     void connect(string addr, Duration timeout) @trusted {
1002         auto loop = getDefaultLoop();
1003         _fiber = Fiber.getThis();
1004         if ( _fiber is null ) {
1005             // we are not in context of any task, connect synchronously
1006             // 1. set blocking mode, socket timeout
1007             // 2. call connect, throw if faied
1008             // 3. set unblocking mode
1009             // 4. return
1010             _socket.blocking = true;
1011             _socket.connect(addr, timeout);
1012             _socket.blocking = false;
1013             return;
1014         }
1015         void callback(AppEvent e) {
1016             if ( !(e & AppEvent.IMMED) ) {
1017                 // we called yield
1018                 (() @trusted { _fiber.call(); })();
1019             }
1020         }
1021         if ( _socket.connect(addr, loop, &callback, timeout) ) {
1022             Fiber.yield();
1023         }
1024         if ( _socket._errno == ECONNREFUSED ) {
1025             throw new ConnectionRefused("Unable to connect socket: connection refused on " ~ addr);
1026         }
1027     }
1028     ///
1029     bool connected() const @safe {
1030         return _socket.connected;
1031     }
1032     ///
1033     auto errno() @safe {
1034         return _socket.socket_errno();
1035     }
1036     ///
1037     auto listen(int backlog = 10) @safe {
1038         return _socket.listen(backlog);
1039     }
1040     ///
1041     void close() @safe {
1042         if ( _socket ) {
1043             _socket.close();
1044             _socket = null;
1045         }
1046     }
1047     ///
1048     auto accept(Duration timeout = Duration.max) {
1049         HioSocket s;
1050 
1051         auto loop = getDefaultLoop();
1052         _fiber = Fiber.getThis();
1053 
1054         void callback(int fileno) @trusted {
1055             debug tracef("Got %d on accept", fileno);
1056             if ( fileno < 0 ) {
1057                 s = null;
1058                 _fiber.call();
1059                 return;
1060             }
1061             debug tracef("got accept callback for socket %d", fileno);
1062             if ( _socket._polling & AppEvent.IN ) {
1063                 getDefaultLoop.stopPoll(_socket.fileno, AppEvent.IN);
1064                 _socket._polling &= ~AppEvent.IN;
1065             }
1066             _socket._state = hlSocket.State.IDLE;
1067             s = new HioSocket(fileno);
1068             _fiber.call();
1069         }
1070         _socket._accepts_in_a_row = 1;
1071         _socket.accept(loop, timeout, &callback);
1072         Fiber.yield();
1073         return s;
1074     }
1075     ///
1076     IOResult recv(size_t n, Duration timeout = 10.seconds) @trusted {
1077         IORequest ioreq;
1078         IOResult  iores;
1079 
1080         _fiber = Fiber.getThis();
1081         if ( _fiber is null) {
1082             // read not in context of any fiber. Blocked read.
1083             _socket.blocking = true;
1084             ioreq.to_read = n;
1085             iores = _socket.io(ioreq, timeout);
1086             _socket.blocking = false;
1087             return iores;
1088         }
1089         void callback(IOResult ior) @trusted {
1090             debug tracef("got ior on recv: %s", ior);
1091             iores = ior;
1092             _fiber.call();
1093         }
1094 
1095         ioreq.to_read = n;
1096         ioreq.callback = &callback;
1097         _socket.io(getDefaultLoop(), ioreq, timeout);
1098         debug tracef("recv yielding on %s", _socket);
1099         Fiber.yield();
1100         debug tracef("recv done on %s", _socket);
1101         return iores;
1102     }
1103     ///
1104     size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) @trusted {
1105         _fiber = Fiber.getThis();
1106         IOResult ioresult;
1107 
1108         if ( _fiber is null ) {
1109             IORequest ioreq;
1110             _socket.blocking = true;
1111             ioreq.output = data;
1112             ioresult = _socket.io(ioreq, timeout);
1113             _socket.blocking = false;
1114             if ( ioresult.error ) {
1115                 return -1;
1116             }
1117             return 0;
1118         }
1119 
1120         void callback(IOResult ior) @trusted {
1121             ioresult = ior;
1122             _fiber.call();
1123         }
1124         ioresult = _socket.send(getDefaultLoop(), data, timeout, &callback);
1125         if ( ioresult.error ) {
1126             return -1;
1127         }
1128         if ( ioresult.output.empty ) {
1129             return data.length;
1130         }
1131         Fiber.yield();
1132         if (ioresult.error) {
1133             return -1;
1134         }
1135         return data.length - ioresult.output.length;
1136     }
1137     ///
1138     InputStream inputStream(Duration t=10.seconds) @safe {
1139         return InputStream(this, t);
1140     }
1141 }
1142 
1143 unittest {
1144     import core.thread;
1145     import hio.scheduler;
1146 
1147     globalLogLevel = LogLevel.info;
1148     void server(ushort port) {
1149         auto s = new HioSocket();
1150         s.bind("127.0.0.1:%s".format(port));
1151         s.listen();
1152         auto c = s.accept(2.seconds);
1153         if ( c is null ) {
1154             s.close();
1155             throw new Exception("Accept failed");
1156         }
1157         auto io = c.recv(64, 1.seconds);
1158         assert(io.input == "hello".representation);
1159         c.send("world".representation);
1160         c.close();
1161         s.close();
1162     }
1163     void client(ushort port) {
1164         auto s = new HioSocket();
1165         s.connect("127.0.0.1:%d".format(port), 1.seconds);
1166         scope(exit) {
1167             s.close();
1168         }
1169         if ( s.connected ) {
1170             auto rq = s.send("hello".representation, 1.seconds);
1171             auto rs = s.recv(64, 1.seconds);
1172             assert(rs.input == "world".representation);
1173         } else {
1174             throw new Exception("Can't connect to server");
1175         }
1176     }
1177 
1178     info("Test HioSockets 0");
1179 
1180     // all ok case
1181     auto t = new Thread({
1182         try{
1183             App(&server, cast(ushort)12345);
1184         } catch (Exception e) {
1185             infof("Got %s in server", e);
1186         }
1187     }).start;
1188     Thread.sleep(500.msecs);
1189     client(12345);
1190     t.join;
1191 
1192     info("Test HioSockets 1");
1193     // all fail case - everything should throw
1194     t = new Thread({
1195         App(&server, cast(ushort) 12345);
1196     }).start;
1197     assertThrown!Exception(client(12346));
1198     assertThrown!Exception(t.join);
1199 
1200     info("Test HioSockets 2");
1201     // the same but client in App
1202     // all ok case
1203     t = new Thread({
1204         App(&server, cast(ushort) 12345);
1205     }).start;
1206     Thread.sleep(500.msecs);
1207     App(&client, cast(ushort)12345);
1208     t.join;
1209 
1210     info("Test HioSockets 3");
1211     // all fail case - everything should throw
1212     t = new Thread({ App(&server, cast(ushort) 12345); }).start;
1213     assertThrown!Exception(App(&client, cast(ushort)12346));
1214     assertThrown!Exception(t.join);
1215 }
1216 
1217 ///
1218 struct ByLineSplitter(R) {
1219     import nbuff;
1220     private {
1221         enum    NL = "\n".representation;
1222         R       source;
1223         Buffer  buff;
1224         long    last_position;
1225         string  line;
1226     }
1227     ///
1228     this(R r) {
1229         source = r;
1230         popFront;
1231     }
1232     bool empty() {
1233         return line is null && source.empty && buff.empty;
1234     }
1235     string front() {
1236         return line;
1237     }
1238     void popFront() {
1239         while (true) {
1240             auto p = buff.countUntil(last_position, NL);
1241             if (p >= 0) {
1242                 last_position = 0;
1243                 if ( p == 0 ) {
1244                     line = "";
1245                 } else {
1246                     line = cast(string) buff[0 .. p].data;
1247                 }
1248                 buff = buff[p + 1 .. $];
1249                 return;
1250             }
1251             last_position = buff.length;
1252             if (source.empty) {
1253                 line = cast(string) buff[0 .. $].data;
1254                 buff = Buffer();
1255                 return;
1256             }
1257             // fill from source and retry
1258             buff.append(source.front);
1259             source.popFront;
1260         }
1261     }
1262 }
1263 
1264 ///
1265 auto byLineSplitter(R)(R r) {
1266     return ByLineSplitter!R(r);
1267 }
1268 
1269 unittest {
1270     import std.range;
1271     for(int s=1;s<7;s++) {
1272         auto result = "a\nbb\n\n\nc\n\n".chunks(s)
1273             .array.map!"to!string(a).representation".byLineSplitter;
1274         assert(equal(result, ["a", "bb", "", "", "c", ""]));
1275    }
1276 }