module hio.http.client;

import std.experimental.logger;
import std.stdio;
import std.socket;
import std.string;
import std.array;
import std.datetime;
import std.algorithm;
import std.exception;
import std.typecons: Tuple;
import std.uni: toLower;
import std.zlib: UnCompress, HeaderFormat;

import hio.socket;
import hio.resolver;
import hio.loop;
import hio.zlib;
import hio.tls;

public import hio.http.common;
import hio.http.http_parser;

import ikod.containers.hashmap: HashMap, hash_function;
import ikod.containers.unrolledlist;

import nbuff: Nbuff, NbuffChunk, SmartPtr;

/// Default connection factory: creates and opens a socket suitable for the
/// schema of `url`.
///
/// For HTTPS an `AsyncSSLSocket` is opened and given the url host via
/// `set_host`; for HTTP a plain `hlSocket` is opened.
/// Any other schema code hits `assert(0)`.
AsyncSocketLike socketFabric(URL url) @safe
{
    if ( url.schemaCode == SchCode.HTTPS )
    {
        debug(hiohttp) trace("use ssl socket");
        auto tlsSocket = new AsyncSSLSocket();
        tlsSocket.open();
        tlsSocket.set_host(url.host);
        return tlsSocket;
    }
    if ( url.schemaCode == SchCode.HTTP )
    {
        auto plainSocket = new hlSocket();
        plainSocket.open();
        return plainSocket;
    }
    assert(0);
}

/// Key used by the connection pool: (schema, host, port).
package struct ConnectionTriple
{
    string schema;
    string host;
    ushort port;

    /// Two triples are equal when all three components match.
    bool opEquals(R)(const R other) const
    {
        if ( schema != other.schema )
        {
            return false;
        }
        if ( port != other.port )
        {
            return false;
        }
        return host == other.host;
    }

    /// Hash is the sum of the component hashes.
    hash_t toHash() const @safe pure nothrow
    {
        const schemaHash = hash_function(schema);
        const hostHash = hash_function(host);
        const portHash = hash_function(port);
        return schemaHash + hostHash + portHash;
    }
}

// Handles the higher-level logic: proxy, redirects, connection pool, ...
/// Asynchronous HTTP client.
///
/// Drives a single request at a time through resolve -> connect -> handle,
/// follows 302 redirects (up to `_max_redirects`) and keeps finished
/// connections in a pool keyed by (schema, host, port).
class AsyncHTTPClient
{
    enum State
    {
        INIT,
        RESOLVING,
        CONNECTING,
        HANDLING,
        ERROR
    }
    private
    {
        State _state = State.INIT;
        HashMap!(ConnectionTriple, AsyncSocketLike) _conn_pool;
        void delegate(AsyncHTTPResult) @safe _callback;
        AsyncSocketLike function(URL) @safe _conn_factory = &socketFabric;
        AsyncHTTP _handler;
        Duration _connect_timeout = 30.seconds;
        Duration _send_timeout = 30.seconds;
        Duration _receive_timeout = 30.seconds;
        int _buffer_size = 16*1024;
        int _max_redirects = 10;
        bool _keep_alive = true;
        int _verbosity = 0;
        //
        hlEvLoop _loop;
        // lifetime - request
        Request _request;               // request headers, body, etc
        URL _original_url;              // saved at the beginning of execute()
        URL _current_url;               // current url - changes when following redirects
        AsyncSocketLike _connection;
        InternetAddress[] _endpoints;
        int _redirects_counter;
    }

    // NOTE(review): close() touches other GC-managed objects (the socket and
    // the pool); confirm this is safe when run from a GC finalizer.
    ~this()
    {
        close();
    }

    /// Close the active connection (if any) and everything kept in the pool.
    void close() @safe
    {
        if ( _connection )
        {
            _connection.close();
            _connection = null;
        }
        // close everything in the pool
        foreach (AsyncSocketLike s; _conn_pool.byValue())
        {
            s.close();
        }
        _conn_pool.clear();
    }

    private bool use_proxy() @safe
    {
        return false;
    }

    /// True while the number of followed redirects is below `_max_redirects`.
    bool redirectAllowed() pure @safe @nogc
    {
        return _redirects_counter < _max_redirects;
    }

    /// Detach the active connection from the client.
    ///
    /// When `reusable` is true the connection is stored in the pool under the
    /// key of the current url. Otherwise it is removed from the pool (it may
    /// have been fetched from there) and closed, so a broken socket is never
    /// handed out again. In both cases `_connection` becomes null, which is
    /// what `execute()` requires before accepting the next request.
    private void _release_connection(bool reusable) @safe
    {
        if ( _connection is null )
        {
            return;
        }
        auto key = ConnectionTriple(_current_url.schema, _current_url.host, _current_url.port);
        if ( reusable )
        {
            _conn_pool.put(key, _connection);
        }
        else
        {
            _conn_pool.remove(key);
            _connection.close();
        }
        _connection = null;
    }

