tonic/transport/channel/service/
connection.rs1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3 body::Body,
4 transport::{Endpoint, channel::BoxFuture, service::GrpcTimeout},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11 fmt,
12 task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16 ServiceBuilder, ServiceExt,
17 layer::Layer,
18 limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
19 util::BoxService,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24 inner: BoxService<Request<Body>, Response<Body>, crate::BoxError>,
25}
26
27impl Connection {
28 fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29 where
30 C: Service<Uri> + Send + 'static,
31 C::Error: Into<crate::BoxError> + Send,
32 C::Future: Send,
33 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34 {
35 let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36 .initial_stream_window_size(endpoint.init_stream_window_size)
37 .initial_connection_window_size(endpoint.init_connection_window_size)
38 .keep_alive_interval(endpoint.http2_keep_alive_interval)
39 .timer(TokioTimer::new())
40 .clone();
41
42 if let Some(val) = endpoint.max_frame_size {
43 settings.max_frame_size(val);
44 }
45
46 if let Some(val) = endpoint.http2_keep_alive_timeout {
47 settings.keep_alive_timeout(val);
48 }
49
50 if let Some(val) = endpoint.http2_keep_alive_while_idle {
51 settings.keep_alive_while_idle(val);
52 }
53
54 if let Some(val) = endpoint.http2_adaptive_window {
55 settings.adaptive_window(val);
56 }
57
58 if let Some(val) = endpoint.http2_header_table_size {
59 settings.header_table_size(val);
60 }
61
62 if let Some(val) = endpoint.http2_max_header_list_size {
63 settings.max_header_list_size(val);
64 }
65
66 let stack = ServiceBuilder::new()
67 .layer_fn(|s| {
68 let origin = endpoint.origin.as_ref().unwrap_or(endpoint.uri()).clone();
69
70 AddOrigin::new(s, origin)
71 })
72 .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
73 .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
74 .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
75 .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
76 .into_inner();
77
78 let make_service =
79 MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
80
81 let conn = Reconnect::new(make_service, endpoint.uri().clone(), is_lazy);
82
83 Self {
84 inner: BoxService::new(stack.layer(conn)),
85 }
86 }
87
88 pub(crate) async fn connect<C>(
89 connector: C,
90 endpoint: Endpoint,
91 ) -> Result<Self, crate::BoxError>
92 where
93 C: Service<Uri> + Send + 'static,
94 C::Error: Into<crate::BoxError> + Send,
95 C::Future: Unpin + Send,
96 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
97 {
98 Self::new(connector, endpoint, false).ready_oneshot().await
99 }
100
101 pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
102 where
103 C: Service<Uri> + Send + 'static,
104 C::Error: Into<crate::BoxError> + Send,
105 C::Future: Send,
106 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
107 {
108 Self::new(connector, endpoint, true)
109 }
110}
111
112impl Service<Request<Body>> for Connection {
113 type Response = Response<Body>;
114 type Error = crate::BoxError;
115 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
116
117 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
118 Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
119 }
120
121 fn call(&mut self, req: Request<Body>) -> Self::Future {
122 self.inner.call(req)
123 }
124}
125
126impl Load for Connection {
127 type Metric = usize;
128
129 fn load(&self) -> Self::Metric {
130 0
131 }
132}
133
134impl fmt::Debug for Connection {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("Connection").finish()
137 }
138}
139
140struct SendRequest {
141 inner: hyper::client::conn::http2::SendRequest<Body>,
142}
143
144impl From<hyper::client::conn::http2::SendRequest<Body>> for SendRequest {
145 fn from(inner: hyper::client::conn::http2::SendRequest<Body>) -> Self {
146 Self { inner }
147 }
148}
149
150impl tower::Service<Request<Body>> for SendRequest {
151 type Response = Response<Body>;
152 type Error = crate::BoxError;
153 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
154
155 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 self.inner.poll_ready(cx).map_err(Into::into)
157 }
158
159 fn call(&mut self, req: Request<Body>) -> Self::Future {
160 let fut = self.inner.send_request(req);
161
162 Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(Body::new)) })
163 }
164}
165
166struct MakeSendRequestService<C> {
167 connector: C,
168 executor: SharedExec,
169 settings: Builder<SharedExec>,
170}
171
172impl<C> MakeSendRequestService<C> {
173 fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
174 Self {
175 connector,
176 executor,
177 settings,
178 }
179 }
180}
181
182impl<C> tower::Service<Uri> for MakeSendRequestService<C>
183where
184 C: Service<Uri> + Send + 'static,
185 C::Error: Into<crate::BoxError> + Send,
186 C::Future: Send,
187 C::Response: rt::Read + rt::Write + Unpin + Send,
188{
189 type Response = SendRequest;
190 type Error = crate::BoxError;
191 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
192
193 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 self.connector.poll_ready(cx).map_err(Into::into)
195 }
196
197 fn call(&mut self, req: Uri) -> Self::Future {
198 let fut = self.connector.call(req);
199 let builder = self.settings.clone();
200 let executor = self.executor.clone();
201
202 Box::pin(async move {
203 let io = fut.await.map_err(Into::into)?;
204 let (send_request, conn) = builder.handshake(io).await?;
205
206 Executor::<BoxFuture<'static, ()>>::execute(
207 &executor,
208 Box::pin(async move {
209 if let Err(e) = conn.await {
210 tracing::debug!("connection task error: {:?}", e);
211 }
212 }) as _,
213 );
214
215 Ok(SendRequest::from(send_request))
216 })
217 }
218}