RESTinio
Loading...
Searching...
No Matches
connection.hpp
Go to the documentation of this file.
1/*
2 restinio
3*/
4
9#pragma once
10
12
13#include <llhttp.h>
14
16
29
32
33namespace restinio
34{
35
36namespace impl
37{
38
39//
40// http_parser_ctx_t
41//
42
44
152
154#include "parser_callbacks.ipp"
155
156//
157// create_parser_settings()
158//
159
161
164template< typename Http_Methods >
167{
170
171 parser_settings.on_url =
172 []( llhttp_t * parser, const char * at, size_t length ) -> int {
173 return restinio_url_cb( parser, at, length );
174 };
175
176 parser_settings.on_header_field =
177 []( llhttp_t * parser, const char * at, size_t length ) -> int {
178 return restinio_header_field_cb( parser, at, length );
179 };
180
181 parser_settings.on_header_field_complete =
182 []( llhttp_t * parser ) -> int {
184 };
185
186 parser_settings.on_header_value =
187 []( llhttp_t * parser, const char * at, size_t length ) -> int {
188 return restinio_header_value_cb( parser, at, length );
189 };
190
191 parser_settings.on_header_value_complete =
192 []( llhttp_t * parser ) -> int {
194 };
195
196 parser_settings.on_headers_complete =
197 []( llhttp_t * parser ) -> int {
199 };
200
201 parser_settings.on_body =
202 []( llhttp_t * parser, const char * at, size_t length ) -> int {
203 return restinio_body_cb( parser, at, length );
204 };
205
206 parser_settings.on_chunk_header =
207 []( llhttp_t * parser ) -> int {
209 };
210
211 parser_settings.on_chunk_complete =
212 []( llhttp_t * parser ) -> int {
214 };
215
216 parser_settings.on_message_complete =
217 []( llhttp_t * parser ) -> int {
219 };
220
221 parser_settings.on_chunk_extension_name =
222 []( llhttp_t * parser, const char * at, size_t length ) -> int {
223 return restinio_chunk_extension_name_cb( parser, at, length );
224 };
225
226 parser_settings.on_chunk_extension_value =
227 []( llhttp_t * parser, const char * at, size_t length ) -> int {
229 };
230
231 parser_settings.on_chunk_extension_name_complete =
232 []( llhttp_t * parser ) -> int {
234 };
235
236 parser_settings.on_chunk_extension_value_complete =
237 []( llhttp_t * parser ) -> int {
239 };
240
241 return parser_settings;
242}
243
244//
245// connection_upgrade_stage_t
246//
247
263
264//
265// connection_input_t
266//
267
270{
272 std::size_t buffer_size,
276 , m_buf{ buffer_size }
277 {
278 llhttp_init( &m_parser, llhttp_type_t::HTTP_REQUEST, settings );
279 m_parser.data = &m_parser_ctx;
280 }
281
287
290
294
297
299 void
301 {
302 // Reinit parser.
304
305 // Reset context and attach it to parser.
307 }
308};
309
310template < typename Connection, typename Start_Read_CB, typename Failed_CB >
311void
313 asio_ns::ip::tcp::socket & ,
314 Connection & ,
316 Failed_CB )
317{
318 // No preparation is needed, start
320}
321
322// An overload for the case of non-TLS-connection.
323inline tls_socket_t *
325 asio_ns::ip::tcp::socket & ) noexcept
326{
327 return nullptr;
328}
329
330//
331// connection_t
332//
333
335/*
336 Working circle consists of the following steps:
337 * wait for request -- reading from socket and parsing header and body;
338 * handling request -- once the request is completely obtained it's handling
339 is deligated to a handler chosen by handler factory;
340 * writing response -- writing response to socket;
341 * back to first step o close connection -- depending on keep-alive property
342 of the last response the connection goes back to first step or
343 shutdowns.
344
345 Each step is controlled by timer (\see schedule_operation_timeout_callback())
346
347 In case of errors connection closes itself.
348*/
349template < typename Traits >
351 : public connection_base_t
352 , public executor_wrapper_t< typename Traits::strand_t >
353{
355
356 public:
357 using timer_manager_t = typename Traits::timer_manager_t;
358 using timer_guard_t = typename timer_manager_t::timer_guard_t;
361 using logger_t = typename Traits::logger_t;
362 using strand_t = typename Traits::strand_t;
363 using stream_socket_t = typename Traits::stream_socket_t;
366
371 stream_socket_t && socket,
375 endpoint_t remote_endpoint,
380 , m_socket{ std::move( socket ) }
381 , m_settings{ std::move( settings ) }
382 , m_remote_endpoint{ std::move( remote_endpoint ) }
383 , m_input{
384 m_settings->m_buffer_size,
385 m_settings->m_incoming_http_msg_limits,
386 &m_settings->m_parser_settings
387 }
388 , m_response_coordinator{ m_settings->m_max_pipelined_requests }
389 , m_timer_guard{ m_settings->create_timer_guard() }
391 , m_logger{ *( m_settings->m_logger ) }
393 {
394 // Notify of a new connection instance.
395 m_logger.trace( [&]{
396 return fmt::format(
398 "[connection:{}] start connection with {}" ),
401 } );
402 }
403
404 // Disable copy/move.
405 connection_t( const connection_t & ) = delete;
407 connection_t & operator = ( const connection_t & ) = delete;
409
410 ~connection_t() override
411 {
413 [&]{
414 return fmt::format(
416 "[connection:{}] destructor called" ),
417 connection_id() );
418 } );
419 }
420
421 void
423 {
425 m_socket,
426 *this,
427 [ & ]{
428 // Inform state listener if it used.
429 m_settings->call_state_listener( [this]() noexcept {
431 this->connection_id(),
432 this->m_remote_endpoint,
435 m_socket )
436 }
437 };
438 } );
439
440 // Start timeout checking.
443
444 // Start reading request.
446 },
447 [ & ]( const asio_ns::error_code & ec ){
449 return fmt::format(
451 "[connection:{}] prepare connection error: {}" ),
453 ec.message() );
454 } );
455 } );
456 }
457
459 void
461 {
462 m_logger.trace( [&]{
463 return fmt::format(
465 "[connection:{}] start waiting for request" ),
466 connection_id() );
467 } );
468
469 // Prepare parser for consuming new request message.
471
472 // Guard total time for a request to be read.
473 // guarding here makes the total read process
474 // to run in read_next_http_message_timelimit.
476
477 if( 0 != m_input.m_buf.length() )
478 {
479 // If a pipeline requests were sent by client
480 // then the biginning (or even entire request) of it
481 // is in the buffer obtained from socket in previous
482 // read operation.
484 }
485 else
486 {
487 // Next request (if any) must be obtained from socket.
489 }
490 }
491
511
515 {
516 return upgrade_internals_t{
518 std::move(m_socket),
519 std::move(m_lifetime_monitor)
520 };
521 }
522
523 private:
525 inline void
527 {
529 {
530 m_logger.trace( [&]{
531 return fmt::format(
533 "[connection:{}] continue reading request" ),
534 connection_id() );
535 } );
536
537
539 m_socket.async_read_some(
541 asio_ns::bind_executor(
542 this->get_executor(),
543 [this, ctx = shared_from_this()]
544 // NOTE: this lambda is noexcept since v.0.6.0.
545 ( const asio_ns::error_code & ec,
546 std::size_t length ) noexcept {
549 } ) );
550 }
551 else
552 {
553 m_logger.trace( [&]{
554 return fmt::format(
556 "[connection:{}] skip read operation: already running" ),
557 connection_id() );
558 } );
559 }
560 }
561
563 inline void
564 after_read( const asio_ns::error_code & ec, std::size_t length ) noexcept
565 {
566 if( !ec )
567 {
568 // Exceptions shouldn't go out of `after_read`.
569 // So intercept them and close the connection in the case
570 // of an exception.
571 try
572 {
573 m_logger.trace( [&]{
574 return fmt::format(
576 "[connection:{}] received {} bytes" ),
577 this->connection_id(),
578 length );
579 } );
580
581 m_input.m_buf.obtained_bytes( length );
582
583 consume_data( m_input.m_buf.bytes(), length );
584 }
585 catch( const std::exception & x )
586 {
588 return fmt::format(
590 "[connection:{}] unexpected exception during the "
591 "handling of incoming data: {}" ),
593 x.what() );
594 } );
595 }
596 }
597 else
598 {
599 // Well, if it is actually an error
600 // then close connection.
602 {
605 return fmt::format(
607 "[connection:{}] read socket error: {}; "
608 "parsed bytes: {}" ),
610 ec.message(),
612 } );
613 else
614 {
615 // A case that is not such an error:
616 // on a connection (most probably keeped alive
617 // after previous request, but a new also applied)
618 // no bytes were consumed and remote peer closes connection.
620 [&]{
621 return fmt::format(
623 "[connection:{}] EOF and no request, "
624 "close connection" ),
625 connection_id() );
626 } );
627
629 }
630 }
631 // else: read operation was cancelled.
632 }
633 }
634
636 void
637 consume_data( const char * data, std::size_t length )
638 {
639 auto * parser = &m_input.m_parser;
640
641 const auto parse_err =
642 llhttp_execute( parser, data, length );
643
644 const auto nparsed = [&]{
645 if( !parser->error_pos )
646 return length;
647 return static_cast< std::size_t >( parser->error_pos - data );
648 }();
649
650 if( nparsed > length )
651 {
652 // Parser is in the unreliable state,
653 // so we done with this connection.
655 return fmt::format(
657 "[connection:{}] unexpected parser behavior: "
658 "llhttp_execute() reports parsed bytes number ({}) "
659 "is greater than the size of a buffer ({})"
660 "that was fed to the parser" ),
662 nparsed,
663 length );
664 } );
665 return;
666 }
667
669
670 // If entire http-message was obtained,
671 // parser is stopped and the might be a part of consecutive request
672 // left in buffer, so we mark how many bytes were obtained.
673 // and next message read (if any) will be started from already existing
674 // data left in buffer.
676
677 if( HPE_OK != parse_err &&
680 {
681 // Parser error must be the one defined by llhttp_errno.
682 // It is possible to get a random error value
683 // (e.g. providing parser-callbacks that return
684 // unconventional errors), here we put an assert
685 // to quickly highlight the problems.
686 // As all the parser-callbacks are implemented by
687 // restinio the assert bellow MUST pass.
688 assert( llhttp_errno::HPE_OK <= parse_err &&
689 llhttp_errno::HPE_CB_RESET >= parse_err );
690
691 // TODO: handle case when there are some request in process.
693 return fmt::format(
695 "[connection:{}] parser error {}: {}" ),
699 } );
700
701 // nothing to do.
702 return;
703 }
704
706 {
708 }
709 else
711 }
712
714 void
716 {
717 try
718 {
719 auto & parser = m_input.m_parser;
721
722 if( m_input.m_parser.upgrade )
723 {
724 // Start upgrade connection operation.
725
726 // The first thing is to make sure
727 // that upgrade request will be handled in
728 // a non pipelined fashion.
731 }
732
735 {
736 // Run ordinary HTTP logic.
737 const auto request_id = m_response_coordinator.register_new_request();
738
739 m_logger.trace( [&]{
740 return fmt::format(
742 "[connection:{}] request received (#{}): {} {}" ),
744 request_id,
746 static_cast<llhttp_method>( parser.method ) ),
747 parser_ctx.m_header.request_target() );
748 } );
749
750 // TODO: mb there is a way to
751 // track if response was emmited immediately in handler
752 // or it was delegated
753 // so it is possible to omit this timer scheduling.
755
756 const auto handling_result =
758 std::make_shared< generic_request_t >(
759 request_id,
760 std::move( parser_ctx.m_header ),
761 std::move( parser_ctx.m_body ),
762 parser_ctx.make_chunked_input_info_if_necessary(),
765 m_settings->extra_data_factory() ) );
766
767 switch( handling_result )
768 {
771 // If handler refused request, say not implemented.
773 request_id,
778 break;
779
782 {
783 // Request was accepted,
784 // didn't create immediate response that closes connection after,
785 // and it is possible to receive more requests
786 // then start consuming yet another request.
788 }
789 break;
790 }
791 }
792 else
793 {
794 m_logger.trace( [&]{
795 const std::string default_value{};
796
797 return fmt::format(
799 "[connection:{}] upgrade request received: {} {}; "
800 "Upgrade: '{}';" ),
803 static_cast<llhttp_method>( parser.method ) ),
804 parser_ctx.m_header.request_target(),
805 parser_ctx.m_header.get_field_or(
806 http_field::upgrade, default_value ) );
807 } );
808
810 {
811 // There are no requests in handling
812 // So the current request with upgrade
813 // is the only one and can be handled directly.
814 // It is safe to call a handler for it.
816 }
817 else
818 {
819 // There are pipelined request
820 m_logger.trace( [&]{
821 return fmt::format(
823 "[connection:{}] upgrade request happened to "
824 "be a pipelined one, "
825 "and will be handled after previous requests "
826 "are handled" ),
827 connection_id() );
828 } );
829 }
830
831 // No further actions (like continue reading) in both cases are needed.
832 }
833
834 }
835 catch( const std::exception & ex )
836 {
838 return fmt::format(
840 "[connection:{}] error while handling request: {}" ),
841 this->connection_id(),
842 ex.what() );
843 } );
844 }
845 }
846
848
851 void
853 {
854 auto & parser = m_input.m_parser;
856
857 // If user responses with error
858 // then connection must be able to send
859 // (hence to receive) response.
860
861 const auto request_id = m_response_coordinator.register_new_request();
862
863 m_logger.info( [&]{
864 return fmt::format(
866 "[connection:{}] handle upgrade request (#{}): {} {}" ),
868 request_id,
870 static_cast<llhttp_method>( parser.method ) ),
871 parser_ctx.m_header.request_target() );
872 } );
873
874 // Do not guard upgrade request.
876
877 // After calling handler we expect the results or
878 // no further operations with connection
881
883 std::make_shared< generic_request_t >(
884 request_id,
885 std::move( parser_ctx.m_header ),
886 std::move( parser_ctx.m_body ),
887 parser_ctx.make_chunked_input_info_if_necessary(),
890 m_settings->extra_data_factory() ) );
891 switch( handling_result )
892 {
895 if( m_socket.is_open() )
896 {
897 // Request is rejected, so our socket
898 // must not be moved out to websocket connection.
899
900 // If handler refused request, say not implemented.
902 request_id,
907 }
908 else
909 {
910 // Request is rejected, but the socket
911 // was moved out to somewhere else???
912
913 m_logger.error( [&]{
914 return fmt::format(
916 "[connection:{}] upgrade request handler rejects "
917 "request, but socket was moved out from connection" ),
918 connection_id() );
919 } );
920 }
921 break;
922
924 /* nothing to do */
925 break;
926 }
927
928 // Else 2 cases:
929 // 1. request is handled asynchronously, so
930 // what happens next depends on handling.
931 // 2. handling was immediate, so further destiny
932 // of a connection was already determined.
933 //
934 // In both cases: here do nothing.
935 // We can't even do read-only access because
936 // upgrade handling might take place
937 // in distinct execution context.
938 // So no even a log messages here.
939 }
940
942 virtual void
945 request_id_t request_id,
947 response_output_flags_t response_output_flags,
949 write_group_t wg ) override
950 {
952 asio_ns::dispatch(
953 this->get_executor(),
954 [ this,
955 request_id,
956 response_output_flags,
957 actual_wg = std::move( wg ),
959 // NOTE that this lambda is noexcept since v.0.6.0.
960 () mutable noexcept
961 {
962 try
963 {
965 request_id,
966 response_output_flags,
967 std::move( actual_wg ) );
968 }
969 catch( const std::exception & ex )
970 {
972 return fmt::format(
974 "[connection:{}] unable to handle response: {}" ),
976 ex.what() );
977 } );
978 }
979 } );
980 }
981
983 void
986 request_id_t request_id,
988 response_output_flags_t response_output_flags,
991 {
993 try
994 {
995 wg.invoke_after_write_notificator_if_exists(
998 }
999 catch( const std::exception & ex )
1000 {
1001 m_logger.error( [&]{
1002 return fmt::format(
1004 "[connection:{}] notificator error: {}" ),
1005 connection_id(),
1006 ex.what() );
1007 } );
1008 }
1009 };
1010
1011 if( m_socket.is_open() )
1012 {
1016 {
1017 // It is response for a connection-upgrade request.
1018 // If we receive it here then it is constructed via
1019 // message builder and so connection was not transformed
1020 // to websocket connection.
1021 // So it is necessary to resume pipeline logic that was stopped
1022 // for upgrade-request to be handled as the only request
1023 // on the connection for that moment.
1025 {
1027 }
1028 }
1029
1031 {
1032 m_logger.trace( [&]{
1033 return fmt::format(
1035 "[connection:{}] append response (#{}), "
1036 "flags: {}, write group size: {}" ),
1037 connection_id(),
1038 request_id,
1039 fmtlib_tools::streamed( response_output_flags ),
1040 wg.items_count() );
1041 } );
1042
1044 request_id,
1045 response_output_flags,
1046 std::move( wg ) );
1047
1049 }
1050 else
1051 {
1052 m_logger.warn( [&]{
1053 return fmt::format(
1055 "[connection:{}] receive response parts for "
1056 "request (#{}), but response with connection-close "
1057 "attribute happened before" ),
1058 connection_id(),
1059 request_id );
1060 } );
1062 }
1063 }
1064 else
1065 {
1066 m_logger.warn( [&]{
1067 return fmt::format(
1069 "[connection:{}] try to write response, "
1070 "while socket is closed" ),
1071 connection_id() );
1072 } );
1074 }
1075 }
1076
1077 // Check if there is something to write,
1078 // and if so starts write operation.
1079 void
1081 {
1083
1085 {
1086 init_write();
1087 }
1088 }
1089
1091 void
1093 {
1094 // Here: not writing anything to socket, so
1095 // write operation can be initiated.
1096
1097 // Remember if all response cells were busy.
1100
1102
1103 if( next_write_group )
1104 {
1105 m_logger.trace( [&]{
1106 return fmt::format(
1108 "[connection:{}] start next write group for response (#{}), "
1109 "size: {}" ),
1110 this->connection_id(),
1111 next_write_group->second,
1112 next_write_group->first.items_count() );
1113 } );
1114
1115 // Check if all response cells busy:
1118
1119 // Whether we need to resume read after this group is written?
1121 response_coordinator_full_before &&
1123
1124 if( 0 < next_write_group->first.status_line_size() )
1125 {
1126 // We need to extract status line out of the first buffer
1127 assert(
1129 next_write_group->first.items().front().write_type() );
1130
1131 m_logger.trace( [&]{
1132 // Get status line:
1133 const string_view_t status_line{
1134 asio_ns::buffer_cast< const char * >(
1135 next_write_group->first.items().front().buf() ),
1136 next_write_group->first.status_line_size() };
1137
1138 return fmt::format(
1140 "[connection:{}] start response (#{}): {}" ),
1141 this->connection_id(),
1142 next_write_group->second,
1143 fmtlib_tools::streamed( status_line ) );
1144 } );
1145 }
1146
1147 // Initialize write context with a new write group.
1149 std::move( next_write_group->first ) );
1150
1151 // Start the loop of sending data from current write group.
1153 }
1154 else
1155 {
1157 }
1158 }
1159
1160 // Use aliases for shorter names.
1164
1166
1177 void
1179 {
1180 try
1181 {
1183
1184 if( std::holds_alternative< trivial_write_operation_t >( wo ) )
1185 {
1186 handle_trivial_write_operation( std::get< trivial_write_operation_t >( wo ) );
1187 }
1188 else if( std::holds_alternative< file_write_operation_t >( wo ) )
1189 {
1190 handle_file_write_operation( std::get< file_write_operation_t >( wo ) );
1191 }
1192 else
1193 {
1194 assert( std::holds_alternative< none_write_operation_t >( wo ) );
1196 }
1197 }
1198 catch( const std::exception & ex )
1199 {
1201 return fmt::format(
1203 "[connection:{}] handle_current_write_ctx failed: {}" ),
1204 connection_id(),
1205 ex.what() );
1206 } );
1207 }
1208 }
1209
1211 void
1213 {
1214 // Asio buffers (param for async write):
1215 auto & bufs = op.get_trivial_bufs();
1216
1218 {
1219 m_logger.trace( [&]{
1220 return fmt::format(
1222 "[connection:{}] sending resp data with "
1223 "connection-close attribute "
1224 "buf count: {}, "
1225 "total size: {}" ),
1226 connection_id(),
1227 bufs.size(),
1228 op.size() );
1229 } );
1230
1231 // Reading new requests is useless.
1232 asio_ns::error_code ignored_ec;
1233 m_socket.cancel( ignored_ec );
1234 }
1235 else
1236 {
1237 m_logger.trace( [&]{
1238 return fmt::format(
1240 "[connection:{}] sending resp data, "
1241 "buf count: {}, "
1242 "total size: {}" ),
1243 connection_id(),
1244 bufs.size(),
1245 op.size() ); } );
1246 }
1247
1248 // There is somethig to write.
1249 asio_ns::async_write(
1250 m_socket,
1251 bufs,
1252 asio_ns::bind_executor(
1253 this->get_executor(),
1254 [this, ctx = shared_from_this()]
1255 // NOTE: since v.0.6.0 this lambda is noexcept.
1256 ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1257 {
1258 if( !ec )
1259 {
1261 [&]{
1262 return fmt::format(
1264 "[connection:{}] outgoing data was "
1265 "sent: {} bytes" ),
1266 connection_id(),
1267 written );
1268 } );
1269 }
1270
1272 } ) );
1273
1275 }
1276
1278 void
1280 {
1282 {
1283 m_logger.trace( [&]{
1284 return fmt::format(
1286 "[connection:{}] sending resp file data with "
1287 "connection-close attribute, "
1288 "total size: {}" ),
1289 connection_id(),
1290 op.size() );
1291 } );
1292
1293 // Reading new requests is useless.
1294 asio_ns::error_code ignored_ec;
1295 m_socket.cancel( ignored_ec );
1296 }
1297 else
1298 {
1299 m_logger.trace( [&]{
1300 return fmt::format(
1302 "[connection:{}] sending resp file data, total size: {}" ),
1303 connection_id(),
1304 op.size() );
1305 } );
1306 }
1307
1308 guard_sendfile_operation( op.timelimit() );
1309
1310 auto op_ctx = op;
1311
1312 op_ctx.start_sendfile_operation(
1313 this->get_executor(),
1314 m_socket,
1315 asio_ns::bind_executor(
1316 this->get_executor(),
1317 [this, ctx = shared_from_this(),
1318 // Store operation context till the end
1319 op_ctx ]
1320 // NOTE: since v.0.6.0 this lambda is noexcept
1321 (const asio_ns::error_code & ec, file_size_t written ) mutable noexcept
1322 {
1323 // NOTE: op_ctx should be reset just before return from
1324 // that lambda. We can't call reset() until the end of
1325 // the lambda because lambda object itself will be
1326 // destroyed.
1328 [&op_ctx] {
1329 // Reset sendfile operation context.
1331 } );
1332
1333 if( !ec )
1334 {
1336 [&]{
1337 return fmt::format(
1339 "[connection:{}] file data was sent: "
1340 "{} bytes" ),
1341 connection_id(),
1342 written );
1343 } );
1344 }
1345 else
1346 {
1348 [&]{
1349 return fmt::format(
1351 "[connection:{}] send file data error: "
1352 "{} ({}) bytes" ),
1353 connection_id(),
1354 ec.value(),
1355 ec.message() );
1356 } );
1357 }
1358
1360 } ) );
1361 }
1362
1364 void
1366 {
1367 // Finishing writing this group.
1368 m_logger.trace( [&]{
1369 return fmt::format(
1371 "[connection:{}] finishing current write group" ),
1372 this->connection_id() );
1373 } );
1374
1375 // Group notificators are called from here (if exist):
1377
1379 {
1380 m_logger.trace( [&]{
1381 return fmt::format(
1383 "[connection:{}] should keep alive" ),
1384 this->connection_id() );
1385 } );
1386
1389 {
1390 // Run ordinary HTTP logic.
1392 {
1394 }
1395
1396 // Start another write opertion
1397 // if there is something to send.
1399 }
1400 else
1401 {
1403 {
1404 // Here upgrade req is the only request
1405 // to be handled by this connection.
1406 // So it is safe to call a handler for it.
1408 }
1409 else
1410 {
1411 // Do not start reading in any case,
1412 // but if there is at least one request preceding
1413 // upgrade-req, logic must continue http interaction.
1415 }
1416 }
1417 }
1418 else
1419 {
1420 // No keep-alive, close connection.
1421 close();
1422 }
1423 }
1424
1425 void
1427 {
1429 {
1430 // Bufs empty but there happened to
1431 // be a response context marked as complete
1432 // (final_parts) and having connection-close attr.
1433 // It is because `init_write_if_necessary()`
1434 // is called only under `!m_response_coordinator.closed()`
1435 // condition, so if no bufs were obtained
1436 // and response coordinator is closed means
1437 // that a first response stored by
1438 // response coordinator was marked as complete
1439 // without data.
1440
1441 m_logger.trace( [&]{
1442 return fmt::format(
1444 "[connection:{}] last sent response was marked "
1445 "as complete" ),
1446 connection_id() ); } );
1447 close();
1448 }
1449 else
1450 {
1451 // Not writing anything, so need to deal with timouts.
1453 {
1454 // No requests in processing.
1455 // So set read next request timeout.
1457 }
1458 else
1459 {
1460 // Have requests in process.
1461 // So take control over request handling.
1463 }
1464 }
1465 }
1466
1468
1472 void
1473 after_write( const asio_ns::error_code & ec ) noexcept
1474 {
1475 if( !ec )
1476 {
1478 }
1479 else
1480 {
1482 {
1484 return fmt::format(
1486 "[connection:{}] unable to write: {}" ),
1487 connection_id(),
1488 ec.message() );
1489 } );
1490 }
1491 // else: Operation aborted only in case of close was called.
1492
1493 try
1494 {
1496 }
1497 catch( const std::exception & ex )
1498 {
1500 [&]{
1501 return fmt::format(
1503 "[connection:{}] notificator error: {}" ),
1504 connection_id(),
1505 ex.what() );
1506 } );
1507 }
1508 }
1509 }
1510
1513
1515 void
1517 {
1519 [&]{
1520 return fmt::format(
1521 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] close" ),
1522 connection_id() );
1523 } );
1524
1525 // shutdown() and close() should be called regardless of
1526 // possible exceptions.
1528 m_logger,
1529 "connection.socket.shutdown",
1530 [this] {
1531 asio_ns::error_code ignored_ec;
1532 m_socket.shutdown(
1533 asio_ns::ip::tcp::socket::shutdown_both,
1534 ignored_ec );
1535 } );
1537 m_logger,
1538 "connection.socket.close",
1539 [this] {
1540 m_socket.close();
1541 } );
1542
1544 [&]{
1545 return fmt::format(
1547 "[connection:{}] close: close socket" ),
1548 connection_id() );
1549 } );
1550
1551 // Clear stuff.
1553
1555 [&]{
1556 return fmt::format(
1558 "[connection:{}] close: timer canceled" ),
1559 connection_id() );
1560 } );
1561
1563
1565 [&]{
1566 return fmt::format(
1568 "[connection:{}] close: reset responses data" ),
1569 connection_id() );
1570 } );
1571
1572 // Inform state listener if it used.
1573 m_settings->call_state_listener_suppressing_exceptions(
1574 [this]() noexcept {
1576 this->connection_id(),
1577 this->m_remote_endpoint,
1579 };
1580 } );
1581 }
1582
1584
1588 template< typename Message_Builder >
1589 void
1591 {
1592 // An exception from logger/msg_builder shouldn't prevent
1593 // a call to close().
1595 m_logger, std::move(msg_builder) );
1596
1598 }
1600
1603
1606
1609
1612
1615
1616 // Memo flag: whether we need to resume read after this group is written
1618
1621
1624
1626 static connection_t &
1628 {
1629 return static_cast< connection_t & >( base );
1630 }
1631
1634 virtual void
1636 {
1637 asio_ns::dispatch(
1638 this->get_executor(),
1639 [ ctx = std::move( self ) ]
1640 // NOTE: this lambda is noexcept since v.0.6.0.
1641 () noexcept {
1642 auto & conn_object = cast_to_self( *ctx );
1643 // If an exception will be thrown we can only
1644 // close the connection.
1645 try
1646 {
1647 conn_object.check_timeout_impl();
1648 }
1649 catch( const std::exception & x )
1650 {
1651 conn_object.trigger_error_and_close( [&] {
1652 return fmt::format(
1654 "[connection: {}] unexpected "
1655 "error during timeout handling: {}" ),
1656 conn_object.connection_id(),
1657 x.what() );
1658 } );
1659 }
1660 } );
1661 }
1662
1665
1668
1670 std::chrono::steady_clock::time_point m_current_timeout_after;
1675
1677 void
1679 {
1680 if( std::chrono::steady_clock::now() > m_current_timeout_after )
1681 {
1683 (this->*m_current_timeout_cb)();
1684 }
1685 else
1686 {
1688 }
1689 }
1690
1692 void
1694 {
1695 m_timer_guard.schedule( m_prepared_weak_ctx );
1696 }
1697
1699 void
1705
1707 void
1715
1716 void
1718 std::chrono::steady_clock::duration timeout,
1720 {
1722 std::chrono::steady_clock::now() + timeout,
1723 timout_cb );
1724 }
1725
1726 void
1728 {
1729 m_logger.trace( [&]{
1730 return fmt::format(
1731 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] {} timed out" ),
1732 connection_id(),
1734 } );
1735
1736 close();
1737 }
1738
1739 void
1741 {
1742 handle_xxx_timeout( "wait for request" );
1743 }
1744
1746 void
1748 {
1750 {
1752 m_settings->m_read_next_http_message_timelimit,
1754 }
1755 }
1756
1757 void
1759 {
1760 handle_xxx_timeout( "handle request" );
1761 }
1762
1764 void
1774
1775 void
1777 {
1778 handle_xxx_timeout( "writing response" );
1779 }
1780
1782 void
1789
1790 void
1792 {
1793 handle_xxx_timeout( "writing response (sendfile)" );
1794 }
1795
1796 void
1797 guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1798 {
1799 if( std::chrono::steady_clock::duration::zero() == timelimit )
1800 timelimit = m_settings->m_write_http_response_timelimit;
1801
1803 timelimit,
1805 }
1807
1810
1813
1823};
1824
1825//
1826// connection_factory_t
1827//
1828
1830template < typename Traits >
1832{
1833 public:
1834 using logger_t = typename Traits::logger_t;
1835 using stream_socket_t = typename Traits::stream_socket_t;
1838
1841 std::unique_ptr< socket_options_setter_t > socket_options_setter )
1843 , m_socket_options_setter{ std::move( socket_options_setter ) }
1845 {}
1846
1847 // NOTE: since v.0.6.3 it returns non-empty
1848 // shared_ptr<connection_t<Traits>> or an exception is thrown in
1849 // the case of an error.
1850 // NOTE: since v.0.6.12 it accepts yet another parameter: lifetime_monitor.
1851 auto
1853 stream_socket_t socket,
1854 endpoint_t remote_endpoint,
1856 {
1858
1859 {
1860 socket_options_t options{ socket.lowest_layer() };
1861 (*m_socket_options_setter)( options );
1862 }
1863
1864 return std::make_shared< connection_type_t >(
1866 std::move( socket ),
1868 std::move( remote_endpoint ),
1869 std::move( lifetime_monitor ) );
1870 }
1871
1872 private:
1874
1876
1877 std::unique_ptr< socket_options_setter_t > m_socket_options_setter;
1878
1880};
1881
1882} /* namespace impl */
1883
1884} /* namespace restinio */
A simple implementation of at_scope_exit concept.
Helper type for controlling the lifetime of the connection.
Type of object that tells that new connection has been accepted.
Type of object that tells that the connection has been closed.
An object with info about connection to be passed to state listener.
auto fields_count() const noexcept
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
typename Traits::logger_t logger_t
connection_settings_handle_t< Traits > m_connection_settings
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
typename Traits::stream_socket_t stream_socket_t
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
Context for handling http connections.
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules real timedout operations check on the executer of a connection.
connection_t & operator=(const connection_t &)=delete
typename Traits::strand_t strand_t
void handle_upgrade_request()
Calls handler for upgrade request.
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
void on_request_message_complete()
Handle a given request message.
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
void handle_xxx_timeout(const char *operation_name)
void guard_write_operation()
Start guard write operation if necessary.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
stream_socket_t m_socket
Connection.
connection_input_t m_input
Input routine.
void schedule_operation_timeout_callback(std::chrono::steady_clock::time_point timeout_after, timout_cb_t timout_cb)
Helper function to work with timer guard.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
request_handler_t & m_request_handler
Request handler.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
timout_cb_t m_current_timeout_cb
Callback to all if timeout happened.
void wait_for_http_message()
Start reading next htttp-message.
void after_read(const asio_ns::error_code &ec, std::size_t length) noexcept
Handle read operation result.
logger_t & m_logger
Logger for operation.
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void init_write()
Initiate write operation.
typename Traits::timer_manager_t timer_manager_t
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
void cancel_timeout_checking() noexcept
Stop timout guarding.
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to controll operations.
connection_t(const connection_t &)=delete
void handle_current_write_ctx() noexcept
Start/continue/continue handling output data of current write group.
void(connection_t::*)(void) timout_cb_t
Callback type for timedout operations.
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
void close() noexcept
Close connection functions.
typename Traits::logger_t logger_t
void after_write(const asio_ns::error_code &ec) noexcept
Handle write response finished.
void trigger_error_and_close(Message_Builder msg_builder) noexcept
Trigger an error.
response_coordinator_t m_response_coordinator
Response coordinator.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
typename timer_manager_t::timer_guard_t timer_guard_t
request_handler_type_from_traits_t< Traits > request_handler_t
void init_next_timeout_checking()
Schedule next timeout checking.
void consume_data(const char *data, std::size_t length)
Parse some data.
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
timer_guard_t m_timer_guard
Timer guard.
typename Traits::stream_socket_t stream_socket_t
void check_timeout_impl()
Check timed out operation.
connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
void guard_read_operation()
Statr guard read operation if necessary.
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
connection_t(connection_t &&)=delete
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
Wrapper for an executor (strand) used by connections.
Traits::strand_t & get_executor() noexcept
An executor for callbacks on async operations.
Helper class for reading bytes and feeding them to parser.
std::size_t length() const noexcept
How many unconsumed bytes are there in buffer.
void consumed_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
void obtained_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
auto make_asio_buffer() noexcept
Make asio buffer for reading bytes from socket.
const char * bytes() const noexcept
Get pointer to unconsumed bytes.
Coordinator for process of sending responses with respect to http pipeline technique and chunk transf...
void reset() noexcept
Remove all contexts.
request_id_t register_new_request()
Create a new request and reserve context for its response.
bool is_able_to_get_more_messages() const noexcept
Check if it is possible to accept more requests.
std::optional< std::pair< write_group_t, request_id_t > > pop_ready_buffers()
Extract a portion of data available for write.
void append_response(request_id_t req_id, response_output_flags_t response_output_flags, write_group_t wg)
Add outgoing data for specified request.
Socket adapter for asio::ssl::stream< asio::ip::tcp::socket >.
Helper class for writting response data.
void fail_write_group(const asio_ns::error_code &ec)
Handle current group write process failed.
solid_write_operation_variant_t extract_next_write_operation()
et an object with next write operation to perform.
void start_next_write_group(std::optional< write_group_t > next_wg) noexcept
Start handlong next write group.
void finish_write_group()
Finish writing group normally.
bool transmitting() const noexcept
Check if data is trunsmitting now.
A type of holder of limits related to an incoming HTTP message.
An adapter for setting acceptor options before running server.
Definition settings.hpp:248
connection_id_t connection_id() const noexcept
Get connection id.
Group of writable items transported to the context of underlying connection as one solid piece.
Definition buffers.hpp:727
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
Stuff related to limits of active parallel connections.
A special wrapper around fmtlib include files.
#define RESTINIO_FMT_FORMAT_STRING(s)
decltype(auto) streamed(T &&v) noexcept
llhttp_settings_t create_parser_settings() noexcept
Include parser callbacks.
tls_socket_t * make_tls_socket_pointer_for_state_listener(asio_ns::ip::tcp::socket &) noexcept
auto create_not_implemented_resp()
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
@ wait_for_upgrade_handling_result_or_nothing
Handler for request with connection-upgrade header was called so any response data comming is for tha...
@ none
No connection request in progress.
@ pending_upgrade_handling
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
std::shared_ptr< connection_settings_t< Traits > > connection_settings_handle_t
void suppress_exceptions(Logger &&logger, const char *block_description, Lambda &&lambda) noexcept
Helper function for execution a block of code with suppression of any exceptions raised inside that b...
scope_exit_details::at_exit_t< L > at_scope_exit(L &&l)
Helper function for creation action to be performed at scope exit.
void log_error_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
void log_trace_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
asio_ns::ip::tcp::endpoint endpoint_t
An alias for endpoint type from Asio.
typename details::actual_request_handler_type_detector< typename Traits::request_handler_t, typename Traits::extra_data_factory_t >::request_handler_t request_handler_type_from_traits_t
A metafunction for extraction a request-handler type from server's traits.
Definition traits.hpp:375
unsigned int request_id_t
Request id in scope of single connection.
run_on_this_thread_settings_t< Traits > on_this_thread()
A special marker for the case when http_server must be run on the context of the current thread.
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
std::string_view string_view_t
@ trivial_write_operation
Item is a buffer and must be written trivially.
bool error_is_operation_aborted(const asio_ns::error_code &ec) noexcept
std::unique_ptr< chunk_ext_params_t > chunk_ext_params_unique_ptr_t
@ accepted
Request accepted for handling.
@ not_handled
The request wasn't handled. If there is another handler to be tried it should be tried....
@ rejected
Request wasn't accepted for handling.
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
std::unique_ptr< chunked_input_info_t > chunked_input_info_unique_ptr_t
Alias of unique_ptr for chunked_input_info.
bool error_is_eof(const asio_ns::error_code &ec) noexcept
std::uint64_t connection_id_t
Type for ID of connection.
@ connection_close
This response says to close connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
std::uint64_t file_size_t
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
@ final_parts
Final parts (response ands with these parts).
int restinio_header_field_complete_cb(llhttp_t *parser)
int restinio_body_cb(llhttp_t *parser, const char *at, size_t length)
int restinio_url_cb(llhttp_t *parser, const char *at, size_t length)
int restinio_header_value_cb(llhttp_t *parser, const char *at, size_t length)
int restinio_chunk_extension_name_complete_cb(llhttp_t *parser)
int restinio_chunk_extension_name_cb(llhttp_t *parser, const char *at, size_t length)
int restinio_chunk_extension_value_cb(llhttp_t *parser, const char *at, size_t length)
int restinio_chunk_complete_cb(llhttp_t *)
int restinio_chunk_header_cb(llhttp_t *parser)
int restinio_header_value_complete_cb(llhttp_t *parser)
int restinio_chunk_extension_value_complete_cb(llhttp_t *)
int restinio_headers_complete_cb(llhttp_t *parser)
int restinio_header_field_cb(llhttp_t *parser, const char *at, size_t length)
Helpers for safe truncation of unsigned integers.
Bunch of data related to chunked input.
std::vector< chunk_info_t > m_chunks
All non-empty chunks from the input.
http_header_fields_t m_trailing_fields
Trailing fields found in the input.
Data associated with connection read routine.
connection_input_t(std::size_t buffer_size, incoming_http_msg_limits_t limits, const llhttp_settings_t *settings)
fixed_buffer_t m_buf
Input buffer.
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
void reset_parser()
Prepare parser for reading new http-message.
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Internals that are necessary for upgrade.
connection_settings_handle_t< Traits > m_settings
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket, lifetime_monitor_t lifetime_monitor)
upgrade_internals_t(upgrade_internals_t &&)=default
Parsing result context for using in parser callbacks.
void reset()
Prepare context to handle new request.
chunked_input_info_unique_ptr_t make_chunked_input_info_if_necessary()
Creates an instance of chunked_input_info if there is an info about chunks in the body.
chunk_ext_params_unique_ptr_t m_chunk_ext_params
Chunk extnsion's params if any.
http_request_header_t m_header
Request data.
std::size_t m_total_field_count
Total number of parsed HTTP-fields.
bool m_message_complete
Flag: is http message parsed completely.
const incoming_http_msg_limits_t m_limits
Limits for the incoming message.
chunked_input_info_block_t m_chunked_info_block
std::string m_current_field_name
Parser context temp values and flags.
http_parser_ctx_t(incoming_http_msg_limits_t limits)
The main constructor.
std::size_t m_bytes_parsed
How many bytes were parsed for current request.
Response output flags for buffers commited to response-coordinator.