    /// Finish the current request with an error: drop the connection, return
    /// to State.INIT and report `reason` to the user callback.
    private void _fail(AsyncHTTPErrors reason) @safe
    {
        AsyncHTTPResult failure;
        failure.error = reason;
        _state = State.INIT;
        _release_connection(false);
        _callback(failure);
    }

    /// Count the redirect, hand the current connection back to the pool and
    /// switch `_current_url` to `location`.
    private void prepareRedirect(string location) @safe
    {
        _redirects_counter++;
        // return conn to pool (keyed by the url we are leaving)
        _release_connection(true);

        debug(hiohttp) tracef("new location: %s", location);
        _current_url = parse_url(location);
        debug(hiohttp) tracef("new url: %s", _current_url);
    }

    /// Called by the handler when a request/response exchange is finished
    /// (successfully or not).
    private void _http_handler_callback(AsyncHTTPResult r) @safe
    {
        if ( r.status_code == 302 )
        {
            if ( !redirectAllowed() )
            {
                debug(hiohttp) tracef("Can't follow redirect");
                r.error = AsyncHTTPErrors.MaxRedirectsReached;
                r.status_code = -1;
                // the request is over: make the client reusable again
                _state = State.INIT;
                _release_connection(true);
                _callback(r);
                return;
            }
            string location = r.getHeader("location").toString;
            if ( location.length > 0 )
            {
                prepareRedirect(location);
                _execute();
                return;
            }
            // 302 without Location: nothing to follow, deliver the response as is
        }
        // reset state
        _state = State.INIT;
        // return connection to pool only if the exchange finished cleanly
        _release_connection(r.error == AsyncHTTPErrors.None);
        _callback(r);
    }

    /// Start the HTTP exchange on the connected socket `c`.
    private void _http_handler_call(AsyncSocketLike c) @safe
    {
        assert(_handler._state == AsyncHTTP.State.INIT);
        _handler._connection = c;
        _handler._callback = &_http_handler_callback;
        _handler.execute();
    }

    /// Socket connect callback: start the exchange when connected, otherwise
    /// try the next resolved address or report ConnFailed.
    private void _connect_callback(AppEvent e) @safe
    {
        if ( _connection is null )
        {
            debug(hiohttp) tracef("connect_callback on closed connection, you probably closed it already?");
            return;
        }
        debug(hiohttp) tracef("connect_callback %s", e);
        if ( _connection.connected )
        {
            _state = State.HANDLING;
            _http_handler_call(_connection);
            return;
        }
        // retry with next address if possible
        if ( _endpoints.length > 0 )
        {
            _connection.close();
            _connection = _conn_factory(_current_url);
            _connect_call();
            return;
        }
        // no more endpoints to connect
        _fail(AsyncHTTPErrors.ConnFailed);
    }

    /// Take the first remaining endpoint and start connecting to it.
    private void _connect_call() @safe
    {
        if ( _endpoints.length == 0 )
        {
            // resolver succeeded but gave us nothing to connect to
            _fail(AsyncHTTPErrors.ConnFailed);
            return;
        }
        auto endpoint = _endpoints[0];
        _endpoints = _endpoints[1..$];
        _connection.connect(endpoint, _loop, &_connect_callback, _connect_timeout);
    }

    /// Resolver callback: on success store addresses and start connecting,
    /// otherwise report ResolveFailed.
    private void _resolver_callback(ResolverResult4 r) @safe
    {
        auto status = r.status;
        debug(hiohttp) tracef("resolve callback");
        if ( status != ARES_SUCCESS )
        {
            _fail(AsyncHTTPErrors.ResolveFailed);
            return;
        }
        _state = State.CONNECTING;
        _endpoints = r.addresses.map!(a => cast(InternetAddress)a).array;
        _connect_call();
    }

    /// Start name resolution for `url`. If the resolver returns a non-empty
    /// result right away, the callback is invoked synchronously.
    private void _resolver_call(URL url) @safe
    {
        debug(hiohttp) tracef("resolve %s", url);
        auto r = hio_gethostbyname(url.host, &_resolver_callback, url.port);
        if ( !r.isEmpty )
        {
            _resolver_callback(r);
        }
    }

