Timeout on port connect syn

This commit is contained in:
Jackson Coxson
2025-08-27 11:38:44 -06:00
parent 7baa8a73b5
commit 4fde7cf06b
5 changed files with 48 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ use std::ptr::null_mut;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use idevice::core_device::DiagnostisServiceClient; use idevice::core_device::DiagnostisServiceClient;
use idevice::{IdeviceError, ReadWrite, RsdService}; use idevice::{IdeviceError, ReadWrite, RsdService};
use log::debug;
use crate::core_device_proxy::AdapterHandle; use crate::core_device_proxy::AdapterHandle;
use crate::rsd::RsdHandshakeHandle; use crate::rsd::RsdHandshakeHandle;
@@ -45,12 +46,17 @@ pub unsafe extern "C" fn diagnostics_service_connect_rsd(
RUNTIME.block_on(async move { RUNTIME.block_on(async move {
let provider_ref = unsafe { &mut (*provider).0 }; let provider_ref = unsafe { &mut (*provider).0 };
let handshake_ref = unsafe { &mut (*handshake).0 }; let handshake_ref = unsafe { &mut (*handshake).0 };
debug!(
"Connecting to DiagnosticsService: provider {provider_ref:?}, handshake: {:?}",
handshake_ref.uuid
);
DiagnostisServiceClient::connect_rsd(provider_ref, handshake_ref).await DiagnostisServiceClient::connect_rsd(provider_ref, handshake_ref).await
}); });
match res { match res {
Ok(client) => { Ok(client) => {
debug!("Connected to DiagnosticsService");
let boxed = Box::new(DiagnosticsServiceHandle(client)); let boxed = Box::new(DiagnosticsServiceHandle(client));
unsafe { *handle = Box::into_raw(boxed) }; unsafe { *handle = Box::into_raw(boxed) };
null_mut() null_mut()

View File

@@ -3,7 +3,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use log::warn; use log::{debug, warn};
use serde::Deserialize; use serde::Deserialize;
use crate::{IdeviceError, ReadWrite, RemoteXpcClient, provider::RsdProvider}; use crate::{IdeviceError, ReadWrite, RemoteXpcClient, provider::RsdProvider};
@@ -169,6 +169,10 @@ impl RsdHandshake {
} }
}; };
debug!(
"Connecting to RSD service {service_name} on port {}",
service.port
);
let stream = provider.connect_to_service_port(service.port).await?; let stream = provider.connect_to_service_port(service.port).await?;
T::from_stream(stream).await T::from_stream(stream).await
} }

View File

@@ -205,6 +205,7 @@ impl Adapter {
// Wait for the syn ack // Wait for the syn ack
self.states.insert(host_port, state); self.states.insert(host_port, state);
let start_time = std::time::Instant::now();
loop { loop {
self.process_tcp_packet().await?; self.process_tcp_packet().await?;
if let Some(s) = self.states.get(&host_port) { if let Some(s) = self.states.get(&host_port) {
@@ -216,6 +217,12 @@ impl Adapter {
return Err(std::io::Error::new(e, "failed to connect")); return Err(std::io::Error::new(e, "failed to connect"));
} }
ConnectionStatus::WaitingForSyn => { ConnectionStatus::WaitingForSyn => {
if start_time.elapsed() > std::time::Duration::from_secs(5) {
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"didn't syn in time",
));
}
continue; continue;
} }
} }
@@ -526,7 +533,7 @@ impl Adapter {
self.write_buffer_flush().await?; self.write_buffer_flush().await?;
Ok(loop { Ok(loop {
// try the data we already have // try the data we already have
match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf]) { match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf], &self.pcap) {
IpParseError::Ok { IpParseError::Ok {
packet, packet,
bytes_consumed, bytes_consumed,
@@ -559,9 +566,16 @@ impl Adapter {
} }
pub(crate) async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> { pub(crate) async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> {
let ip_packet = self.read_ip_packet().await?; tokio::select! {
ip_packet = self.read_ip_packet() => {
let ip_packet = ip_packet?;
self.process_tcp_packet_from_payload(&ip_packet).await self.process_tcp_packet_from_payload(&ip_packet).await
} }
_ = tokio::time::sleep(std::time::Duration::from_secs(15)) => {
Ok(())
}
}
}
pub(crate) async fn process_tcp_packet_from_payload( pub(crate) async fn process_tcp_packet_from_payload(
&mut self, &mut self,

View File

@@ -12,6 +12,7 @@ use log::trace;
use tokio::{ use tokio::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
sync::oneshot, sync::oneshot,
time::timeout,
}; };
use crate::tcp::adapter::ConnectionStatus; use crate::tcp::adapter::ConnectionStatus;
@@ -172,8 +173,8 @@ impl AdapterHandle {
)); ));
} }
match res_rx.await { match timeout(std::time::Duration::from_secs(8), res_rx).await {
Ok(r) => { Ok(Ok(r)) => {
let (host_port, recv_channel) = r?; let (host_port, recv_channel) = r?;
Ok(StreamHandle { Ok(StreamHandle {
host_port, host_port,
@@ -183,10 +184,14 @@ impl AdapterHandle {
pending_writes: FuturesUnordered::new(), pending_writes: FuturesUnordered::new(),
}) })
} }
Err(_) => Err(std::io::Error::new( Ok(Err(_)) => Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe, std::io::ErrorKind::BrokenPipe,
"adapter closed", "adapter closed",
)), )),
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"channel recv timeout",
)),
} }
} }

View File

@@ -230,7 +230,10 @@ pub(crate) enum IpParseError<T> {
} }
impl Ipv6Packet { impl Ipv6Packet {
pub(crate) fn parse(packet: &[u8]) -> IpParseError<Ipv6Packet> { pub(crate) fn parse(
packet: &[u8],
log: &Option<Arc<Mutex<tokio::fs::File>>>,
) -> IpParseError<Ipv6Packet> {
if packet.len() < 40 { if packet.len() < 40 {
return IpParseError::NotEnough; return IpParseError::NotEnough;
} }
@@ -275,6 +278,13 @@ impl Ipv6Packet {
); );
let payload = packet[40..total_packet_len].to_vec(); let payload = packet[40..total_packet_len].to_vec();
if let Some(log) = log {
let mut log_packet = Vec::new();
log_packet.extend_from_slice(&packet[..40]);
log_packet.extend_from_slice(&payload);
super::log_packet(log, &log_packet);
}
IpParseError::Ok { IpParseError::Ok {
packet: Self { packet: Self {
version, version,
@@ -699,7 +709,7 @@ mod tests {
); );
println!("{b1:02X?}"); println!("{b1:02X?}");
let ip1 = Ipv6Packet::parse(&b1); let ip1 = Ipv6Packet::parse(&b1, &None);
println!("{ip1:#?}"); println!("{ip1:#?}");
} }