RESTinio
Loading...
Searching...
No Matches
ws_connection.hpp
Go to the documentation of this file.
1/*
2 restinio
3*/
4
9#pragma once
10
11#include <queue>
12
14
15#include <llhttp.h>
16
18
19#include <restinio/core.hpp>
25
27
29
30namespace restinio
31{
32
33namespace websocket
34{
35
36namespace basic
37{
38
39namespace impl
40{
41
42using write_groups_queue_t = std::queue< write_group_t >;
43
45constexpr size_t
47{
48 return 14;
49}
50
51//
52// ws_outgoing_data_t
53//
54
57{
58 public:
60 void
62 {
63 m_awaiting_write_groups.emplace( std::move( wg ) );
64 }
65
66 std::optional< write_group_t >
68 {
69 std::optional< write_group_t > result;
70
71 if( !m_awaiting_write_groups.empty() )
72 {
73 result = std::move( m_awaiting_write_groups.front() );
75 }
76
77 return result;
78 }
79
80 private:
83};
84
85//
86// connection_input_t
87//
88
91{
92 connection_input_t( std::size_t buffer_size )
93 : m_buf{ buffer_size }
94 {}
95
98
101
103 std::string m_payload;
104
106 void
108 {
109 m_parser.reset();
110 m_payload.clear();
111 }
112};
113
114//
115// ws_connection_t
116//
117
119template <
120 typename Traits,
121 typename WS_Message_Handler >
123 : public ws_connection_base_t
124 , public restinio::impl::executor_wrapper_t< typename Traits::strand_t >
125{
127
128 public:
130
131 using timer_manager_t = typename Traits::timer_manager_t;
132 using timer_manager_handle_t = std::shared_ptr< timer_manager_t >;
133 using timer_guard_t = typename timer_manager_t::timer_guard_t;
134 using logger_t = typename Traits::logger_t;
135 using strand_t = typename Traits::strand_t;
136 using stream_socket_t = typename Traits::stream_socket_t;
139
140 using ws_weak_handle_t = std::weak_ptr< ws_t >;
141
148 stream_socket_t socket,
154 , m_settings{ std::move( settings ) }
155 , m_socket{ std::move( socket ) }
157 , m_timer_guard{ m_settings->create_timer_guard() }
159 , m_msg_handler{ std::move( msg_handler ) }
160 , m_logger{ *( m_settings->m_logger ) }
161 {
162 // Notify of a new connection instance.
163 m_logger.trace( [&]{
164 return fmt::format(
166 "[connection:{}] move socket to [ws_connection:{}]" ),
168 connection_id() );
169 } );
170
171 m_logger.trace( [&]{
172 return fmt::format(
174 "[ws_connection:{}] start connection with {}" ),
176 fmtlib_tools::streamed( m_socket.remote_endpoint() ) );
177 } );
178
179 // Inform state listener if it used.
180 m_settings->call_state_listener( [this]() noexcept {
185 };
186 } );
187 }
188
189 ws_connection_t( const ws_connection_t & ) = delete;
193
195 {
196 try
197 {
198 // Notify of a new connection instance.
199 m_logger.trace( [&]{
200 return fmt::format(
202 "[ws_connection:{}] destructor called" ),
203 connection_id() );
204 } );
205 }
206 catch( ... )
207 {}
208 }
209
211 virtual void
212 shutdown() override
213 {
214 asio_ns::dispatch(
215 this->get_executor(),
216 [ this, ctx = shared_from_this() ]
217 // NOTE: this lambda is noexcept since v.0.6.0.
218 () noexcept {
219 try
220 {
221 // An exception from logger shouldn't prevent
222 // main shutdown actions.
224 [&]{
225 return fmt::format(
227 "[ws_connection:{}] shutdown" ),
228 connection_id() );
229 } );
230
233 }
234 catch( const std::exception & ex )
235 {
237 [&]{
238 return fmt::format(
240 "[ws_connection:{}] shutdown operation error: {}" ),
242 ex.what() );
243 } );
244 }
245 } );
246 }
247
249 virtual void
250 kill() override
251 {
252 asio_ns::dispatch(
253 this->get_executor(),
254 [ this, ctx = shared_from_this() ]
255 // NOTE: this lambda is noexcept since v.0.6.0.
256 () noexcept
257 {
258 try
259 {
260 // An exception from logger shouldn't prevent
261 // main kill actions.
263 [&]{
264 return fmt::format(
266 "[ws_connection:{}] kill" ),
267 connection_id() );
268 } );
269
272
273 close_impl();
274 }
275 catch( const std::exception & ex )
276 {
278 [&]{
279 return fmt::format(
281 "[ws_connection:{}] kill operation error: {}" ),
283 ex.what() );
284 } );
285 }
286 } );
287 }
288
290 void
292 {
294
295 // Run write message on io_context loop (direct invocation if possible).
296 asio_ns::dispatch(
297 this->get_executor(),
298 [ this, ctx = shared_from_this(), wswh = std::move( wswh ) ]
299 // NOTE: this lambda is noexcept since v.0.6.0.
300 () noexcept
301 {
302 try
303 {
304 // Start timeout checking.
307
308 m_websocket_weak_handle = std::move( wswh );
311 }
312 catch( const std::exception & ex )
313 {
316 [&]{
317 return fmt::format(
319 "[ws_connection:{}] unable to init read: {}" ),
321 ex.what() );
322 } );
323 }
324 } );
325 }
326
328 virtual void
331 bool is_close_frame ) override
332 {
334 asio_ns::dispatch(
335 this->get_executor(),
336 [ this,
337 actual_wg = std::move( wg ),
340 // NOTE: this lambda is noexcept since v.0.6.0.
341 () mutable noexcept
342 {
343 try
344 {
347 std::move( actual_wg ),
349 else
350 {
351 m_logger.warn( [&]{
352 return fmt::format(
354 "[ws_connection:{}] cannot write to websocket: "
355 "write operations disabled" ),
356 connection_id() );
357 } );
358 }
359 }
360 catch( const std::exception & ex )
361 {
364 [&]{
365 return fmt::format(
367 "[ws_connection:{}] unable to write data: {}" ),
369 ex.what() );
370 } );
371 }
372 } );
373 }
374 private:
376
380 void
382 {
384 [&]() noexcept {
386 [&]{
387 return fmt::format(
389 "[ws_connection:{}] close socket" ),
390 connection_id() );
391 } );
392
393 // This actions can throw and because of that we have
394 // to wrap them...
396 m_logger,
397 "ws_connection.close_impl.socket.shutdown",
398 [&] {
399 asio_ns::error_code ignored_ec;
400 m_socket.shutdown(
401 asio_ns::ip::tcp::socket::shutdown_both,
402 ignored_ec );
403 } );
404
406 m_logger,
407 "ws_connection.close_impl.socket.close",
408 [&] {
409 m_socket.close();
410 } );
411 } );
412 }
413
415 void
421
423 void
432
434 void
435 send_close_frame_to_peer( std::string payload )
436 {
438 bufs.reserve( 2 );
439
440 bufs.emplace_back(
443 opcode_t::connection_close_frame,
444 payload.size() ) );
445
446 bufs.emplace_back( std::move( payload ) );
447 m_outgoing_data.append( write_group_t{ std::move( bufs ) } );
448
450
451 // No more data must be written.
453 }
454
456 void
459 std::string desc = std::string{} )
460 {
462 }
463
465
473 template< typename MSG_BUILDER >
474 void
477 MSG_BUILDER msg_builder ) noexcept
478 {
479 // An exception in logger shouldn't prevent the main actions.
481 m_logger, std::move( msg_builder ) );
482
483 // This can throw but we have to suppress any exceptions.
485 m_logger, "ws_connection.call_close_handler_if_necessary",
486 [this, status] {
488 } );
489
491 }
492
493
495 void
497 {
498 m_logger.trace( [&]{
499 return fmt::format(
501 "[ws_connection:{}] start reading header" ),
502 connection_id() );
503 } );
504
505 // Prepare parser for consuming new message.
507
508 if( 0 == m_input.m_buf.length() )
509 {
511 }
512 else
513 {
514 // Has something to read from m_input.m_buf.
517 }
518 }
519
521 void
523 {
524 m_logger.trace( [&]{
525 return fmt::format(
527 "[ws_connection:{}] continue reading message" ),
528 connection_id() );
529 } );
530
531 m_socket.async_read_some(
533 asio_ns::bind_executor(
534 this->get_executor(),
535 [ this, ctx = shared_from_this() ]
536 // NOTE: this lambda is noexcept since v.0.6.0.
537 ( const asio_ns::error_code & ec, std::size_t length ) noexcept
538 {
539 try
540 {
541 after_read_header( ec, length );
542 }
543 catch( const std::exception & ex )
544 {
547 [&]{
548 return fmt::format(
550 "[ws_connection:{}] after read header "
551 "callback error: {}" ),
553 ex.what() );
554 } );
555 }
556 } ) );
557 }
558
560 void
561 handle_read_error( const char * desc, const asio_ns::error_code & ec )
562 {
563 // Assume that connection is lost.
566 [&]{
567 return fmt::format(
568 RESTINIO_FMT_FORMAT_STRING( "[ws_connection:{}] {}: {}" ),
570 desc,
571 ec.message() );
572 } );
573 }
574
576 void
578 const asio_ns::error_code & ec,
579 std::size_t length )
580 {
581 if( !ec )
582 {
583 m_logger.trace( [&]{
584 return fmt::format(
586 "[ws_connection:{}] received {} bytes" ),
587 this->connection_id(),
588 length );
589 } );
590
591 m_input.m_buf.obtained_bytes( length );
593 }
594 else
595 {
596 handle_read_error( "reading message header error", ec );
597 }
598 }
599
601 void
602 consume_header_from_buffer( const char * data, std::size_t length )
603 {
604 const auto nparsed = m_input.m_parser.parser_execute( data, length );
605
607
609 {
611 }
612 else
613 {
614 assert( nparsed == length );
616 }
617 }
618
620 void
622 {
623 m_logger.trace( [&]{
624 return fmt::format(
626 "[ws_connection:{}] start handling {} ({:#x})" ),
628 opcode_to_string( md.m_opcode ),
629 static_cast<std::uint16_t>(md.m_opcode) );
630 } );
631
632 const auto validation_result =
634
636 {
637 m_logger.error( [&]{
638 return fmt::format(
640 "[ws_connection:{}] invalid header" ),
641 connection_id() );
642 } );
643
645 {
647 [&]{
649 // Do not wait anything in return, because
650 // protocol is violated.
651 } );
652
654 }
656 {
657 // Wait for close frame cannot be done.
658 close_impl();
659 }
660
661 return;
662 }
663
665 }
666
668 void
670 {
671 const auto payload_length =
673
675
676 if( payload_length == 0 )
677 {
678 // Callback for message with 0-size payload.
680 }
681 else
682 {
683 const auto payload_part_size =
684 std::min( m_input.m_buf.length(), payload_length );
685
686 std::memcpy(
687 &m_input.m_payload.front(),
690
692
693 const std::size_t length_remaining =
695
697 &m_input.m_payload.front(),
700 {
701 if( 0 == length_remaining )
702 {
703 // All message is obtained.
705 }
706 else
707 {
708 // Read the rest of payload:
712 }
713 }
714 // Else payload is invalid and validate_payload_part()
715 // has handled the case so do nothing.
716 }
717 }
718
720 void
723 char * payload_data,
725 std::size_t length_remaining,
728 {
729 m_socket.async_read_some(
730 asio_ns::buffer( payload_data, length_remaining ),
731 asio_ns::bind_executor(
732 this->get_executor(),
733 [ this,
735 payload_data,
738 // NOTE: this lambda is noexcept since v.0.6.0.
739 ( const asio_ns::error_code & ec, std::size_t length ) noexcept
740 {
741 try
742 {
746 ec,
747 length,
749 }
750 catch( const std::exception & ex )
751 {
754 [&]{
755 return fmt::format(
757 "[ws_connection:{}] after read payload "
758 "callback error: {}" ),
760 ex.what() );
761 } );
762 }
763 } ) );
764 }
765
767 void
769 char * payload_data,
770 std::size_t length_remaining,
771 const asio_ns::error_code & ec,
772 std::size_t length,
774 {
775 if( !ec )
776 {
777 m_logger.trace( [&]{
778 return fmt::format(
780 "[ws_connection:{}] received {} bytes" ),
781 this->connection_id(),
782 length );
783 } );
784
785 assert( length <= length_remaining );
786
787 const std::size_t next_length_remaining =
788 length_remaining - length;
789
791 {
793 {
794 if( 0 == next_length_remaining )
795 {
796 // Here: all the payload is ready.
797
798 // All message is obtained.
800 }
801 else
802 {
803 //Here: not all payload is obtained,
804 // so inintiate read once again:
806 payload_data + length,
809 }
810 }
811 // Else payload is invalid and validate_payload_part()
812 // has handled the case so do nothing.
813 }
814 else
815 {
816 if( 0 == next_length_remaining )
817 {
819 }
820 else
821 {
823 payload_data + length,
824 length_remaining - length,
826 }
827 }
828 }
829 else
830 {
831 handle_read_error( "reading message payload error", ec );
832 }
833 }
834
836 void
838 {
839 if( auto wsh = m_websocket_weak_handle.lock() )
840 {
841 try
842 {
844 std::move( wsh ),
845 std::move( close_frame ) );
846 }
847 catch( const std::exception & ex )
848 {
849 m_logger.error( [&]{
850 return fmt::format(
852 "[ws_connection:{}] execute handler error: {}" ),
854 ex.what() );
855 } );
856 }
857 }
858 }
859
861 bool
863 char * payload_data,
864 std::size_t length,
865 std::size_t next_length_remaining )
866 {
867 const auto validation_result =
869
871 {
873
875 {
876 // Can skip this payload because it was not a bad close frame.
877
878 // It is the case we are expecting close frame
879 // so validator must be ready to receive more headers
880 // and payloads after this frame.
882
883 if( 0 == next_length_remaining )
884 {
886 }
887 else
888 {
889 // Skip checking payload for this frame:
892 payload_data + length,
895 }
896 }
897 return false;
898 }
899
900 return true;
901 }
902
904 void
906 {
907 m_logger.error( [&]{
908 return fmt::format(
910 "[ws_connection:{}] invalid paload" ),
911 connection_id() );
912 } );
913
915 {
916 // A corner case: invalid payload in close frame.
917
919 {
920 // Case: close frame was not expected.
921
922 // This actually must be executed:
924 [&]{
926 // Do not wait anything in return, because
927 // protocol is violated.
928 } );
929
930 // Notify user of a close but use a correct close code.
932 }
934 {
935 // Case: close frame was expected.
936
937 // We got a close frame but it is incorrect,
938 // so just close (there is not too much we can do).
939 close_impl();
940 }
941 }
942 else
943 {
945 {
947 [&]{
950 } );
951
953 }
954 }
955 }
956
957 void
959 {
961
964 {
966 {
967 if( opcode_t::connection_close_frame == md.m_opcode )
968 {
969 m_logger.trace( [&]{
970 return fmt::format(
972 "[ws_connection:{}] got close frame from "
973 "peer, status: {}" ),
975 static_cast<std::uint16_t>(
977 } );
978
981 [&]{
983 } );
984
986 }
987
989 std::make_shared< message_t >(
990 md.m_final_flag ? final_frame : not_final_frame,
991 md.m_opcode,
992 std::move( m_input.m_payload ) ) );
993
996 }
997 else
998 {
1000
1001 if( opcode_t::connection_close_frame == md.m_opcode )
1002 {
1003 // Got it!
1004 m_timer_guard.cancel();
1005
1006 close_impl();
1007
1008 m_logger.trace( [&]{
1009 return fmt::format(
1011 "[ws_connection:{}] expected close frame came" ),
1012 connection_id() );
1013 } );
1014 }
1015 else
1016 {
1017 // Wait for next frame.
1019 }
1020 }
1021 }
1022 else
1023 {
1025 }
1026 }
1027
1028 void
1030 {
1032 [&]{
1034 std::make_shared< message_t >(
1036 opcode_t::connection_close_frame,
1038 } );
1039 }
1040
1042 void
1044 {
1045 if( m_socket.is_open() )
1046 {
1047 if( is_close_frame )
1048 {
1049 m_logger.trace( [&]{
1050 return fmt::format(
1052 "[ws_connection:{}] user sends close frame" ),
1053 connection_id() );
1054 } );
1055
1056 m_close_frame_to_peer.disable(); // It is formed and sent by user
1057 m_close_frame_to_user.disable(); // And user knows that websocket is closed.
1058 // No more writes.
1060
1061 // Start waiting only close-frame.
1063 }
1064
1065 // Push write_group to queue.
1066 m_outgoing_data.append( std::move( wg ) );
1067
1069 }
1070 else
1071 {
1072 m_logger.warn( [&]{
1073 return fmt::format(
1075 "[ws_connection:{}] try to write while "
1076 "socket is closed" ),
1077 connection_id() );
1078 } );
1079
1080 try
1081 {
1082 wg.invoke_after_write_notificator_if_exists(
1085 }
1086 catch( ... )
1087 {}
1088 }
1089 }
1090
1093 void
1095 {
1097 {
1098 init_write();
1099 }
1100 }
1101
1103 void
1105 {
1106 // Here: not writing anything to socket, so
1107 // write operation can be initiated.
1109
1110 if( next_write_group )
1111 {
1112 m_logger.trace( [&]{
1113 return fmt::format(
1115 "[ws_connection:{}] start next write group, "
1116 "size: {}" ),
1117 this->connection_id(),
1118 next_write_group->items_count() );
1119 } );
1120
1121 // Initialize write context with a new write group.
1123 std::move( next_write_group ) );
1124
1125 // Start the loop of sending data from current write group.
1127 }
1128 }
1129
1130 // Use aliases for shorter names.
1134
1135 void
1137 {
1138 try
1139 {
1141
1142 if( std::holds_alternative< trivial_write_operation_t >( wo ) )
1143 {
1144 handle_trivial_write_operation( std::get< trivial_write_operation_t >( wo ) );
1145 }
1146 else if( std::holds_alternative< none_write_operation_t >( wo ) )
1147 {
1149 }
1150 else
1151 {
1152 assert( std::holds_alternative< file_write_operation_t >( wo ) );
1153 throw exception_t{ "sendfile write operation not implemented" };
1154 }
1155 }
1156 catch( const std::exception & ex )
1157 {
1160 [&]{
1161 return fmt::format(
1163 "[ws_connection:{}] handle_current_write_ctx failed: {}" ),
1164 connection_id(),
1165 ex.what() );
1166 } );
1167 }
1168 }
1169
1170 void
1172 {
1173 // Asio buffers (param for async write):
1174 auto & bufs = op.get_trivial_bufs();
1175
1176 m_logger.trace( [&]{
1177 return fmt::format(
1179 "[ws_connection:{}] sending data with "
1180 "buf count: {}, "
1181 "total size: {}" ),
1182 connection_id(),
1183 bufs.size(),
1184 op.size() ); } );
1185
1187
1188 // There is somethig to write.
1189 asio_ns::async_write(
1190 m_socket,
1191 bufs,
1192 asio_ns::bind_executor(
1193 this->get_executor(),
1194 [ this,
1195 ctx = shared_from_this() ]
1196 // NOTE: this lambda is noexcept since v.0.6.0.
1197 ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1198 {
1199 try
1200 {
1201 if( !ec )
1202 {
1203 m_logger.trace( [&]{
1204 return fmt::format(
1206 "[ws_connection:{}] outgoing data was "
1207 "sent: {} bytes" ),
1208 connection_id(),
1209 written );
1210 } );
1211 }
1212
1213 after_write( ec );
1214 }
1215 catch( const std::exception & ex )
1216 {
1219 [&]{
1220 return fmt::format(
1222 "[ws_connection:{}] after write "
1223 "callback error: {}" ),
1224 connection_id(),
1225 ex.what() );
1226 } );
1227 }
1228 } ) );
1229 }
1230
1232 void
1234 {
1235 // Finishing writing this group.
1236 m_logger.trace( [&]{
1237 return fmt::format(
1239 "[ws_connection:{}] finishing current write group" ),
1240 this->connection_id() );
1241 } );
1242
1243 // Group notificators are called from here (if exist):
1245
1246 // Start another write opertion
1247 // if there is something to send.
1249 }
1250
1252 void
1253 after_write( const asio_ns::error_code & ec )
1254 {
1255 if( !ec )
1256 {
1258 }
1259 else
1260 {
1263 [&]{
1264 return fmt::format(
1266 "[ws_connection:{}] unable to write: {}" ),
1267 connection_id(),
1268 ec.message() );
1269 } );
1270
1271 try
1272 {
1274 }
1275 catch( const std::exception & ex )
1276 {
1277 m_logger.error( [&]{
1278 return fmt::format(
1280 "[ws_connection:{}] notificator error: {}" ),
1281 connection_id(),
1282 ex.what() );
1283 } );
1284 }
1285 }
1286 }
1287
1290
1293
1300
1303 static ws_connection_t &
1305 {
1306 return static_cast< ws_connection_t & >( base );
1307 }
1308
1309 virtual void
1311 {
1312 asio_ns::dispatch(
1313 this->get_executor(),
1314 [ ctx = std::move( self ) ]
1315 // NOTE: this lambda is noexcept since v.0.6.0.
1316 () noexcept
1317 {
1318 auto & conn_object = cast_to_self( *ctx );
1319 // If an exception will be thrown we can only
1320 // close the connection.
1321 try
1322 {
1323 conn_object.check_timeout_impl();
1324 }
1325 catch( const std::exception & x )
1326 {
1327 conn_object.trigger_error_and_close(
1329 [&] {
1330 return fmt::format(
1332 "[connection: {}] unexpected "
1333 "error during timeout handling: {}" ),
1334 conn_object.connection_id(),
1335 x.what() );
1336 } );
1337 }
1338 } );
1339 }
1340
1341 std::chrono::steady_clock::time_point m_write_operation_timeout_after;
1342 std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after =
1343 std::chrono::steady_clock::time_point::max();
1346
1347 void
1349 {
1350 const auto now = std::chrono::steady_clock::now();
1352 {
1353 m_logger.trace( [&]{
1354 return fmt::format(
1356 "[wd_connection:{}] write operation timed out" ),
1357 connection_id() );
1358 } );
1361 close_impl();
1362 }
1364 {
1365 m_logger.trace( [&]{
1366 return fmt::format(
1368 "[wd_connection:{}] waiting for close-frame "
1369 "from peer timed out" ),
1370 connection_id() );
1371 } );
1372 close_impl();
1373 }
1374 else
1375 {
1377 }
1378 }
1379
1381 void
1386
1388 void
1390 {
1392 std::chrono::steady_clock::now() + m_settings->m_write_http_response_timelimit;
1393 }
1394
1395 void
1397 {
1399 std::chrono::steady_clock::now() + m_settings->m_read_next_http_message_timelimit;
1400 }
1402
1405
1408
1411
1414
1417
1420
1423
1425 enum class write_state_t
1426 {
1431 };
1432
1435
1437 enum class read_state_t
1438 {
1445 };
1446
1449
1453 {
1454 public:
1455 template < typename Action >
1456 void
1457 run_if_first( Action && action ) noexcept(noexcept(action()))
1458 {
1459 if( m_not_executed_yet )
1460 {
1461 m_not_executed_yet = false;
1462 action();
1463 }
1464 }
1465
1467 void
1469 {
1470 m_not_executed_yet = false;
1471 }
1472
1473 private:
1475 };
1476
1480};
1481
1482} /* namespace impl */
1483
1484} /* namespace basic */
1485
1486} /* namespace websocket */
1487
1488} /* namespace restinio */
Helper type for controlling the lifetime of the connection.
An object with info about connection to be passed to state listener.
endpoint_t remote_endpoint() const noexcept
Get the remote endpoint for the connection.
Type of object that tells that the connection has been upgraded to WebSocket.
Exception class for all exceptions thrown by RESTinio.
Definition exception.hpp:26
Wrapper for an executor (strand) used by connections.
Traits::strand_t & get_executor() noexcept
An executor for callbacks on async operations.
Helper class for reading bytes and feeding them to parser.
std::size_t length() const noexcept
How many unconsumed bytes are there in buffer.
void consumed_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
void obtained_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
auto make_asio_buffer() noexcept
Make asio buffer for reading bytes from socket.
const char * bytes() const noexcept
Get pointer to unconsumed bytes.
Helper class for writting response data.
void fail_write_group(const asio_ns::error_code &ec)
Handle current group write process failed.
solid_write_operation_variant_t extract_next_write_operation()
et an object with next write operation to perform.
void start_next_write_group(std::optional< write_group_t > next_wg) noexcept
Start handlong next write group.
void finish_write_group()
Finish writing group normally.
bool transmitting() const noexcept
Check if data is trunsmitting now.
connection_id_t connection_id() const noexcept
Get connection id.
Websocket message class with more detailed protocol information.
Definition ws_parser.hpp:63
A helper class for running exclusive action. Only a first action will run.
void run_if_first(Action &&action) noexcept(noexcept(action()))
void disable()
Disable ation: action will not be executed even on a first shot.
Context for handling websocket connections.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
void after_read_header(const asio_ns::error_code &ec, std::size_t length)
Handle read operation result, when reading header.
restinio::impl::connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
void consume_header_from_socket()
Initiate read operation on socket to receive bytes for header.
message_handler_t m_msg_handler
Websocket message handler provided by user.
void handle_parsed_header(const message_details_t &md)
Handle parsed header.
bool validate_payload_part(char *payload_data, std::size_t length, std::size_t next_length_remaining)
Validates a part of received payload.
ws_weak_handle_t m_websocket_weak_handle
A waek handler for owning ws_t to use it when call message handler.
virtual void shutdown() override
Shutdown websocket.
void send_close_frame_to_peer(std::string payload)
Send close frame to peer.
void write_data_impl(write_group_t wg, bool is_close_frame)
Implementation of writing data performed on the asio_ns::io_context.
@ read_only_close_frame
Reads only close frame: skip all frames until close-frame.
@ read_nothing
Do not read anything (before activation).
@ read_any_frame
Reads any type of frame and serve it to user.
@ write_disabled
No more outgoing data can be added (e.g. close-frame was sent).
std::chrono::steady_clock::time_point m_close_frame_from_peer_timeout_after
typename timer_manager_t::timer_guard_t timer_guard_t
void init_write_if_necessary()
Checks if there is something to write, and if so starts write operation.
void trigger_error_and_close(status_code_t status, MSG_BUILDER msg_builder) noexcept
Trigger an error.
void start_read_header()
Start the process of reading ws messages from socket.
void start_read_payload(char *payload_data, std::size_t length_remaining, bool do_validate_payload_and_call_msg_handler=true)
Start reading message payload.
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
void handle_trivial_write_operation(const trivial_write_operation_t &op)
void init_next_timeout_checking()
schedule next timeout checking.
void handle_invalid_payload(validation_state_t validation_result)
Handle payload errors.
void after_read_payload(char *payload_data, std::size_t length_remaining, const asio_ns::error_code &ec, std::size_t length, bool do_validate_payload_and_call_msg_handler=true)
Handle read operation result, when reading payload.
void consume_header_from_buffer(const char *data, std::size_t length)
Parse header from internal buffer.
void send_close_frame_to_peer(status_code_t code, std::string desc=std::string{})
Send close frame to peer.
ws_connection_t(const ws_connection_t &)=delete
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
void handle_read_error(const char *desc, const asio_ns::error_code &ec)
Handle read error (reading header or payload)
write_state_t m_write_state
A state of a websocket output.
ws_protocol_validator_t m_protocol_validator
Helper for validating protocol.
void handle_parsed_and_valid_header(const message_details_t &md)
Handle parsed and valid header.
ws_connection_t & operator=(const ws_connection_t &)=delete
void start_waiting_close_frame_only()
Start waiting for close-frame.
std::chrono::steady_clock::time_point m_write_operation_timeout_after
virtual void kill() override
Kill websocket.
void guard_write_operation()
Start guard write operation if necessary.
ws_outgoing_data_t m_outgoing_data
Output buffers queue.
void init_read(ws_handle_t wsh) override
Start reading ws-messages.
std::shared_ptr< timer_manager_t > timer_manager_handle_t
void call_message_handler(message_handle_t close_frame)
Call user message handler with current message.
static ws_connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timers.
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
void after_write(const asio_ns::error_code &ec)
Handle write response finished.
restinio::impl::write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
void close_impl() noexcept
Standard close routine.
typename Traits::stream_socket_t stream_socket_t
virtual void write_data(write_group_t wg, bool is_close_frame) override
Write pieces of outgoing data.
void graceful_close()
Close WebSocket connection in a graceful manner.
read_state_t m_read_state
A state of a websocket input.
typename Traits::timer_manager_t timer_manager_t
write_groups_queue_t m_awaiting_write_groups
A queue of buffers.
void append(write_group_t wg)
Add buffers to queue.
std::optional< write_group_t > pop_ready_buffers()
const message_details_t & current_message() const
Get current mesasge details.
size_t parser_execute(const char *data, size_t size)
Parse piece of data from buffer.
bool header_parsed() const
Check header of current websocket message is parsed.
validation_state_t process_and_unmask_next_payload_part(char *data, size_t size)
Validate next part of current frame and reset source part to unmasked data.
validation_state_t process_new_frame(const message_details_t &frame)
Start work with new frame.
validation_state_t finish_frame()
Make final checks of payload if it is necessary and reset state.
Group of writable items transported to the context of underlying connection as one solid piece.
Definition buffers.hpp:727
Detection of compiler version and absence of various features.
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
Include all core header files in one.
A special wrapper around fmtlib include files.
#define RESTINIO_FMT_FORMAT_STRING(s)
decltype(auto) streamed(T &&v) noexcept
std::shared_ptr< connection_settings_t< Traits > > connection_settings_handle_t
std::size_t uint64_to_size_t(std::uint64_t v)
Helper function for truncating uint64 to std::size_t with exception if that truncation will lead to d...
void suppress_exceptions(Logger &&logger, const char *block_description, Lambda &&lambda) noexcept
Helper function for execution a block of code with suppression of any exceptions raised inside that b...
void log_error_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
void log_trace_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
raw_data_t write_message_details(const message_details_t &message)
Serialize websocket message details into bytes buffer.
constexpr size_t websocket_header_max_size()
Max possible size of websocket frame header (a part before payload).
std::queue< write_group_t > write_groups_queue_t
validation_state_t
States of validated frame.
std::shared_ptr< ws_t > ws_handle_t
Alias for ws_t handle.
std::shared_ptr< message_t > message_handle_t
Request handler, that is the type for calling request handlers.
Definition message.hpp:227
constexpr final_frame_flag_t final_frame
Definition message.hpp:135
constexpr final_frame_flag_t not_final_frame
Definition message.hpp:136
const char * opcode_to_string(opcode_t opcode)
Helper sunction to get method string name.
Definition message.hpp:46
status_code_t status_code_from_bin(string_view_t data)
Definition message.hpp:112
std::string status_code_to_bin(status_code_t code)
Definition message.hpp:101
run_on_this_thread_settings_t< Traits > on_this_thread()
A special marker for the case when http_server must be run on the context of the current thread.
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
std::uint64_t connection_id_t
Type for ID of connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
std::vector< writable_item_t > writable_items_container_t
Definition buffers.hpp:703
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
Helpers for safe truncation of unsigned integers.
restinio::impl::fixed_buffer_t m_buf
Input buffer.
void reset_parser_and_payload()
Prepare parser for reading new http-message.