44 *--------------------------------------------------------------------------------------------*/
55
66use crate :: { constants:: APPLICATION_NAME , util:: errors:: CodeError } ;
7- use async_trait :: async_trait ;
7+ use std :: future :: Future ;
88use std:: path:: { Path , PathBuf } ;
99use std:: pin:: Pin ;
10- use std:: task:: { Context , Poll } ;
1110use tokio:: io:: { AsyncRead , AsyncWrite } ;
1211use tokio:: net:: TcpListener ;
1312use uuid:: Uuid ;
@@ -46,6 +45,7 @@ cfg_if::cfg_if! {
4645 } else {
4746 use tokio:: { time:: sleep, io:: ReadBuf } ;
4847 use tokio:: net:: windows:: named_pipe:: { ClientOptions , ServerOptions , NamedPipeClient , NamedPipeServer } ;
48+ use std:: task:: { Context , Poll } ;
4949 use std:: { time:: Duration , io} ;
5050 use pin_project:: pin_project;
5151
@@ -176,57 +176,6 @@ cfg_if::cfg_if! {
176176 }
177177}
178178
179- impl AsyncPipeListener {
180- pub fn into_pollable ( self ) -> PollableAsyncListener {
181- PollableAsyncListener {
182- listener : Some ( self ) ,
183- write_fut : tokio_util:: sync:: ReusableBoxFuture :: new ( make_accept_fut ( None ) ) ,
184- }
185- }
186- }
187-
188- pub struct PollableAsyncListener {
189- listener : Option < AsyncPipeListener > ,
190- write_fut : tokio_util:: sync:: ReusableBoxFuture <
191- ' static ,
192- ( AsyncPipeListener , Result < AsyncPipe , CodeError > ) ,
193- > ,
194- }
195-
196- async fn make_accept_fut (
197- data : Option < AsyncPipeListener > ,
198- ) -> ( AsyncPipeListener , Result < AsyncPipe , CodeError > ) {
199- match data {
200- Some ( mut l) => {
201- let c = l. accept ( ) . await ;
202- ( l, c)
203- }
204- None => unreachable ! ( "this future should not be pollable in this state" ) ,
205- }
206- }
207-
208- impl hyper:: server:: accept:: Accept for PollableAsyncListener {
209- type Conn = AsyncPipe ;
210- type Error = CodeError ;
211-
212- fn poll_accept (
213- mut self : Pin < & mut Self > ,
214- cx : & mut Context < ' _ > ,
215- ) -> Poll < Option < Result < Self :: Conn , Self :: Error > > > {
216- if let Some ( l) = self . listener . take ( ) {
217- self . write_fut . set ( make_accept_fut ( Some ( l) ) )
218- }
219-
220- match self . write_fut . poll ( cx) {
221- Poll :: Ready ( ( l, cnx) ) => {
222- self . listener = Some ( l) ;
223- Poll :: Ready ( Some ( cnx) )
224- }
225- Poll :: Pending => Poll :: Pending ,
226- }
227- }
228- }
229-
230179/// Gets a random name for a pipe/socket on the platform
231180pub fn get_socket_name ( ) -> PathBuf {
232181 cfg_if:: cfg_if! {
@@ -243,28 +192,41 @@ pub type AcceptedRW = (
243192 Box < dyn AsyncWrite + Send + Unpin > ,
244193) ;
245194
246- #[ async_trait]
247195pub trait AsyncRWAccepter {
248- async fn accept_rw ( & mut self ) -> Result < AcceptedRW , CodeError > ;
196+ fn accept_rw (
197+ & mut self ,
198+ ) -> Pin < Box < dyn Future < Output = Result < AcceptedRW , CodeError > > + Send + ' _ > > ;
249199}
250200
251- #[ async_trait]
252201impl AsyncRWAccepter for AsyncPipeListener {
253- async fn accept_rw ( & mut self ) -> Result < AcceptedRW , CodeError > {
254- let pipe = self . accept ( ) . await ?;
255- let ( read, write) = socket_stream_split ( pipe) ;
256- Ok ( ( Box :: new ( read) , Box :: new ( write) ) )
202+ fn accept_rw (
203+ & mut self ,
204+ ) -> Pin < Box < dyn Future < Output = Result < AcceptedRW , CodeError > > + Send + ' _ > > {
205+ Box :: pin ( async move {
206+ let pipe = self . accept ( ) . await ?;
207+ let ( read, write) = socket_stream_split ( pipe) ;
208+ Ok ( (
209+ Box :: new ( read) as Box < dyn AsyncRead + Send + Unpin > ,
210+ Box :: new ( write) as Box < dyn AsyncWrite + Send + Unpin > ,
211+ ) )
212+ } )
257213 }
258214}
259215
260- #[ async_trait]
261216impl AsyncRWAccepter for TcpListener {
262- async fn accept_rw ( & mut self ) -> Result < AcceptedRW , CodeError > {
263- let ( stream, _) = self
264- . accept ( )
265- . await
266- . map_err ( CodeError :: AsyncPipeListenerFailed ) ?;
267- let ( read, write) = tokio:: io:: split ( stream) ;
268- Ok ( ( Box :: new ( read) , Box :: new ( write) ) )
217+ fn accept_rw (
218+ & mut self ,
219+ ) -> Pin < Box < dyn Future < Output = Result < AcceptedRW , CodeError > > + Send + ' _ > > {
220+ Box :: pin ( async move {
221+ let ( stream, _) = self
222+ . accept ( )
223+ . await
224+ . map_err ( CodeError :: AsyncPipeListenerFailed ) ?;
225+ let ( read, write) = tokio:: io:: split ( stream) ;
226+ Ok ( (
227+ Box :: new ( read) as Box < dyn AsyncRead + Send + Unpin > ,
228+ Box :: new ( write) as Box < dyn AsyncWrite + Send + Unpin > ,
229+ ) )
230+ } )
269231 }
270232}
0 commit comments