diff --git a/ffi/src/core_device/diagnosticsservice.rs b/ffi/src/core_device/diagnosticsservice.rs index 24476db..31ca02a 100644 --- a/ffi/src/core_device/diagnosticsservice.rs +++ b/ffi/src/core_device/diagnosticsservice.rs @@ -7,6 +7,7 @@ use std::ptr::null_mut; use futures::{Stream, StreamExt}; use idevice::core_device::DiagnostisServiceClient; use idevice::{IdeviceError, ReadWrite, RsdService}; +use log::debug; use crate::core_device_proxy::AdapterHandle; use crate::rsd::RsdHandshakeHandle; @@ -45,12 +46,17 @@ pub unsafe extern "C" fn diagnostics_service_connect_rsd( RUNTIME.block_on(async move { let provider_ref = unsafe { &mut (*provider).0 }; let handshake_ref = unsafe { &mut (*handshake).0 }; + debug!( + "Connecting to DiagnosticsService: provider {provider_ref:?}, handshake: {:?}", + handshake_ref.uuid + ); DiagnostisServiceClient::connect_rsd(provider_ref, handshake_ref).await }); match res { Ok(client) => { + debug!("Connected to DiagnosticsService"); let boxed = Box::new(DiagnosticsServiceHandle(client)); unsafe { *handle = Box::into_raw(boxed) }; null_mut() diff --git a/idevice/src/services/rsd.rs b/idevice/src/services/rsd.rs index 4013816..1f2555a 100644 --- a/idevice/src/services/rsd.rs +++ b/idevice/src/services/rsd.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; -use log::warn; +use log::{debug, warn}; use serde::Deserialize; use crate::{IdeviceError, ReadWrite, RemoteXpcClient, provider::RsdProvider}; @@ -169,6 +169,10 @@ impl RsdHandshake { } }; + debug!( + "Connecting to RSD service {service_name} on port {}", + service.port + ); let stream = provider.connect_to_service_port(service.port).await?; T::from_stream(stream).await } diff --git a/idevice/src/tcp/adapter.rs b/idevice/src/tcp/adapter.rs index c366992..9d60868 100644 --- a/idevice/src/tcp/adapter.rs +++ b/idevice/src/tcp/adapter.rs @@ -205,6 +205,7 @@ impl Adapter { // Wait for the syn ack self.states.insert(host_port, state); + let start_time = std::time::Instant::now(); loop { self.process_tcp_packet().await?; if let Some(s) = self.states.get(&host_port) { @@ -216,6 +217,12 @@ impl Adapter { return Err(std::io::Error::new(e, "failed to connect")); } ConnectionStatus::WaitingForSyn => { + if start_time.elapsed() > std::time::Duration::from_secs(5) { + return Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "didn't syn in time", + )); + } continue; } } @@ -526,7 +533,7 @@ impl Adapter { self.write_buffer_flush().await?; Ok(loop { // try the data we already have - match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf]) { + match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf], &self.pcap) { IpParseError::Ok { packet, bytes_consumed, @@ -559,8 +566,15 @@ impl Adapter { } pub(crate) async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> { - let ip_packet = self.read_ip_packet().await?; - self.process_tcp_packet_from_payload(&ip_packet).await + tokio::select! { + ip_packet = self.read_ip_packet() => { + let ip_packet = ip_packet?; + self.process_tcp_packet_from_payload(&ip_packet).await + } + _ = tokio::time::sleep(std::time::Duration::from_secs(15)) => { + Ok(()) + } + } } pub(crate) async fn process_tcp_packet_from_payload( diff --git a/idevice/src/tcp/handle.rs b/idevice/src/tcp/handle.rs index 7da3aff..183704b 100644 --- a/idevice/src/tcp/handle.rs +++ b/idevice/src/tcp/handle.rs @@ -12,6 +12,7 @@ use log::trace; use tokio::{ io::{AsyncRead, AsyncWrite}, sync::oneshot, + time::timeout, }; use crate::tcp::adapter::ConnectionStatus; @@ -172,8 +173,8 @@ impl AdapterHandle { )); } - match res_rx.await { - Ok(r) => { + match timeout(std::time::Duration::from_secs(8), res_rx).await { + Ok(Ok(r)) => { let (host_port, recv_channel) = r?; Ok(StreamHandle { host_port, @@ -183,10 +184,14 @@ impl AdapterHandle { pending_writes: FuturesUnordered::new(), }) } - Err(_) => Err(std::io::Error::new( + Ok(Err(_)) => Err(std::io::Error::new( std::io::ErrorKind::BrokenPipe, "adapter closed", )), + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "channel recv timeout", + )), } } diff --git a/idevice/src/tcp/packets.rs b/idevice/src/tcp/packets.rs index 251dc14..4ae328f 100644 --- a/idevice/src/tcp/packets.rs +++ b/idevice/src/tcp/packets.rs @@ -230,7 +230,10 @@ pub(crate) enum IpParseError { } impl Ipv6Packet { - pub(crate) fn parse(packet: &[u8]) -> IpParseError { + pub(crate) fn parse( + packet: &[u8], + log: &Option>>, + ) -> IpParseError { if packet.len() < 40 { return IpParseError::NotEnough; } @@ -275,6 +278,13 @@ impl Ipv6Packet { ); let payload = packet[40..total_packet_len].to_vec(); + if let Some(log) = log { + let mut log_packet = Vec::new(); + log_packet.extend_from_slice(&packet[..40]); + log_packet.extend_from_slice(&payload); + super::log_packet(log, &log_packet); + } + IpParseError::Ok { packet: Self { version, @@ -699,7 +709,7 @@ mod tests { ); println!("{b1:02X?}"); - let ip1 = Ipv6Packet::parse(&b1); + let ip1 = Ipv6Packet::parse(&b1, &None); println!("{ip1:#?}"); }