    /// Build the request line: "<method> <path-and-query> HTTP/1.1\n".
    /// Proxy mode is not implemented (asserts).
    private NbuffChunk _build_request_line() @trusted
    {
        if (!use_proxy)
        {
            ulong ml = _request.method.length;
            auto pqs = _current_url.path_off >= 0 ? _current_url.url[_current_url.path_off..$] : "/";
            ulong pl = pqs.length;
            // method + ' ' + path + " HTTP/1.1\n" (10 bytes)
            ulong total = ml + 1 + pl + 10;
            auto q = Nbuff.get(total);
            copy(_request.method, q.data[0 .. ml]);
            copy(" ".representation, q.data[ml .. ml+1]);
            copy(pqs.representation, q.data[ml+1..ml+1+pl]);
            copy(" HTTP/1.1\n".representation, q.data[ml+1+pl..total]);
            debug(hiohttp) tracef("rq_line");
            return NbuffChunk(q, total);
        }
        else
        {
            assert(0);
        }
    }

    /// Build the complete request header: request line, default headers the
    /// user did not override, user headers and the terminating empty line.
    /// With `_verbosity >= 1` every emitted header is echoed to stdout.
    private Nbuff _build_request_header() @safe
    {
        Nbuff message_header;
        auto request_line = _build_request_line();
        message_header.append(request_line);
        if (_verbosity >= 1) writef("-> %s", request_line.toString);

        if ( !_request.user_headers_flags.AcceptEncoding )
        {
            message_header.append("Accept-Encoding: gzip,deflate\n");
            if (_verbosity >= 1) writefln("-> %s: %s", "Accept-Encoding", "gzip,deflate");
        }
        if ( !_request.user_headers_flags.Host )
        {
            message_header.append("Host: ");
            if ( _current_url.port == standard_port(_current_url.schema) )
            {
                message_header.append(_current_url.host);
                if (_verbosity >= 1) writefln("-> Host: %s", _current_url.host);
            }
            else
            {
                // non-standard port must be part of the Host value
                message_header.append(_current_url.host ~ ":%d".format(_current_url.port));
                if (_verbosity >= 1) writefln("-> Host: %s", _current_url.host ~ ":%d".format(_current_url.port));
            }
            message_header.append("\n");
        }
        if (!_request.user_headers_flags.UserAgent)
        {
            // the standard field name is "User-Agent"
            message_header.append("User-Agent: hio\n");
            if (_verbosity >= 1) writefln("-> %s: %s", "User-Agent", "hio");
        }
        if (!_request.user_headers_flags.Connection && !_keep_alive )
        {
            message_header.append("Connection: Close\n");
            if (_verbosity >= 1) writefln("-> Connection: Close");
        }
        foreach (ref h; _request.user_headers)
        {
            message_header.append(h.FieldName);
            message_header.append(": ");
            message_header.append(h.FieldValue);
            message_header.append("\n");
            if (_verbosity >= 1)
            {
                writefln("-> %s: %s", h.FieldName, h.FieldValue);
            }
        }
        message_header.append("\n");
        return message_header;
    }

    /// Run the request for `_current_url`: reuse a live pooled connection or
    /// create a new one and go through resolve/connect first.
    private void _execute() @safe
    {
        import std.stdio;
        debug(hiohttp) writefln("pool: %s", _conn_pool);
        debug(hiohttp) writefln("url: %s", _current_url);
        auto key = ConnectionTriple(_current_url.schema, _current_url.host, _current_url.port);
        auto f = _conn_pool.fetch(key);
        debug(hiohttp) writefln("f: %s", f);
        if ( f.ok && f.value.connected )
        {
            debug(hiohttp) tracef("Use connection from pool");
            _connection = f.value;
        }
        else
        {
            if ( f.ok )
            {
                // pooled socket is not connected anymore - it can't be reused
                debug(hiohttp) tracef("Drop stale connection from pool");
                f.value.close();
                _conn_pool.remove(key);
            }
            debug(hiohttp) tracef("Create new connection");
            _connection = _conn_factory(_current_url);
        }

        assert(_connection !is null);

        if ( _connection.connected )
        {
            debug(hiohttp) tracef("Connected, start http");
            _state = State.HANDLING;
            _http_handler_call(_connection);
            return;
        }
        // resolve, connect and then call handler
        // reset resolving
        _endpoints.length = 0;
        _state = State.RESOLVING;
        _resolver_call(_current_url);
    }

    /// Set verbosity; values >= 1 echo request/response headers to stdout.
    public void verbosity(int v) @safe @property @nogc nothrow
    {
        _verbosity = v;
    }

    /// Set the timeout used for socket connect.
    public void connect_timeout(Duration v) @safe @property @nogc nothrow
    {
        _connect_timeout = v;
    }

    /// Add a user header to the request.
    public void addHeader(Header h) @safe
    {
        _request.addHeader(h);
    }

