tonic/transport/channel/service/
connection.rs

1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3    body::Body,
4    transport::{Endpoint, channel::BoxFuture, service::GrpcTimeout},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11    fmt,
12    task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16    ServiceBuilder, ServiceExt,
17    layer::Layer,
18    limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
19    util::BoxService,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24    inner: BoxService<Request<Body>, Response<Body>, crate::BoxError>,
25}
26
27impl Connection {
28    fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29    where
30        C: Service<Uri> + Send + 'static,
31        C::Error: Into<crate::BoxError> + Send,
32        C::Future: Send,
33        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34    {
35        let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36            .initial_stream_window_size(endpoint.init_stream_window_size)
37            .initial_connection_window_size(endpoint.init_connection_window_size)
38            .keep_alive_interval(endpoint.http2_keep_alive_interval)
39            .timer(TokioTimer::new())
40            .clone();
41
42        if let Some(val) = endpoint.max_frame_size {
43            settings.max_frame_size(val);
44        }
45
46        if let Some(val) = endpoint.http2_keep_alive_timeout {
47            settings.keep_alive_timeout(val);
48        }
49
50        if let Some(val) = endpoint.http2_keep_alive_while_idle {
51            settings.keep_alive_while_idle(val);
52        }
53
54        if let Some(val) = endpoint.http2_adaptive_window {
55            settings.adaptive_window(val);
56        }
57
58        if let Some(val) = endpoint.http2_header_table_size {
59            settings.header_table_size(val);
60        }
61
62        if let Some(val) = endpoint.http2_max_header_list_size {
63            settings.max_header_list_size(val);
64        }
65
66        let stack = ServiceBuilder::new()
67            .layer_fn(|s| {
68                let origin = endpoint.origin.as_ref().unwrap_or(endpoint.uri()).clone();
69
70                AddOrigin::new(s, origin)
71            })
72            .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
73            .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
74            .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
75            .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
76            .into_inner();
77
78        let make_service =
79            MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
80
81        let conn = Reconnect::new(make_service, endpoint.uri().clone(), is_lazy);
82
83        Self {
84            inner: BoxService::new(stack.layer(conn)),
85        }
86    }
87
88    pub(crate) async fn connect<C>(
89        connector: C,
90        endpoint: Endpoint,
91    ) -> Result<Self, crate::BoxError>
92    where
93        C: Service<Uri> + Send + 'static,
94        C::Error: Into<crate::BoxError> + Send,
95        C::Future: Unpin + Send,
96        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
97    {
98        Self::new(connector, endpoint, false).ready_oneshot().await
99    }
100
101    pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
102    where
103        C: Service<Uri> + Send + 'static,
104        C::Error: Into<crate::BoxError> + Send,
105        C::Future: Send,
106        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
107    {
108        Self::new(connector, endpoint, true)
109    }
110}
111
112impl Service<Request<Body>> for Connection {
113    type Response = Response<Body>;
114    type Error = crate::BoxError;
115    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
116
117    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118        Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
119    }
120
121    fn call(&mut self, req: Request<Body>) -> Self::Future {
122        self.inner.call(req)
123    }
124}
125
126impl Load for Connection {
127    type Metric = usize;
128
129    fn load(&self) -> Self::Metric {
130        0
131    }
132}
133
134impl fmt::Debug for Connection {
135    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136        f.debug_struct("Connection").finish()
137    }
138}
139
140struct SendRequest {
141    inner: hyper::client::conn::http2::SendRequest<Body>,
142}
143
144impl From<hyper::client::conn::http2::SendRequest<Body>> for SendRequest {
145    fn from(inner: hyper::client::conn::http2::SendRequest<Body>) -> Self {
146        Self { inner }
147    }
148}
149
150impl tower::Service<Request<Body>> for SendRequest {
151    type Response = Response<Body>;
152    type Error = crate::BoxError;
153    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
154
155    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156        self.inner.poll_ready(cx).map_err(Into::into)
157    }
158
159    fn call(&mut self, req: Request<Body>) -> Self::Future {
160        let fut = self.inner.send_request(req);
161
162        Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) })
163    }
164}
165
166struct MakeSendRequestService<C> {
167    connector: C,
168    executor: SharedExec,
169    settings: Builder<SharedExec>,
170}
171
172impl<C> MakeSendRequestService<C> {
173    fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
174        Self {
175            connector,
176            executor,
177            settings,
178        }
179    }
180}
181
182impl<C> tower::Service<Uri> for MakeSendRequestService<C>
183where
184    C: Service<Uri> + Send + 'static,
185    C::Error: Into<crate::BoxError> + Send,
186    C::Future: Send,
187    C::Response: rt::Read + rt::Write + Unpin + Send,
188{
189    type Response = SendRequest;
190    type Error = crate::BoxError;
191    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
192
193    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194        self.connector.poll_ready(cx).map_err(Into::into)
195    }
196
197    fn call(&mut self, req: Uri) -> Self::Future {
198        let fut = self.connector.call(req);
199        let builder = self.settings.clone();
200        let executor = self.executor.clone();
201
202        Box::pin(async move {
203            let io = fut.await.map_err(Into::into)?;
204            let (send_request, conn) = builder.handshake(io).await?;
205
206            Executor::<BoxFuture<'static, ()>>::execute(
207                &executor,
208                Box::pin(async move {
209                    if let Err(e) = conn.await {
210                        tracing::debug!("connection task error: {:?}", e);
211                    }
212                }) as _,
213            );
214
215            Ok(SendRequest::from(send_request))
216        })
217    }
218}