1 module hio.socket; 2 3 import std.typecons; 4 import std.string; 5 import std.conv; 6 import std.traits; 7 import std.datetime; 8 import std.exception; 9 import std.algorithm; 10 11 import std.algorithm.comparison: min; 12 13 import std.experimental.logger; 14 15 import core.memory: pureMalloc, GC; 16 import core.exception : onOutOfMemoryError; 17 18 import std.experimental.allocator; 19 import std.experimental.allocator.building_blocks; 20 21 //static import std.socket; 22 23 import std.socket; 24 25 import core.sys.posix.sys.socket; 26 import core.sys.posix.unistd; 27 import core.sys.posix.arpa.inet; 28 import core.sys.posix.netinet.tcp; 29 import core.sys.posix.netinet.in_; 30 import core.sys.posix.sys.time : timeval; 31 32 import core.sys.posix.fcntl; 33 34 import core.stdc.string; 35 import core.stdc.errno; 36 37 public import hio.events; 38 import hio.common; 39 //import nbuff; 40 41 import hio; 42 43 //alias Socket = RefCounted!SocketImpl; 44 45 alias AcceptFunction = void function(int fileno); 46 alias AcceptDelegate = void delegate(hlSocket); 47 48 // static ~this() { 49 // trace("deinit"); 50 // } 51 52 //hlSocket[int] fd2so; 53 // 54 //void loopCallback(int fd, AppEvent ev) @safe { 55 // debug tracef("loopCallback for %d", fd); 56 // hlSocket s = fd2so[fd]; 57 // if ( s && s._fileno >= 0) { 58 // debug tracef("calling handler(%s) for %s", appeventToString(ev), s); 59 // s._handler(ev); 60 // } else { 61 // infof("impossible event %s on fd: %d", appeventToString(ev), fd); 62 // } 63 //} 64 65 class SocketException : Exception { 66 this(string msg, string file = __FILE__, size_t line = __LINE__) @safe { 67 super(msg, file, line); 68 } 69 } 70 71 class ConnectionRefused : Exception { 72 this(string msg, string file = __FILE__, size_t line = __LINE__) @safe { 73 super(msg, file, line); 74 } 75 } 76 77 class Timeout : Exception { 78 this(string msg, string file = __FILE__, size_t line = __LINE__) @safe { 79 super(msg, file, line); 80 } 81 } 82 83 bool isLinux() pure nothrow @nogc @safe { 84 version(linux) { 85 return true; 86 } else { 87 return false; 88 } 89 } 90 91 class hlSocket : FileEventHandler { 92 private { 93 enum State { 94 NEW = 0, 95 IDLE, 96 CONNECTING, 97 ACCEPTING, 98 IO, 99 } 100 101 immutable ubyte _af = AF_INET; 102 immutable int _sock_type = SOCK_STREAM; 103 int _fileno = -1; 104 int _errno; 105 HandlerDelegate _handler; 106 AppEvent _polling = AppEvent.NONE; 107 size_t _buffer_size = 16*1024; 108 hlEvLoop _loop; 109 immutable string _file; 110 immutable int _line; 111 State _state = State.NEW; 112 HandlerDelegate _callback; 113 // accept related fields 114 void delegate(int) @safe _accept_callback; 115 // io related fields 116 IORequest _iorq; 117 IOResult _result; 118 Timer _connect_timer; 119 Timer _io_timer; 120 AppEvent _pollingFor = AppEvent.NONE; 121 ubyte[] _input; 122 bool _connected; 123 uint _accepts_in_a_row = 10; 124 } 125 126 this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe { 127 debug tracef("create socket"); 128 _af = af; 129 _sock_type = sock_type; 130 _file = f; 131 _line = l; 132 } 133 134 this(int s, ubyte af = AF_INET, int sock_type = 0, string f = __FILE__, int l = __LINE__) @safe 135 in {assert(s>=0);} 136 body { 137 _af = af; 138 _sock_type = sock_type; 139 _fileno = s; 140 _file = f; 141 _line = l; 142 auto flags = (() @trusted => fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK)(); 143 (() @trusted => fcntl(_fileno, F_SETFL, flags))(); 144 } 145 146 147 ~this() { 148 // if ( _fileno != -1 ) { 149 // if ( _loop && _polling != AppEvent.NONE ) { 150 // _loop.stopPoll(_fileno, _polling); 151 // _loop.detach(_fileno); 152 // } 153 // //fd2so[_fileno] = null; 154 // .close(_fileno); 155 // _fileno = -1; 156 // } 157 // close(); 158 } 159 160 override string toString() const @safe { 161 import std.format: format; 162 return "socket: fileno: %d, (%s:%d)".format(_fileno, _file, _line); 163 } 164 public bool connected() const pure @safe nothrow { 165 return _connected; 166 } 167 168 public auto fileno() const pure @safe nothrow { 169 return _fileno; 170 } 171 172 public auto socket_errno() const pure @safe nothrow { 173 return _errno; 174 } 175 176 void blocking(bool blocking) @property { 177 auto flags = () @trusted {return fcntl(_fileno, F_GETFL, 0);}(); 178 if ( blocking ) { 179 (() @trusted => fcntl(_fileno, F_SETFL, flags & ~O_NONBLOCK))(); 180 } else { 181 (() @trusted => fcntl(_fileno, F_SETFL, flags | O_NONBLOCK))(); 182 } 183 } 184 void timeoutHandler(AppEvent e) @safe { 185 debug 186 { 187 tracef("Timeout handler: %s", appeventToString(e)); 188 } 189 final switch (_state) { 190 case State.NEW: 191 assert(0); 192 case State.IDLE: 193 assert(0); 194 case State.CONNECTING: 195 debug tracef("connection timed out"); 196 _connected = false; 197 _errno = ETIMEDOUT; 198 _polling = AppEvent.NONE; 199 _loop.stopPoll(_fileno, AppEvent.OUT); 200 _loop.detach(_fileno); 201 _state = State.IDLE; 202 _callback(e); 203 return; 204 case State.ACCEPTING: 205 debug tracef("accept timed out"); 206 _connected = false; 207 _errno = ETIMEDOUT; 208 _polling = AppEvent.NONE; 209 _loop.stopPoll(_fileno, AppEvent.IN); 210 _loop.detach(_fileno); 211 _state = State.IDLE; 212 _accept_callback(-1); 213 return; 214 case State.IO: 215 assert(0); 216 } 217 } 218 override void eventHandler(int fd, AppEvent e) @safe { 219 debug tracef("event %s in state %s", appeventToString(e), _state); 220 final switch ( _state ) { 221 case State.NEW: 222 assert(0); 223 case State.IDLE: 224 assert(0); 225 case State.CONNECTING: 226 debug tracef("connection event: %s", appeventToString(e)); 227 assert(e & (AppEvent.OUT|AppEvent.HUP), "We can handle only OUT event in connectiong state"); 228 if ( e & AppEvent.OUT ) { 229 _connected = true; 230 } 231 if ( (e & AppEvent.HUP) ) { 232 int err; 233 uint err_s = err.sizeof; 234 auto rc = (() @trusted => .getsockopt(_fileno, SOL_SOCKET, SO_ERROR, &err, &err_s))(); 235 if ( rc == 0 ) { 236 _errno = err; 237 debug tracef("error connecting: %s", s_strerror(err)); 238 } 239 _connected = false; 240 } 241 _polling = AppEvent.NONE; 242 _state = State.IDLE; 243 if ( _connect_timer ) { 244 _loop.stopTimer(_connect_timer); 245 _connect_timer = null; 246 } 247 _loop.stopPoll(_fileno, AppEvent.OUT); 248 _callback(e); 249 return; 250 //_handler(e); 251 case State.ACCEPTING: 252 assert(e == AppEvent.IN, "We can handle only IN event in accepting state"); 253 foreach(_; 0.._accepts_in_a_row) { 254 if ( _fileno == -1 ) { 255 break; // socket can be closed in handler 256 } 257 sockaddr sa; 258 uint sa_len = sa.sizeof; 259 int new_s = (() @trusted => .accept(_fileno, &sa, &sa_len))(); 260 if ( new_s == -1 ) { 261 auto err = errno(); 262 if ( err == EWOULDBLOCK || err == EAGAIN ) { 263 // POSIX.1-2001 and POSIX.1-2008 allow 264 // either error to be returned for this case, and do not require 265 // these constants to have the same value, so a portable 266 // application should check for both possibilities. 267 break; 268 } 269 throw new Exception(s_strerror(err)); 270 } 271 debug tracef("New socket fd: %d", new_s); 272 immutable int flag = 1; 273 auto rc = (() @trusted => .setsockopt(new_s, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof))(); 274 if ( rc != 0 ) { 275 throw new Exception(s_strerror(errno())); 276 } 277 version(OSX) { 278 rc = (() @trusted => .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof))(); 279 if ( rc != 0 ) { 280 throw new Exception(s_strerror(errno())); 281 } 282 } 283 auto flags = (() @trusted => fcntl(new_s, F_GETFL, 0) | O_NONBLOCK)(); 284 (() @trusted => fcntl(new_s, F_SETFL, flags))(); 285 //hlSocket ns = new hlSocket(_af, _sock_type, new_s); 286 //fd2so[new_s] = ns; 287 //_accept_callback(ns); 288 _accept_callback(new_s); 289 debug tracef("accept_callback for fd: %d - done", new_s); 290 } 291 //_handler(e); 292 return; 293 case State.IO: 294 io_handler(e); 295 return; 296 } 297 } 298 299 public int open() @trusted { 300 immutable flag = 1; 301 if (_fileno != -1) { 302 throw new SocketException("You can't open already opened socket: fileno(%d)".format(_fileno)); 303 } 304 _fileno = socket(_af, _sock_type, 0); 305 if ( _fileno < 0 ) 306 return _fileno; 307 _polling = AppEvent.NONE; 308 //fd2so[_fileno] = this; 309 auto rc = .setsockopt(_fileno, IPPROTO_TCP, TCP_NODELAY, &flag, flag.sizeof); 310 if ( rc != 0 ) { 311 throw new Exception(to!string(strerror(errno()))); 312 } 313 version(OSX) { 314 rc = .setsockopt(_fileno, SOL_SOCKET, SO_NOSIGPIPE, &flag, flag.sizeof); 315 if ( rc != 0 ) { 316 throw new Exception(to!string(strerror(errno()))); 317 } 318 } 319 auto flags = fcntl(_fileno, F_GETFL, 0) | O_NONBLOCK; 320 fcntl(_fileno, F_SETFL, flags); 321 return _fileno; 322 } 323 324 public void close() @safe { 325 if ( _fileno != -1 ) { 326 debug tracef("closing %d, polling: %x", _fileno, _polling); 327 if ( _loop && _polling != AppEvent.NONE ) { 328 debug tracef("detach from polling for %s", appeventToString(_polling)); 329 _loop.stopPoll(_fileno, _polling); 330 _loop.detach(_fileno); 331 } 332 //fd2so[_fileno] = null; 333 .close(_fileno); 334 _fileno = -1; 335 } 336 } 337 338 public void bind(string addr) @trusted { 339 debug { 340 tracef("binding to %s", addr); 341 } 342 switch (_af) { 343 case AF_INET: 344 { 345 import core.sys.posix.netinet.in_; 346 // addr must be "host:port" 347 auto internet_addr = str2inetaddr(addr); 348 sockaddr_in sin; 349 sin.sin_family = _af; 350 sin.sin_port = internet_addr[1]; 351 sin.sin_addr = in_addr(internet_addr[0]); 352 int flag = 1; 353 auto rc = .setsockopt(_fileno, SOL_SOCKET, SO_REUSEADDR, &flag, flag.sizeof); 354 debug tracef("setsockopt for bind result: %d", rc); 355 if ( rc != 0 ) { 356 throw new Exception(to!string(strerror(errno()))); 357 } 358 rc = .bind(_fileno, cast(sockaddr*)&sin, cast(uint)sin.sizeof); 359 debug { 360 tracef("bind result: %d", rc); 361 } 362 if ( rc != 0 ) { 363 throw new SocketException(to!string(strerror(errno()))); 364 } 365 } 366 break; 367 case AF_UNIX: 368 default: 369 throw new SocketException("unsupported address family"); 370 } 371 } 372 373 public void listen(int backlog = 10) @trusted { 374 int rc = .listen(_fileno, backlog); 375 if ( rc != 0 ) { 376 throw new SocketException(to!string(strerror(errno()))); 377 } 378 debug tracef("listen on %d ok", _fileno); 379 } 380 381 public void stopPolling(L)(L loop) @safe { 382 debug tracef("Stop polling on %d", _fileno); 383 loop.stopPoll(_fileno, _polling); 384 } 385 386 private auto getSndRcvTimeouts() @safe { 387 388 timeval sndtmo, rcvtmo; 389 socklen_t tmolen = sndtmo.sizeof; 390 auto rc = () @trusted { 391 return .getsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, &tmolen); 392 }(); 393 enforce(rc==0, "Failed to get sndtmo"); 394 debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo); 395 rc = () @trusted { 396 return .getsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, &tmolen); 397 }(); 398 enforce(rc == 0, "Failed to get rcvtmo"); 399 debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo); 400 return Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo")(sndtmo, rcvtmo); 401 } 402 403 private void setSndRcvTimeouts(Tuple!(timeval, "sndtimeo", timeval, "rcvtimeo") timeouts) @safe { 404 405 timeval sndtmo = timeouts.sndtimeo, rcvtmo = timeouts.rcvtimeo; 406 socklen_t tmolen = sndtmo.sizeof; 407 408 auto rc = () @trusted { 409 return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &sndtmo, tmolen); 410 }(); 411 debug tracef("got setsockopt sndtimeo: %d: %s", rc, sndtmo); 412 rc = () @trusted { 413 return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &rcvtmo, tmolen); 414 }(); 415 debug tracef("got setsockopt sndtimeo: %d: %s", rc, rcvtmo); 416 } 417 418 private void setSndRcvTimeouts(Duration timeout) @safe { 419 timeval ntmo; 420 socklen_t stmo; 421 auto vals = timeout.split!("seconds", "usecs")(); 422 ntmo.tv_sec = cast(typeof(timeval.tv_sec)) vals.seconds; 423 ntmo.tv_usec = cast(typeof(timeval.tv_usec)) vals.usecs; 424 auto rc = () @trusted { 425 return .setsockopt(_fileno, SOL_SOCKET, SO_SNDTIMEO, &ntmo, ntmo.sizeof); 426 }(); 427 debug tracef("got setsockopt sndtimeo: %d: %s", rc, ntmo); 428 rc = () @trusted { 429 return .setsockopt(_fileno, SOL_SOCKET, SO_RCVTIMEO, &ntmo, ntmo.sizeof); 430 }(); 431 debug tracef("got setsockopt rcvtimeo: %d: %s", rc, ntmo); 432 } 433 434 /// 435 /// connect synchronously (no loop, no fibers) 436 /// in blocked mode 437 /// 438 public bool connect(string addr, Duration timeout) @safe { 439 switch (_af) { 440 case AF_INET: { 441 import core.sys.posix.netinet.in_; 442 import core.sys.posix.sys.time: timeval; 443 // addr must be "host:port" 444 auto internet_addr = str2inetaddr(addr); 445 446 // save old timeout values and set new 447 auto old_timeouts = getSndRcvTimeouts(); 448 setSndRcvTimeouts(timeout); 449 450 sockaddr_in sin; 451 sin.sin_family = _af; 452 sin.sin_port = internet_addr[1]; 453 sin.sin_addr = in_addr(internet_addr[0]); 454 uint sa_len = sin.sizeof; 455 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))(); 456 auto connerrno = errno(); 457 458 // restore timeouts 459 setSndRcvTimeouts(old_timeouts); 460 if (rc == -1 ) { 461 debug tracef("connect errno: %s %s", s_strerror(connerrno), sin); 462 _connected = false; 463 _state = State.IDLE; 464 return false; 465 } 466 _state = State.IDLE; 467 _connected = true; 468 return true; 469 } 470 default: 471 throw new SocketException("unsupported address family"); 472 } 473 } 474 /// 475 /// Return true if connect delayed 476 /// 477 public bool connect(string addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe { 478 assert(timeout > 0.seconds); 479 switch (_af) { 480 case AF_INET: 481 { 482 import core.sys.posix.netinet.in_; 483 // addr must be "host:port" 484 auto internet_addr = str2inetaddr(addr); 485 sockaddr_in sin; 486 sin.sin_family = _af; 487 sin.sin_port = internet_addr[1]; 488 sin.sin_addr = in_addr(internet_addr[0]); 489 uint sa_len = sin.sizeof; 490 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)&sin, sa_len))(); 491 if ( rc == -1 && errno() != EINPROGRESS ) { 492 debug tracef("connect errno: %s", s_strerror(errno())); 493 _connected = false; 494 _state = State.IDLE; 495 f(AppEvent.ERR|AppEvent.IMMED); 496 return false; 497 } 498 _loop = loop; 499 _state = State.CONNECTING; 500 _callback = f; 501 _polling |= AppEvent.OUT; 502 loop.startPoll(_fileno, AppEvent.OUT, this); 503 } 504 break; 505 default: 506 throw new SocketException("unsupported address family"); 507 } 508 _connect_timer = new Timer(timeout, &timeoutHandler); 509 _loop.startTimer(_connect_timer); 510 return true; 511 } 512 513 public bool connect(Address addr, hlEvLoop loop, HandlerDelegate f, Duration timeout) @safe { 514 assert(timeout > 0.seconds); 515 switch (_af) { 516 case AF_INET: { 517 import core.sys.posix.netinet.in_; 518 sockaddr_in *sin = cast(sockaddr_in*)(addr.name); 519 uint sa_len = sockaddr.sizeof; 520 auto rc = (() @trusted => .connect(_fileno, cast(sockaddr*)sin, sa_len))(); 521 if (rc == -1 && errno() != EINPROGRESS) { 522 debug tracef("connect errno: %s", s_strerror(errno())); 523 _connected = false; 524 _state = State.IDLE; 525 f(AppEvent.ERR | AppEvent.IMMED); 526 return false; 527 } 528 _loop = loop; 529 _state = State.CONNECTING; 530 _callback = f; 531 _polling |= AppEvent.OUT; 532 loop.startPoll(_fileno, AppEvent.OUT, this); 533 } 534 break; 535 default: 536 throw new SocketException("unsupported address family"); 537 } 538 _connect_timer = new Timer(timeout, &timeoutHandler); 539 _loop.startTimer(_connect_timer); 540 return true; 541 } 542 543 public void accept(T)(hlEvLoop loop, Duration timeout, T f) { 544 _loop = loop; 545 _accept_callback = f; 546 // if ( _state != State.ACCEPTING ) { 547 _state = State.ACCEPTING; 548 _polling |= AppEvent.IN; 549 _connect_timer = new Timer(timeout, &timeoutHandler); 550 _loop.startTimer(_connect_timer); 551 loop.startPoll(_fileno, AppEvent.IN, this); 552 // } 553 } 554 555 void io_handler(AppEvent ev) @safe { 556 debug tracef("event %s on fd %d", appeventToString(ev), _fileno); 557 if ( ev == AppEvent.TMO ) { 558 debug tracef("io timedout"); 559 _loop.stopPoll(_fileno, _pollingFor); 560 _polling = AppEvent.NONE; 561 delegate void() @trusted { 562 _result.input = assumeUnique(_input); 563 }(); 564 // return what we collected 565 _result.input = (() @trusted => assumeUnique(_input))(); 566 // return timeout flag 567 _result.timedout = true; 568 // make callback 569 _iorq.callback(_result); 570 return; 571 } 572 if ( ev & AppEvent.IN ) 573 { 574 size_t _will_read = min(_buffer_size, _iorq.to_read); 575 ubyte[] b = new ubyte[_will_read]; 576 auto rc = (() @trusted => recv(_fileno, &b[0], _will_read, 0))(); 577 debug tracef("recv on fd %d returned %d", _fileno, rc); 578 if ( rc < 0 ) 579 { 580 _result.error = true; 581 _polling &= _pollingFor ^ AppEvent.ALL; 582 _loop.stopPoll(_fileno, _pollingFor); 583 if ( _io_timer ) { 584 _loop.stopTimer(_io_timer); 585 _io_timer = null; 586 } 587 _result.input = (() @trusted => assumeUnique(_input))(); 588 //_result.output = output; 589 _iorq.callback(_result); 590 return; 591 } 592 if ( rc > 0 ) 593 { 594 b.length = rc; 595 debug tracef("adding data %s", b); 596 _input ~= b; 597 b = null; 598 _iorq.to_read -= rc; 599 debug tracef("after adding data %s, %s", _iorq.to_read, _iorq.allowPartialInput); 600 if ( _iorq.to_read == 0 || _iorq.allowPartialInput ) { 601 _loop.stopPoll(_fileno, _pollingFor); 602 _polling = AppEvent.NONE; 603 if ( _io_timer ) { 604 _loop.stopTimer(_io_timer); 605 _io_timer = null; 606 } 607 _result.input = (() @trusted => assumeUnique(_input))(); 608 _iorq.callback(_result); 609 return; 610 } 611 return; 612 } 613 if ( rc == 0 ) 614 { 615 // socket closed 616 _loop.stopPoll(_fileno, _pollingFor); 617 if ( _io_timer ) { 618 _loop.stopTimer(_io_timer); 619 _io_timer = null; 620 } 621 _result.input = (() @trusted => assumeUnique(_input))(); 622 _polling = AppEvent.NONE; 623 _iorq.callback(_result); 624 return; 625 } 626 } 627 if ( ev & AppEvent.OUT ) { 628 debug tracef("sending %s", _iorq.output); 629 assert(_iorq.output.length>0); 630 uint flags = 0; 631 version(linux) { 632 flags = MSG_NOSIGNAL; 633 } 634 auto rc = (() @trusted => .send(_fileno, &_iorq.output[0], _iorq.output.length, flags))(); 635 if ( rc < 0 ) { 636 // error sending XXX 637 assert(0); 638 } 639 _iorq.output = _iorq.output[rc..$]; 640 if ( _iorq.output.length == 0 ) { 641 _loop.stopPoll(_fileno, _pollingFor); 642 if ( _io_timer ) { 643 _loop.stopTimer(_io_timer); 644 _io_timer = null; 645 } 646 _result.input = (() @trusted => assumeUnique(_input))(); 647 //_result.output = output; 648 _polling = AppEvent.NONE; 649 _iorq.callback(_result); 650 return; 651 } 652 } 653 else 654 { 655 debug tracef("Unhandled event on %d", _fileno); 656 } 657 658 } 659 /// 660 /// Make blocking IO without evelnt loop. 661 /// Can be called from non-fiber context 662 /// 663 /// return IOResult 664 /// 665 auto io(in IORequest iorq, in Duration timeout) @safe { 666 IOResult result; 667 668 version (linux) { 669 immutable uint flags = MSG_NOSIGNAL; 670 } else { 671 immutable uint flags = 0; 672 673 } 674 675 auto old_timeouts = getSndRcvTimeouts(); 676 setSndRcvTimeouts(timeout); 677 678 scope(exit) { 679 setSndRcvTimeouts(old_timeouts); 680 } 681 682 debug tracef("Blocked io request %s", iorq); 683 684 // handle requested output 685 result.output = iorq.output; 686 while(result.output.length > 0) { 687 auto rc = () @trusted { 688 return .send(_fileno, cast(void*)result.output.ptr, result.output.length, flags); 689 }(); 690 691 if ( rc > 0 ) { 692 result.output = result.output[rc..$]; 693 } else { 694 result.error = true; 695 return result; 696 } 697 } 698 // handle requested input 699 size_t to_read = iorq.to_read; 700 if ( to_read > 0 ) { 701 ubyte[] buffer = new ubyte[](to_read); 702 size_t ptr, l; 703 704 while(to_read>0) { 705 auto rc = () @trusted { 706 return .recv(_fileno, cast(void*)&buffer[ptr], to_read, 0); 707 }(); 708 debug tracef("got %d bytes to: %s", rc, buffer); 709 if ( rc == 0 || (rc > 0 && iorq.allowPartialInput) ) { 710 // client closed connection 711 l += rc; 712 buffer.length = l; 713 result.input = () @trusted {return assumeUnique(buffer);}(); 714 debug tracef("Blocked io returned %s", result); 715 return result; 716 } 717 if ( rc < 0 ) { 718 buffer.length = l; 719 result.error = true; 720 result.input = () @trusted { return assumeUnique(buffer); }(); 721 debug tracef("Blocked io returned %s", result); 722 return result; 723 } 724 to_read -= rc; 725 ptr += rc; 726 l += rc; 727 } 728 buffer.length = l; 729 result.input = () @trusted { return assumeUnique(buffer); }(); 730 debug tracef("Blocked io returned %s", result); 731 return result; 732 } 733 debug tracef("Blocked io returned %s", result); 734 return result; 735 } 736 /// 737 /// Make unblocked IO using loop 738 /// 739 auto io(hlEvLoop loop, in IORequest iorq, in Duration timeout) @safe { 740 741 AppEvent ev = AppEvent.NONE; 742 if ( iorq.output && iorq.output.length ) { 743 ev |= AppEvent.OUT; 744 } 745 if ( iorq.to_read > 0 ) { 746 ev |= AppEvent.IN; 747 _input.reserve(iorq.to_read); 748 } 749 _pollingFor = ev; 750 assert(_pollingFor != AppEvent.NONE); 751 752 if (_io_timer) { 753 debug tracef("closing prev timer: %s", _io_timer); 754 _loop.stopTimer(_io_timer); 755 } 756 757 _loop = loop; 758 _iorq = iorq; 759 _state = State.IO; 760 if ( timeout > 0.seconds ) { 761 _io_timer = new Timer(timeout, &io_handler); 762 _loop.startTimer(_io_timer); 763 } 764 _loop.startPoll(_fileno, _pollingFor, this); 765 return 0; 766 } 767 /** 768 * just send, no callbacks, no timeouts, nothing 769 * returns what os-level send returns 770 **/ 771 long send(immutable(ubyte)[] data) @trusted { 772 return .send(_fileno, data.ptr, data.length, 0); 773 } 774 /************************************************************************** 775 * Send data from data buffer 776 * input: data - data to send 777 * timeout - how long to wait until timedout 778 * callback - callback which accept IOResult 779 * 1. try to send as much as possible. If complete data sent, then return 780 * IOresult with empty output and clean timeout and error fileds. 781 * 2. If we can't send complete buffer, then prepare io call and return 782 * nonempty result output. 783 * So at return user have to check: 784 * a) if result.error == true - send failed 785 * b) if result.data.empty - data send completed 786 * c) otherwise io call were issued, user will receive callback 787 **************************************************************************/ 788 IOResult send(hlEvLoop loop, immutable(ubyte)[] data, Duration timeout, void delegate(IOResult) @safe callback) @safe { 789 790 enforce!SocketException(data.length > 0, "You must have non-empty 'data' when calling 'send'"); 791 792 IOResult result; 793 result.output = data; 794 795 uint flags = 0; 796 version(linux) { 797 flags = MSG_NOSIGNAL; 798 } 799 auto rc = (() @trusted => .send(_fileno, &data[0], data.length, flags))(); 800 if ( rc < 0 ) { 801 auto err = errno(); 802 if ( err != EWOULDBLOCK && err != EAGAIN ) { 803 // case a. 804 result.error = true; 805 return result; 806 } 807 rc = 0; // like we didn't sent anything 808 } 809 data = data[rc..$]; 810 result.output = data; 811 if ( result.output.empty ) { 812 // case b. send comleted 813 debug tracef("fast send to %d completed", _fileno); 814 return result; 815 } 816 // case c. - we have to use event loop 817 IORequest iorq; 818 iorq.output = data; 819 iorq.callback = callback; 820 io(loop, iorq, timeout); 821 return result; 822 } 823 } 824 825 826 private auto str2inetaddr(string addr) @safe pure { 827 auto pos = indexOf(addr, ':'); 828 if ( pos == -1 ) { 829 throw new Exception("incorrect addr %s, expect host:port", addr); 830 } 831 auto host = addr[0..pos].split('.'); 832 auto port = addr[pos+1..$]; 833 // auto s = addr.split(":"); 834 // if ( s.length != 2 ) { 835 // throw new Exception("incorrect addr %s, expect host:port", addr); 836 // } 837 // host = s[0].split("."); 838 if ( host.length != 4 ) { 839 throw new Exception("addr must be in form a.b.c.d:p, got: " ~ addr); 840 } 841 uint a = to!ubyte(host[0]) << 24 | to!ubyte(host[1]) << 16 | to!ubyte(host[2]) << 8 | to!ubyte(host[3]); 842 ushort p = to!ushort(port); 843 return tuple(core.sys.posix.arpa.inet.htonl(a), core.sys.posix.arpa.inet.htons(p)); 844 } 845 846 @safe unittest { 847 import core.sys.posix.arpa.inet; 848 assert(str2inetaddr("0.0.0.0:1") == tuple(0, htons(1))); 849 assert(str2inetaddr("1.0.0.0:0") == tuple(htonl(0x01000000),0 )); 850 assert(str2inetaddr("255.255.255.255:0") == tuple(0xffffffff, 0)); 851 } 852 853 @safe unittest { 854 globalLogLevel = LogLevel.info; 855 856 hlSocket s0 = new hlSocket(); 857 s0.open(); 858 hlSocket s1 = s0; 859 s0.close(); 860 s1.close(); 861 } 862 863 @safe unittest { 864 globalLogLevel = LogLevel.info; 865 866 hlSocket s = new hlSocket(); 867 s.open(); 868 s.close(); 869 } 870 871 //unittest { 872 // globalLogLevel = LogLevel.trace; 873 // 874 // hlSocket s = new hlSocket(); 875 // s.open(); 876 // 877 // 878 // auto mockLoop = new hlEvLoop(); 879 // mockLoop.run = (Duration d) { 880 // s._handler(AppEvent.IN); 881 // }; 882 // 883 // mockLoop.startPoll = (int fd, AppEvent ev, FileHandlerFunction f) @safe { 884 // tracef("called mock startPoll: %d, %s", fd, appeventToString(ev)); 885 // }; 886 // 887 // mockLoop.stopPoll = (int fd, AppEvent ev) @safe { 888 // tracef("called mock stopPoll: %d, %s", fd, appeventToString(ev)); 889 // }; 890 // 891 // IORequest iorq; 892 // iorq.to_read = 1; 893 // iorq.output = "abc".representation(); 894 // iorq.callback = (IOResult r) { 895 // tracef("called mock callback: %s", r); 896 // }; 897 // auto result = s.io(mockLoop, iorq, 1.seconds); 898 // mockLoop.run(1.seconds); 899 // assert(s._polling == AppEvent.NONE); 900 // iorq.to_read = 0; 901 // result = s.io(mockLoop, iorq, 1.seconds); 902 // mockLoop.run(1.seconds); 903 // assert(s._polling == AppEvent.NONE); 904 // s.close(); 905 //} 906 907 class HioSocket 908 { 909 import core.thread; 910 911 struct InputStream { 912 private { 913 size_t _buffer_size = 16*1024; 914 Duration _timeout = 1.seconds; 915 HioSocket _socket; 916 bool _started; 917 bool _done; 918 immutable(ubyte)[] _data; 919 } 920 this(HioSocket s, Duration t = 10.seconds) @safe { 921 _socket = s; 922 _timeout = t; 923 _buffer_size = s._socket._buffer_size; 924 } 925 bool empty() { 926 return _started && _done; 927 } 928 auto front() { 929 if ( _done ) { 930 _data.length = 0; 931 return _data; 932 } 933 if (!_started ) { 934 _started = true; 935 auto r = _socket.recv(_buffer_size, _timeout); 936 if (r.timedout || r.error || r.input.length == 0) { 937 _done = true; 938 } else { 939 _data = r.input; 940 } 941 } 942 debug tracef("InputStream front: %s", _data); 943 return _data; 944 } 945 void popFront() { 946 auto r = _socket.recv(_buffer_size, _timeout); 947 if (r.timedout || r.error || r.input.length == 0) { 948 _done = true; 949 } 950 else { 951 _data = r.input; 952 } 953 } 954 } 955 private { 956 hlSocket _socket; 957 Fiber _fiber; 958 } 959 this(ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) @safe { 960 _socket = new hlSocket(af, sock_type, f, l); 961 _socket.open(); 962 } 963 964 this(int fileno, ubyte af = AF_INET, int sock_type = SOCK_STREAM, string f = __FILE__, int l = __LINE__) { 965 _socket = new hlSocket(fileno, af, sock_type, f, l); 966 } 967 968 ~this() { 969 // if ( _socket ) { 970 // _socket.close(); 971 // } 972 } 973 override string toString() { 974 return _socket.toString(); 975 } 976 977 void bind(string addr) { 978 _socket.bind(addr); 979 } 980 void handler(AppEvent e) @safe { 981 debug 982 { 983 tracef("HioSocket handler enter"); 984 } 985 (()@trusted{_fiber.call();})(); 986 } 987 void connect(Address addr, Duration timeout) @trusted { 988 auto loop = getDefaultLoop(); 989 _fiber = Fiber.getThis(); 990 void callback(AppEvent e) { 991 if (!(e & AppEvent.IMMED)) { 992 (() @trusted { _fiber.call(); })(); 993 } 994 } 995 996 if (_socket.connect(addr, loop, &callback, timeout)) { 997 Fiber.yield(); 998 } 999 } 1000 /// 1001 void connect(string addr, Duration timeout) @trusted { 1002 auto loop = getDefaultLoop(); 1003 _fiber = Fiber.getThis(); 1004 if ( _fiber is null ) { 1005 // we are not in context of any task, connect synchronously 1006 // 1. set blocking mode, socket timeout 1007 // 2. call connect, throw if faied 1008 // 3. set unblocking mode 1009 // 4. return 1010 _socket.blocking = true; 1011 _socket.connect(addr, timeout); 1012 _socket.blocking = false; 1013 return; 1014 } 1015 void callback(AppEvent e) { 1016 if ( !(e & AppEvent.IMMED) ) { 1017 // we called yield 1018 (() @trusted { _fiber.call(); })(); 1019 } 1020 } 1021 if ( _socket.connect(addr, loop, &callback, timeout) ) { 1022 Fiber.yield(); 1023 } 1024 if ( _socket._errno == ECONNREFUSED ) { 1025 throw new ConnectionRefused("Unable to connect socket: connection refused on " ~ addr); 1026 } 1027 } 1028 /// 1029 bool connected() const @safe { 1030 return _socket.connected; 1031 } 1032 /// 1033 auto errno() @safe { 1034 return _socket.socket_errno(); 1035 } 1036 /// 1037 auto listen(int backlog = 10) @safe { 1038 return _socket.listen(backlog); 1039 } 1040 /// 1041 void close() @safe { 1042 if ( _socket ) { 1043 _socket.close(); 1044 _socket = null; 1045 } 1046 } 1047 /// 1048 auto accept(Duration timeout = Duration.max) { 1049 HioSocket s; 1050 1051 auto loop = getDefaultLoop(); 1052 _fiber = Fiber.getThis(); 1053 1054 void callback(int fileno) @trusted { 1055 debug tracef("Got %d on accept", fileno); 1056 if ( fileno < 0 ) { 1057 s = null; 1058 _fiber.call(); 1059 return; 1060 } 1061 debug tracef("got accept callback for socket %d", fileno); 1062 if ( _socket._polling & AppEvent.IN ) { 1063 getDefaultLoop.stopPoll(_socket.fileno, AppEvent.IN); 1064 _socket._polling &= ~AppEvent.IN; 1065 } 1066 _socket._state = hlSocket.State.IDLE; 1067 s = new HioSocket(fileno); 1068 _fiber.call(); 1069 } 1070 _socket._accepts_in_a_row = 1; 1071 _socket.accept(loop, timeout, &callback); 1072 Fiber.yield(); 1073 return s; 1074 } 1075 /// 1076 IOResult recv(size_t n, Duration timeout = 10.seconds) @trusted { 1077 IORequest ioreq; 1078 IOResult iores; 1079 1080 _fiber = Fiber.getThis(); 1081 if ( _fiber is null) { 1082 // read not in context of any fiber. Blocked read. 1083 _socket.blocking = true; 1084 ioreq.to_read = n; 1085 iores = _socket.io(ioreq, timeout); 1086 _socket.blocking = false; 1087 return iores; 1088 } 1089 void callback(IOResult ior) @trusted { 1090 debug tracef("got ior on recv: %s", ior); 1091 iores = ior; 1092 _fiber.call(); 1093 } 1094 1095 ioreq.to_read = n; 1096 ioreq.callback = &callback; 1097 _socket.io(getDefaultLoop(), ioreq, timeout); 1098 debug tracef("recv yielding on %s", _socket); 1099 Fiber.yield(); 1100 debug tracef("recv done on %s", _socket); 1101 return iores; 1102 } 1103 /// 1104 size_t send(immutable (ubyte)[] data, Duration timeout = 1.seconds) @trusted { 1105 _fiber = Fiber.getThis(); 1106 IOResult ioresult; 1107 1108 if ( _fiber is null ) { 1109 IORequest ioreq; 1110 _socket.blocking = true; 1111 ioreq.output = data; 1112 ioresult = _socket.io(ioreq, timeout); 1113 _socket.blocking = false; 1114 if ( ioresult.error ) { 1115 return -1; 1116 } 1117 return 0; 1118 } 1119 1120 void callback(IOResult ior) @trusted { 1121 ioresult = ior; 1122 _fiber.call(); 1123 } 1124 ioresult = _socket.send(getDefaultLoop(), data, timeout, &callback); 1125 if ( ioresult.error ) { 1126 return -1; 1127 } 1128 if ( ioresult.output.empty ) { 1129 return data.length; 1130 } 1131 Fiber.yield(); 1132 if (ioresult.error) { 1133 return -1; 1134 } 1135 return data.length - ioresult.output.length; 1136 } 1137 /// 1138 InputStream inputStream(Duration t=10.seconds) @safe { 1139 return InputStream(this, t); 1140 } 1141 } 1142 1143 unittest { 1144 import core.thread; 1145 import hio.scheduler; 1146 1147 globalLogLevel = LogLevel.info; 1148 void server(ushort port) { 1149 auto s = new HioSocket(); 1150 s.bind("127.0.0.1:%s".format(port)); 1151 s.listen(); 1152 auto c = s.accept(2.seconds); 1153 if ( c is null ) { 1154 s.close(); 1155 throw new Exception("Accept failed"); 1156 } 1157 auto io = c.recv(64, 1.seconds); 1158 assert(io.input == "hello".representation); 1159 c.send("world".representation); 1160 c.close(); 1161 s.close(); 1162 } 1163 void client(ushort port) { 1164 auto s = new HioSocket(); 1165 s.connect("127.0.0.1:%d".format(port), 1.seconds); 1166 scope(exit) { 1167 s.close(); 1168 } 1169 if ( s.connected ) { 1170 auto rq = s.send("hello".representation, 1.seconds); 1171 auto rs = s.recv(64, 1.seconds); 1172 assert(rs.input == "world".representation); 1173 } else { 1174 throw new Exception("Can't connect to server"); 1175 } 1176 } 1177 1178 info("Test HioSockets 0"); 1179 1180 // all ok case 1181 auto t = new Thread({ 1182 try{ 1183 App(&server, cast(ushort)12345); 1184 } catch (Exception e) { 1185 infof("Got %s in server", e); 1186 } 1187 }).start; 1188 Thread.sleep(500.msecs); 1189 client(12345); 1190 t.join; 1191 1192 info("Test HioSockets 1"); 1193 // all fail case - everything should throw 1194 t = new Thread({ 1195 App(&server, cast(ushort) 12345); 1196 }).start; 1197 assertThrown!Exception(client(12346)); 1198 assertThrown!Exception(t.join); 1199 1200 info("Test HioSockets 2"); 1201 // the same but client in App 1202 // all ok case 1203 t = new Thread({ 1204 App(&server, cast(ushort) 12345); 1205 }).start; 1206 Thread.sleep(500.msecs); 1207 App(&client, cast(ushort)12345); 1208 t.join; 1209 1210 info("Test HioSockets 3"); 1211 // all fail case - everything should throw 1212 t = new Thread({ App(&server, cast(ushort) 12345); }).start; 1213 assertThrown!Exception(App(&client, cast(ushort)12346)); 1214 assertThrown!Exception(t.join); 1215 } 1216 1217 /// 1218 struct ByLineSplitter(R) { 1219 import nbuff; 1220 private { 1221 enum NL = "\n".representation; 1222 R source; 1223 Buffer buff; 1224 long last_position; 1225 string line; 1226 } 1227 /// 1228 this(R r) { 1229 source = r; 1230 popFront; 1231 } 1232 bool empty() { 1233 return line is null && source.empty && buff.empty; 1234 } 1235 string front() { 1236 return line; 1237 } 1238 void popFront() { 1239 while (true) { 1240 auto p = buff.countUntil(last_position, NL); 1241 if (p >= 0) { 1242 last_position = 0; 1243 if ( p == 0 ) { 1244 line = ""; 1245 } else { 1246 line = cast(string) buff[0 .. p].data; 1247 } 1248 buff = buff[p + 1 .. $]; 1249 return; 1250 } 1251 last_position = buff.length; 1252 if (source.empty) { 1253 line = cast(string) buff[0 .. $].data; 1254 buff = Buffer(); 1255 return; 1256 } 1257 // fill from source and retry 1258 buff.append(source.front); 1259 source.popFront; 1260 } 1261 } 1262 } 1263 1264 /// 1265 auto byLineSplitter(R)(R r) { 1266 return ByLineSplitter!R(r); 1267 } 1268 1269 unittest { 1270 import std.range; 1271 for(int s=1;s<7;s++) { 1272 auto result = "a\nbb\n\n\nc\n\n".chunks(s) 1273 .array.map!"to!string(a).representation".byLineSplitter; 1274 assert(equal(result, ["a", "bb", "", "", "c", ""])); 1275 } 1276 }