    /// Start a request.
    ///
    /// Params:
    ///   method   = HTTP method
    ///   url      = target url
    ///   callback = invoked with the result (it may be invoked before this
    ///              function returns, e.g. when the resolver answers
    ///              immediately with a failure)
    /// Throws: Exception (via enforce) if a request is already in progress.
    void execute(Method method, URL url, void delegate(AsyncHTTPResult) @safe callback) @safe
    {
        enforce(_state == State.INIT, "You can't reenter this");
        enforce(_connection is null, "You can't reenter this");
        _loop = getDefaultLoop();
        _original_url = url;
        _current_url = url;
        _request.method = method;
        _redirects_counter = 0;

        _callback = callback;
        _handler._client = this;
        _handler._verbosity = _verbosity;
        _execute();
    }
}

unittest
{
    import std.experimental.logger;
    globalLogLevel = LogLevel.info;
    import std.stdio;
    AsyncHTTPClient c = new AsyncHTTPClient();
    c._request.method = Method("GET");
    c._current_url = parse_url("http://example.com:8080/path");
    auto q = c._build_request_line();

    assert(q == "GET /path HTTP/1.1\n".representation);

    c.addHeader(Header("X-header", "x-value"));
    c.addHeader(Header("Content-Length", "100"));

    Nbuff header = c._build_request_header();
    //writeln(cast(string)header.data.data);
}

// http_parser callbacks. `parser.data` points to the AsyncHTTP handler and
// `at` points inside the chunk currently being parsed
// (AsyncHTTP._current_buffer_ptr), so `at - _current_buffer_ptr` is the
// position inside that chunk.
extern(C)
{
    /// Header field (name) fragment received.
    private int on_header_field(http_parser* parser, const char* at, size_t length)
    {
        AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
        assert(c._state == AsyncHTTP.State.HEADERS_RECEIVE);
        assert(c._current_buffer_length > 0);
        long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
        debug(hiohttp) tracef("on header field in state %s", c._expected_part);
        debug(hiohttp) tracef("on header field: %d", c._offset + position);
        if (length>0)
            debug(hiohttp) tracef("on header field: %s",
                cast(string)c._response_headers_buffer[c._offset + position..c._offset + position+length].data.data);
        final switch(c._expected_part)
        {
            case AsyncHTTP.HeaderPart.Value:
                // finalize value and start next Header
                debug(hiohttp) tracef("got <%s: %s>",
                    cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end],
                    cast(string)c._response_headers_buffer.data.data[c._value_beg..c._value_end]);
                c.processHeader();
                goto case;
            case AsyncHTTP.HeaderPart.None:
                c._expected_part = AsyncHTTP.HeaderPart.Field;
                c._field_beg = c._offset + position;
                c._field_end = c._field_beg + length;
                break;
            case AsyncHTTP.HeaderPart.Field:
                // continue receiving current header
                c._field_end += length;
                break;
        }
        return 0;
    }

    /// Header value fragment received.
    private int on_header_value(http_parser* parser, const char* at, size_t length)
    {
        AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
        assert(c._state == AsyncHTTP.State.HEADERS_RECEIVE);
        debug(hiohttp) tracef("on header value in state %s", c._expected_part);
        long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
        if (length>0)
            debug(hiohttp) tracef("on header value: %s",
                cast(string)c._response_headers_buffer[c._offset + position..c._offset + position+length].data.data);
        final switch(c._expected_part)
        {
            case AsyncHTTP.HeaderPart.None:
                assert(0);
            case AsyncHTTP.HeaderPart.Value:
                // continue receiving current value
                c._value_end += length;
                break;
            case AsyncHTTP.HeaderPart.Field:
                // finalize header field and start next value
                debug(hiohttp) tracef("got Field: %s", cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end]);
                c._expected_part = AsyncHTTP.HeaderPart.Value;
                c._value_beg = c._offset + position;
                c._value_end = c._value_beg + length;
                break;
        }
        return 0;
    }

    /// All headers received: process the last pending header and switch state.
    private int on_headers_complete(http_parser* parser)
    {
        debug(hiohttp) tracef("headers complete");
        AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
        debug(hiohttp) tracef("got <%s: %s>",
            cast(string)c._response_headers_buffer.data.data[c._field_beg..c._field_end],
            cast(string)c._response_headers_buffer.data.data[c._value_beg..c._value_end]);
        c.processHeader();
        c._state = c.onHeadersComplete();
        debug(hiohttp) tracef("next state: %s", c._state);
        return 0;
    }

    /// Body fragment received.
    private int on_body(http_parser* parser, const char* at, size_t length)
    {
        debug(hiohttp) tracef("on body");
        AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
        long position = cast(immutable(ubyte)*)at - c._current_buffer_ptr;
        assert(c._state == AsyncHTTP.State.BODY_RECEIVE, "Expected state BODY_RECEIVE, got %s".format(c._state));
        c.onBody(position, length);
        return 0;
    }

    /// Whole message received.
    private int on_message_complete(http_parser* parser)
    {
        debug(hiohttp) tracef("message complete");
        AsyncHTTP* c = cast(AsyncHTTP*)parser.data;
        c.onBodyComplete();
        return 0;
    }
}

