Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
11 : #define BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/format.hpp>
16 : #include <boost/beast2/read.hpp>
17 : #include <boost/beast2/write.hpp>
18 : #include <boost/beast2/server/any_lambda.hpp>
19 : #include <boost/beast2/server/route_handler_asio.hpp>
20 : #include <boost/beast2/server/router_asio.hpp>
21 : #include <boost/beast2/error.hpp>
22 : #include <boost/beast2/detail/except.hpp>
23 : #include <boost/capy/application.hpp>
24 : #include <boost/http_proto/request_parser.hpp>
25 : #include <boost/http_proto/response.hpp>
26 : #include <boost/http_proto/serializer.hpp>
27 : #include <boost/http_proto/string_body.hpp>
28 : #include <boost/http_proto/server/basic_router.hpp>
29 : #include <boost/url/parse.hpp>
30 : #include <boost/asio/prepend.hpp>
31 :
32 : namespace boost {
33 : namespace beast2 {
34 :
35 : //------------------------------------------------
36 :
37 : /** An HTTP server stream which routes requests to handlers and sends responses.
38 :
39 : An object of this type wraps an asynchronous Boost.ASIO stream and implements
40 : a high level server connection which reads HTTP requests, routes them to
41 : handlers installed in a router, and sends the HTTP response.
42 :
43 : @par Requires
44 : `AsyncStream` must satisfy <em>AsyncReadStream</em> and <em>AsyncWriteStream</em>
45 :
46 : @tparam AsyncStream The type of asynchronous stream.
47 : */
48 : template<class AsyncStream>
49 : class http_stream
50 : : private http::suspender::owner
51 : {
52 : public:
53 : /** Constructor.
54 :
55 : This initializes a new HTTP connection object that operates on
56 : the given stream, uses the specified router to dispatch incoming
57 : requests, and calls the supplied completion function when the
58 : connection closes or fails.
59 :
60 : Construction does not start any I/O; call @ref on_stream_begin when
61 : the stream is connected to the remote peer to begin reading
62 : requests and processing them.
63 :
64 : @param app The owning application, used to access shared services
65 : such as logging and protocol objects.
66 : @param stream The underlying asynchronous stream to read from
67 : and write to. The caller is responsible for maintaining its
68 : lifetime for the duration of the session.
69 : @param routes The router used to dispatch incoming HTTP requests.
70 : @param close_fn The function invoked when the connection is closed
71 : or an unrecoverable error occurs.
72 : */
73 : http_stream(
74 : capy::application& app,
75 : AsyncStream& stream,
76 : router_asio<AsyncStream&> routes,
77 : any_lambda<void(system::error_code)> close_fn);
78 :
79 : /** Called to start a new HTTP session
80 :
81 : The stream must be in a connected,
82 : correct state for a new session.
83 : */
84 : void on_stream_begin(http::acceptor_config const& config);
85 :
86 : private:
87 : void do_read();
88 : void on_read(
89 : system::error_code ec,
90 : std::size_t bytes_transferred);
91 : void on_headers();
92 : void do_dispatch(http::route_result rv = {});
93 : void do_read_body();
94 : void on_read_body(
95 : system::error_code ec,
96 : std::size_t bytes_transferred);
97 : void do_respond(http::route_result rv);
98 : void do_write();
99 : void on_write(
100 : system::error_code const& ec,
101 : std::size_t bytes_transferred);
102 : void on_complete();
103 : http::resumer do_suspend() override;
104 : void do_resume(http::route_result const& ec) override;
105 : void do_close();
106 : void do_fail(core::string_view s,
107 : system::error_code const& ec);
108 : void clear() noexcept;
109 :
110 : protected:
111 0 : std::string id() const
112 : {
113 0 : return std::string("[") + std::to_string(id_) + "] ";
114 : }
115 :
116 : protected:
117 : struct resetter;
118 : section sect_;
119 : std::size_t id_ = 0;
120 : AsyncStream& stream_;
121 : router_asio<AsyncStream&> routes_;
122 : any_lambda<void(system::error_code)> close_;
123 : http::acceptor_config const* pconfig_ = nullptr;
124 :
125 : using work_guard = asio::executor_work_guard<decltype(
126 : std::declval<AsyncStream&>().get_executor())>;
127 : std::unique_ptr<work_guard> pwg_;
128 : asio_route_params<AsyncStream&> rp_;
129 : };
130 :
131 : //------------------------------------------------
132 :
133 : // for exception safety
134 : template<class AsyncStream>
135 : struct http_stream<AsyncStream>::
136 : resetter
137 : {
138 : ~resetter()
139 : {
140 : if(clear_)
141 : owner_.clear();
142 : }
143 :
144 : explicit resetter(
145 : http_stream<AsyncStream>& owner) noexcept
146 : : owner_(owner)
147 : {
148 : }
149 :
150 : void accept()
151 : {
152 : clear_ = false;
153 : }
154 :
155 : private:
156 : http_stream<AsyncStream>& owner_;
157 : bool clear_ = true;
158 : };
159 :
160 : //------------------------------------------------
161 :
162 : template<class AsyncStream>
163 0 : http_stream<AsyncStream>::
164 : http_stream(
165 : capy::application& app,
166 : AsyncStream& stream,
167 : router_asio<AsyncStream&> routes,
168 : any_lambda<void(system::error_code)> close)
169 0 : : sect_(use_log_service(app).get_section("http_stream"))
170 0 : , id_(
171 0 : []() noexcept
172 : {
173 : static std::size_t n = 0;
174 0 : return ++n;
175 0 : }())
176 0 : , stream_(stream)
177 0 : , routes_(std::move(routes))
178 0 : , close_(close)
179 0 : , rp_(stream_)
180 : {
181 0 : rp_.parser = http::request_parser(app);
182 :
183 0 : rp_.serializer = http::serializer(app);
184 0 : rp_.suspend = http::suspender(*this);
185 0 : }
186 :
187 : // called to start a new HTTP session.
188 : // the connection must be in the correct state already.
189 : template<class AsyncStream>
190 : void
191 0 : http_stream<AsyncStream>::
192 : on_stream_begin(
193 : http::acceptor_config const& config)
194 : {
195 0 : pconfig_ = &config;
196 :
197 0 : rp_.parser.reset();
198 0 : rp_.session_data.clear();
199 0 : do_read();
200 0 : }
201 :
202 : // begin reading the request
203 : template<class AsyncStream>
204 : void
205 0 : http_stream<AsyncStream>::
206 : do_read()
207 : {
208 0 : rp_.parser.start();
209 :
210 0 : beast2::async_read_some(
211 : stream_,
212 : rp_.parser,
213 0 : call_mf(&http_stream::on_read, this));
214 0 : }
215 :
216 : // called when the read operation completes
217 : template<class AsyncStream>
218 : void
219 0 : http_stream<AsyncStream>::
220 : on_read(
221 : system::error_code ec,
222 : std::size_t bytes_transferred)
223 : {
224 : (void)bytes_transferred;
225 :
226 0 : if(ec.failed())
227 0 : return do_fail("http_stream::on_read", ec);
228 :
229 0 : LOG_TRC(this->sect_)(
230 : "{} http_stream::on_read bytes={}",
231 : this->id(), bytes_transferred);
232 :
233 0 : on_headers();
234 : }
235 :
236 : // called to set up the response after reading the request
237 : template<class AsyncStream>
238 : void
239 0 : http_stream<AsyncStream>::
240 : on_headers()
241 : {
242 : // set up Request and Response objects
243 : // VFALCO HACK for now we make a copy of the message
244 0 : rp_.req = rp_.parser.get();
245 0 : rp_.route_data.clear();
246 0 : rp_.res.set_start_line( // VFALCO WTF
247 : http::status::ok, rp_.req.version());
248 0 : rp_.res.set_keep_alive(rp_.req.keep_alive());
249 0 : rp_.serializer.reset();
250 :
251 : // parse the URL
252 : {
253 0 : auto rv = urls::parse_uri_reference(rp_.req.target());
254 0 : if(rv.has_error())
255 : {
256 : // error parsing URL
257 0 : rp_.status(http::status::bad_request);
258 0 : rp_.set_body("Bad Request: " + rv.error().message());
259 0 : return do_respond(rv.error());
260 : }
261 :
262 0 : rp_.url = rv.value();
263 : }
264 :
265 : // invoke handlers for the route
266 0 : do_dispatch();
267 : }
268 :
269 : // called to dispatch or resume the route
270 : template<class AsyncStream>
271 : void
272 0 : http_stream<AsyncStream>::
273 : do_dispatch(
274 : http::route_result rv)
275 : {
276 0 : if(! rv.failed())
277 : {
278 0 : BOOST_ASSERT(! pwg_); // can't be suspended
279 0 : rv = routes_.dispatch(
280 0 : rp_.req.method(), rp_.url, rp_);
281 : }
282 : else
283 : {
284 0 : rv = routes_.resume(rp_, rv);
285 : }
286 :
287 0 : do_respond(rv);
288 0 : }
289 :
290 : // finish reading the body
291 : template<class AsyncStream>
292 : void
293 0 : http_stream<AsyncStream>::
294 : do_read_body()
295 : {
296 0 : beast2::async_read(
297 : stream_,
298 : rp_.parser,
299 0 : call_mf(&http_stream::on_read_body, this));
300 0 : }
301 :
302 : // called repeatedly when reading the body
303 : template<class AsyncStream>
304 : void
305 0 : http_stream<AsyncStream>::
306 : on_read_body(
307 : system::error_code ec,
308 : std::size_t bytes_transferred)
309 : {
310 0 : if(ec.failed())
311 0 : return do_fail("http_stream::on_read_body", ec);
312 :
313 0 : LOG_TRC(this->sect_)(
314 : "{} http_stream::on_read_body bytes={}",
315 : this->id(), bytes_transferred);
316 :
317 0 : BOOST_ASSERT(rp_.parser.is_complete());
318 :
319 0 : rp_.do_finish();
320 : }
321 :
322 : // called after obtaining a route result
323 : template<class AsyncStream>
324 : void
325 0 : http_stream<AsyncStream>::
326 : do_respond(
327 : http::route_result rv)
328 : {
329 0 : BOOST_ASSERT(rv != http::route::next_route);
330 :
331 0 : if(rv == http::route::close)
332 : {
333 0 : return do_close();
334 : }
335 :
336 0 : if(rv == http::route::complete)
337 : {
338 : // VFALCO what if the connection was closed or keep-alive=false?
339 : // handler sent the response?
340 0 : BOOST_ASSERT(rp_.serializer.is_done());
341 0 : return on_write(system::error_code(), 0);
342 : }
343 :
344 0 : if(rv == http::route::suspend)
345 : {
346 : // didn't call suspend()?
347 0 : if(! pwg_)
348 0 : detail::throw_logic_error();
349 0 : if(rp_.parser.is_body_set())
350 0 : return do_read_body();
351 0 : return;
352 : }
353 :
354 0 : if(rv == http::route::next)
355 : {
356 : // unhandled request
357 0 : auto const status = http::status::not_found;
358 0 : rp_.status(status);
359 0 : rp_.set_body(http::to_string(status));
360 : }
361 0 : else if(rv != http::route::send)
362 : {
363 : // error message of last resort
364 0 : BOOST_ASSERT(rv.failed());
365 0 : BOOST_ASSERT(! http::is_route_result(rv));
366 0 : rp_.status(http::status::internal_server_error);
367 0 : std::string s;
368 0 : format_to(s, "An internal server error occurred: {}", rv.message());
369 0 : rp_.res.set_keep_alive(false); // VFALCO?
370 0 : rp_.set_body(s);
371 0 : }
372 :
373 0 : do_write();
374 : }
375 :
376 : // begin writing the response
377 : template<class AsyncStream>
378 : void
379 0 : http_stream<AsyncStream>::
380 : do_write()
381 : {
382 0 : BOOST_ASSERT(! rp_.serializer.is_done());
383 0 : beast2::async_write(stream_, rp_.serializer,
384 0 : call_mf(&http_stream::on_write, this));
385 0 : }
386 :
387 : // called when the write operation completes
388 : template<class AsyncStream>
389 : void
390 0 : http_stream<AsyncStream>::
391 : on_write(
392 : system::error_code const& ec,
393 : std::size_t bytes_transferred)
394 : {
395 : (void)bytes_transferred;
396 :
397 0 : if(ec.failed())
398 0 : return do_fail("http_stream::on_write", ec);
399 :
400 0 : BOOST_ASSERT(rp_.serializer.is_done());
401 :
402 0 : LOG_TRC(this->sect_)(
403 : "{} http_stream::on_write bytes={}",
404 : this->id(), bytes_transferred);
405 :
406 0 : if(rp_.res.keep_alive())
407 0 : return do_read();
408 :
409 0 : do_close();
410 : }
411 :
412 : template<class AsyncStream>
413 : auto
414 0 : http_stream<AsyncStream>::
415 : do_suspend() ->
416 : http::resumer
417 : {
418 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
419 :
420 : // can't call twice
421 0 : BOOST_ASSERT(! pwg_);
422 0 : pwg_.reset(new work_guard(stream_.get_executor()));
423 :
424 : // VFALCO cancel timer
425 :
426 0 : return http::resumer(*this);
427 : }
428 :
429 : // called by resume(rv)
430 : template<class AsyncStream>
431 : void
432 0 : http_stream<AsyncStream>::
433 : do_resume(
434 : http::route_result const& rv)
435 : {
436 0 : asio::dispatch(
437 0 : stream_.get_executor(),
438 0 : [this, rv]
439 : {
440 0 : BOOST_ASSERT(pwg_.get() != nullptr);
441 0 : pwg_.reset();
442 :
443 0 : do_dispatch(rv);
444 : });
445 0 : }
446 :
447 : // called when a non-recoverable error occurs
448 : template<class AsyncStream>
449 : void
450 0 : http_stream<AsyncStream>::
451 : do_fail(
452 : core::string_view s, system::error_code const& ec)
453 : {
454 0 : LOG_TRC(this->sect_)("{}: {}", s, ec.message());
455 :
456 : // tidy up lingering objects
457 0 : rp_.parser.reset();
458 0 : rp_.serializer.reset();
459 :
460 0 : close_(ec);
461 0 : }
462 :
463 : // end the session
464 : template<class AsyncStream>
465 : void
466 0 : http_stream<AsyncStream>::
467 : do_close()
468 : {
469 0 : clear();
470 0 : close_({});
471 0 : }
472 :
473 : // clear everything, releasing transient objects
474 : template<class AsyncStream>
475 : void
476 0 : http_stream<AsyncStream>::
477 : clear() noexcept
478 : {
479 0 : rp_.parser.reset();
480 0 : rp_.serializer.reset();
481 0 : rp_.res.clear();
482 0 : }
483 :
484 : } // beast2
485 : } // boost
486 :
487 : #endif
|