/// Handles a straightforward HTTP request-response exchange on an already
/// connected socket.
struct AsyncHTTP
{
    private
    {
        enum State
        {
            INIT,
            SENDING_REQUEST,
            HEADERS_RECEIVE,
            HEADERS_COMPLETED,
            BODY_RECEIVE,
            DONE,
            ERROR
        }
        enum HeaderPart
        {
            None = 0,
            Field,
            Value
        }
        enum ContentEncoding
        {
            NONE,
            GZIP,
            DEFLATE
        }
        State                       _state;
        AsyncSocketLike             _connection;
        void delegate(AsyncHTTPResult) @safe
                                    _callback;
        http_parser                 _parser;
        http_parser_settings        _parser_settings = {
                on_header_field:     &on_header_field,
                on_header_value:     &on_header_value,
                on_headers_complete: &on_headers_complete,
                on_body:             &on_body,
                on_message_complete: &on_message_complete,
        };
        int                         _verbosity;
        AsyncHTTPClient             _client;
        AsyncHTTPResult             _result;
        HeaderPart                  _expected_part;
        size_t                      _offset;                    // offset of each socket input inside _response_headers_buffer
        size_t                      _field_beg, _field_end;     // beg and end of the current header field
        size_t                      _value_beg, _value_end;     // beg and end of the current header value
        NbuffChunk                  _current_input;
        immutable(ubyte)*           _current_buffer_ptr;
        size_t                      _current_buffer_length;
        Nbuff                       _response_body;
        MessageHeaders              _response_headers;
        Nbuff                       _response_headers_buffer;
        ContentEncoding             _content_encoding;

        ZLib                        _zlib;
    }

    ~this()
    {
        // we can destroy it only when it is idle
        assert(_state == State.INIT, "You can't destroy in %s".format(_state));
    }

    /// Return the handler to the idle state so it can serve the next request.
    private void reset() @safe
    {
        _response_body.clear;
        _response_headers.clear;
        _state = State.INIT;
        // Forget the header-parsing position: otherwise the first header field
        // of the next response would re-process the last header of this one.
        _expected_part = HeaderPart.None;
        // Do not leak status/error of this request into the next one.
        _result.status_code = -1;
        _result.error = AsyncHTTPErrors.None;
        if (_content_encoding == ContentEncoding.GZIP || _content_encoding==ContentEncoding.DEFLATE)
        {
            _zlib.zFlush();
        }
        _content_encoding = ContentEncoding.NONE;
    }

    /// Handle the header delimited by _field_beg.._field_end and
    /// _value_beg.._value_end: `Content-Encoding` configures decompression
    /// (and is not stored), every other header is appended to
    /// `_response_headers`.
    private void processHeader()
    {
        import std.stdio;
        // header field: _response_headers_buffer[_field_beg.._field_end]
        // header value: _response_headers_buffer[_value_beg.._value_end]
        NbuffChunk field = _response_headers_buffer.data(_field_beg, _field_end);
        NbuffChunk value = _response_headers_buffer.data(_value_beg, _value_end);
        if ( _verbosity >= 1 ) {
            writefln("<- %s: %s", cast(string)field.data, cast(string)value.data);
        }
        if (field.toString.toLower == "content-encoding" )
        {
            string v = value.toString.toLower;
            switch (v)
            {
                case "gzip":
                    debug(hiohttp) tracef("decode from gzip");
                    _content_encoding = ContentEncoding.GZIP;
                    _zlib.zInit(_client._buffer_size);
                    break;
                case "deflate":
                    debug(hiohttp) tracef("decode from deflate");
                    _content_encoding = ContentEncoding.DEFLATE;
                    _zlib.zInit(_client._buffer_size);
                    break;
                default:
                    break;
            }
        }
        else
        {
            _response_headers.pushBack(MessageHeader(field, value));
        }
    }

    /// Socket read callback: feed received data to the parser and, while the
    /// response is incomplete, schedule the next read.
    private void _receiving_response(ref IOResult res) @safe
    {
        assert(_state == State.HEADERS_RECEIVE || _state == State.BODY_RECEIVE);

        if (res.timedout)
        {
            onTimeout();
            return;
        }
        if ( res.error )
        {
            onError();
            return;
        }

        _current_input = res.input;

        size_t r;
        switch(_state)
        {
            case State.HEADERS_RECEIVE:
                _response_headers_buffer.append(_current_input);
                () @trusted {
                    // ptr used in http_parser_execute callback
                    _current_buffer_ptr = _current_input.data.ptr;
                    _current_buffer_length = _current_input.data.length;
                    r = http_parser_execute(&_parser, &_parser_settings, cast(char*)_current_buffer_ptr, _current_buffer_length);
                }();
                assert(r == _current_buffer_length);
                _offset += _current_buffer_length;
                _current_buffer_length = 0;
                break;
            case State.BODY_RECEIVE:
                () @trusted {
                    // ptr used in http_parser_execute callback
                    _current_buffer_ptr = _current_input.data.ptr;
                    _current_buffer_length = _current_input.data.length;
                    r = http_parser_execute(&_parser, &_parser_settings, cast(char*)_current_buffer_ptr, _current_buffer_length);
                }();
                assert(r == _current_buffer_length, "r=%d on body receive, instead of %d".format(r, _current_buffer_length));
                break;
            case State.DONE:
                break;
            default:
                assert(0);
        }
        // we processed data; the parser callbacks may have changed _state,
        // decide what to do in the new state
        switch(_state)
        {
            case State.BODY_RECEIVE, State.HEADERS_RECEIVE:
                IORequest iorq;
                iorq.to_read = _client._buffer_size;
                iorq.callback = &_receiving_response;
                _parser.data = &_client._handler;
                _connection.io(_client._loop, iorq, _client._receive_timeout);
                return;
            default:
                return;
        }
    }

    /// Socket write callback: on success start reading the response, on
    /// failure report it the same way read failures are reported.
    private void _send_request_done(ref IOResult res) @safe
    {
        //debug(hiohttp) tracef("result: %s", res);
        if ( res.timedout )
        {
            onTimeout();
            return;
        }
        if ( res.error )
        {
            onError();
            return;
        }
        IORequest iorq;
        iorq.to_read = _client._buffer_size;
        iorq.callback = &_receiving_response;
        _parser.data = &_client._handler;
        _state = State.HEADERS_RECEIVE;
        _connection.io(_client._loop, iorq, _client._receive_timeout);
    }

    /// Send the request header built by the client.
    private void _send_request() @safe
    {
        assert(_state == State.SENDING_REQUEST);
        IORequest iorq;
        iorq.output = _client._build_request_header();
        iorq.callback = &_send_request_done;
        _connection.io(_client._loop, iorq, _client._send_timeout);
    }

    /// Store status code and headers in the result; returns the next state.
    private State onHeadersComplete()
    {
        debug(hiohttp) tracef("Checking headers");
        _result.status_code = _parser.status_code;
        _response_body.clear;
        _result.response_headers = _response_headers;
        return State.BODY_RECEIVE;
    }

    /// Append `len` bytes of the current input starting at `off` to the
    /// response body, inflating them first when the body is gzip/deflate.
    private void onBody(long off, size_t len)
    {
        import std.stdio;
        NbuffChunk b = _current_input[off..off+len];
        if ( _content_encoding == ContentEncoding.GZIP || _content_encoding == ContentEncoding.DEFLATE )
        {
            size_t processed = 0;
            while(processed < b.length)
            {
                auto r = _zlib.zInflate(b[processed..$]);
                processed += r.consumed;
                if (r.result.length)
                {
                    _response_body.append(r.result);
                }
                if ( r.status != 0 )
                {
                    break;
                }
                if ( r.consumed == 0 && r.result.length == 0 )
                {
                    // no progress at all - bail out instead of spinning forever
                    break;
                }
            }
        }
        else
        {
            _response_body.append(b);
        }
    }

    /// Message complete: reset the handler, then deliver the result.
    private void onBodyComplete()
    {
        debug(hiohttp) tracef("Body complete");
        _result.response_body = _response_body;
        AsyncHTTPResult result = _result;
        auto cb = _callback;
        // reset before the callback: the callback may start the next request
        reset();
        cb(result);
    }

    /// I/O timed out: reset the handler and deliver a Timeout result.
    private void onTimeout() @safe
    {
        _result.status_code = -1;
        _result.error = AsyncHTTPErrors.Timeout;
        AsyncHTTPResult result = _result;
        auto cb = _callback;
        reset();
        cb(result);
    }

    /// I/O failed: reset the handler and deliver a DataError result.
    private void onError() @safe
    {
        _result.status_code = -1;
        _result.error = AsyncHTTPErrors.DataError;
        AsyncHTTPResult result = _result;
        auto cb = _callback;
        reset();
        cb(result);
    }

    /// Start the exchange: initialise the response parser and send the
    /// request. Requires a connected socket and an idle handler.
    void execute() @safe
    {
        assert(_connection.connected);
        assert(_state == State.INIT);
        debug(hiohttp) tracef("handling %s", _client._current_url);
        _state = State.SENDING_REQUEST;
        http_parser_init(&_parser, http_parser_type.HTTP_RESPONSE);
        _send_request();
    }
}

/// Blocking-style wrapper around AsyncHTTPClient for use inside a fiber/task.
class HTTPClient
{
    import core.thread: Fiber;
    private
    {
        AsyncHTTPClient _async_client;
    }
    this()
    {
        _async_client = new AsyncHTTPClient();
    }

    /// Execute a request and suspend the calling fiber until the result is
    /// available.
    ///
    /// Must be called from inside a fiber. If the async client completes
    /// synchronously (the callback fires before `execute` returns), the result
    /// is returned without yielding.
    AsyncHTTPResult execute(Method method, URL url) @safe
    {
        AsyncHTTPResult result;
        bool completed = false;     // callback already delivered the result
        bool suspended = false;     // this fiber yielded and waits to be resumed
        Fiber f = Fiber.getThis();
        assert(f !is null, "You can call this only inside from task/fiber");
        void callback(AsyncHTTPResult r) @trusted
        {
            result = r;
            completed = true;
            if ( suspended )
            {
                // resume only a fiber that is really waiting; calling a
                // running fiber is an error
                f.call();
            }
        }
        _async_client.execute(method, url, &callback);
        if ( !completed )
        {
            suspended = true;
            () @trusted {
                Fiber.yield();
            }();
        }
        return result;
    }

    /// Close the underlying async client.
    void close()
    {
        if ( _async_client !is null )
        {
            _async_client.close();
        }
    }
}


/// Result delivered to the user callback.
struct AsyncHTTPResult
{
    int             status_code = -1;
    AsyncHTTPErrors error = AsyncHTTPErrors.None;
    Nbuff           response_body;
    MessageHeaders  response_headers;
    private
    {
        HashMap!(string, NbuffChunk) cached_headers;
    }

    /// Case-insensitive header lookup.
    ///
    /// Headers are consumed from `response_headers` and moved into an internal
    /// cache (keyed by lower-cased name) as they are scanned.
    /// Returns: the header value, or an empty chunk when not found.
    NbuffChunk getHeader(string h) @safe
    {
        auto hl = h.toLower;
        auto f = cached_headers.fetch(hl);
        if ( f.ok )
        {
            return f.value;
        }
        // iterate over non-cached headers, store in cache and return value if found
        while(!response_headers.empty)
        {
            auto hdr = response_headers.front;
            response_headers.popFront;
            string lowered_field = hdr.field.toLower();
            cached_headers.put(lowered_field, hdr.value);
            if ( lowered_field == hl )
            {
                return hdr.value;
            }
        }
        return NbuffChunk();
    }
}

unittest
{
    AsyncHTTPResult r;
    r.response_headers.pushBack(MessageHeader(NbuffChunk("Abc"), NbuffChunk("abc")));
    r.response_headers.pushBack(MessageHeader(NbuffChunk("Def"), NbuffChunk("bbb")));
    auto abc = r.getHeader("abc");
    assert(abc.data == "abc");
    abc = r.getHeader("abc");
    assert(abc.data == "abc");
    abc = r.getHeader("def");
    assert(abc.data == "bbb");
    abc = r.getHeader("none");
    assert(abc.empty);
}

unittest
{
    import std.stdio;
    import hio.scheduler;
    import std.experimental.logger;
    globalLogLevel = LogLevel.info;
    App({
        info("Test httpclient in task");
        HTTPClient c = new HTTPClient();
        auto r = c.execute(Method("GET"), parse_url("https://httpbin.org/get"));
        writefln("result: %s", r);
        r = c.execute(Method("GET"), parse_url("https://httpbin.org/get"));
        writefln("result: %s", r);
        c.close();
    });
    uninitializeLoops();
}

unittest
{
    import std.stdio;
    import hio.scheduler;
    import std.datetime;
    import std.experimental.logger;
    globalLogLevel = LogLevel.info;
    info("Test http.client");
    App({
        info("test stream");
        AsyncHTTPClient client = new AsyncHTTPClient();
        void callback(AsyncHTTPResult result) @safe
        {
            //infof("Client called back with result: %s", result);
            () @trusted {
                if ( result.status_code < 0 )
                {
                    //writefln("<<<error: %s>>>", result.error.msg);
                    //writefln("<<<erro %s>>>", result.error.msg);
                    return;
                }
                //writefln("<<<body %s>>>", cast(string)result.response_body.data.data);
                //writefln("<<<erro %s>>>", result.error.msg);
            }();
            client.close();
            getDefaultLoop.stop();
        }
        URL url = parse_url("http://httpbin.org/stream/100");
        client.verbosity = 1;
        client.execute(Method("GET"), url, &callback);
        getDefaultLoop.run();
    });
    globalLogLevel = LogLevel.info;
    App({
        info("test conn reuse");
        int requests = 3;
        AsyncHTTPClient client = new AsyncHTTPClient();
        URL url = parse_url("http://httpbin.org/get");
        void callback(AsyncHTTPResult result) @safe
        {
            if ( result.status_code != 200 )
            {
                debug(hiohttp) tracef("status code = %d", result.status_code);
            }
            debug writefln("<%s>", cast(string)result.response_body.data.data);
            if (--requests == 0)
            {
                client.close();
                getDefaultLoop.stop();
                return;
            }
            client.addHeader(Header("X-Request-No", "%d".format(requests)));
            client.execute(Method("GET"), url, &callback);
        }
        client.verbosity = 1;
        client.addHeader(Header("X-Request-No", "%d".format(requests)));
        client.execute(Method("GET"), url, &callback);
        hlSleep(25.seconds);
    });
    // test redirects - disabled while httpbin.org gives 404 on this url
    // App({
    //     info("test absolute redirects");
    //     AsyncHTTPClient client = new AsyncHTTPClient();
    //     void callback(AsyncHTTPResult result) @safe
    //     {
    //         assert(result.status_code == 200);
    //         debug writefln("<%s>", cast(string)result.response_body.data.data);
    //         client.close();
    //         getDefaultLoop.stop();
    //     }
    //     URL url = parse_url("http://httpbin.org/absolute-redirect/3");
    //     client.verbosity = 1;
    //     client.execute(Method("GET"), url, &callback);
    //     hlSleep(25.seconds);
    // });

    // App({
    //     info("test absolute redirects with limited redirects number");
    //     AsyncHTTPClient client = new AsyncHTTPClient();
    //     void callback(AsyncHTTPResult result) @safe
    //     {
    //         assert(result.status_code == -1);
    //         assert(result.error == AsyncHTTPErrors.MaxRedirectsReached);
    //         debug writefln("<%s>", cast(string)result.response_body.data.data);
    //         client.close();
    //         getDefaultLoop.stop();
    //     }
    //     URL url = parse_url("http://httpbin.org/absolute-redirect/3");
    //     client.verbosity = 1;
    //     client._max_redirects = 1;
    //     client.execute(Method("GET"), url, &callback);
    //     hlSleep(25.seconds);
    // });
    globalLogLevel = LogLevel.info;
    App({
        AsyncHTTPClient client = new AsyncHTTPClient();
        void callback(AsyncHTTPResult result) @safe
        {
            () @trusted {
                assert(result.status_code == -1);
                assert(result.error == AsyncHTTPErrors.ConnFailed);
            }();
            client.close();
            getDefaultLoop.stop();
        }
        URL url = parse_url("http://2.2.2.2/");
        client.connect_timeout = 1.seconds;
        client.execute(Method("GET"), url, &callback);
        hlSleep(10.seconds);
    });

    globalLogLevel = LogLevel.info;
    App({
        AsyncHTTPClient client = new AsyncHTTPClient();
        void callback(AsyncHTTPResult result) @safe
        {
            () @trusted {
                assert(result.status_code == 200);
                writefln("<%s>", cast(string)result.response_body.data.data);
            }();
            client.close();
            getDefaultLoop.stop();
        }
        URL url = parse_url("http://httpbin.org/gzip");
        client.connect_timeout = 1.seconds;
        client.verbosity = 1;
        client.addHeader(Header("Accept-Encoding","gzip"));
        client.execute(Method("GET"), url, &callback);
        hlSleep(10.seconds);
    });
    globalLogLevel = LogLevel.info;
    App({
        AsyncHTTPClient client = new AsyncHTTPClient();
        void callback(AsyncHTTPResult result) @safe
        {
            () @trusted {
                assert(result.status_code == 200);
                writefln("<%s>", cast(string)result.response_body.data.data);
            }();
            client.close();
            getDefaultLoop.stop();
        }
        URL url = parse_url("http://httpbin.org/deflate");
        client.connect_timeout = 1.seconds;
        client.verbosity = 0;
        client.addHeader(Header("Accept-Encoding","deflate"));
        client.execute(Method("GET"), url, &callback);
        hlSleep(10.seconds);
    });
    globalLogLevel = LogLevel.info;
    App({
        AsyncHTTPClient client = new AsyncHTTPClient();
        void callback(AsyncHTTPResult result) @safe
        {
            () @trusted {
                assert(result.status_code == 200);
                writefln("<%s>", cast(string)result.response_body.data.data);
            }();
            client.close();
            getDefaultLoop.stop();
        }
        URL url = parse_url("https://httpbin.org/gzip");
        client.connect_timeout = 1.seconds;
        client.verbosity = 0;
        client.addHeader(Header("Accept-Encoding","gzip"));
        client.execute(Method("GET"), url, &callback);
        hlSleep(10.seconds);
    });
    uninitializeLoops();
}