From 2a37865340f1cc29671ad6b9002da1ffb09c766c Mon Sep 17 00:00:00 2001 From: Jackson Coxson Date: Thu, 21 Aug 2025 08:48:10 -0600 Subject: [PATCH] Keep buffered IP packet read internal to struct --- idevice/src/tcp/adapter.rs | 53 +++++++++++----- idevice/src/tcp/handle.rs | 125 +++++++++++++------------------------ idevice/src/tcp/packets.rs | 4 +- 3 files changed, 83 insertions(+), 99 deletions(-) diff --git a/idevice/src/tcp/adapter.rs b/idevice/src/tcp/adapter.rs index 287b59f..c366992 100644 --- a/idevice/src/tcp/adapter.rs +++ b/idevice/src/tcp/adapter.rs @@ -63,9 +63,12 @@ use std::{collections::HashMap, io::ErrorKind, net::IpAddr, path::Path, sync::Arc}; use log::{debug, trace, warn}; -use tokio::{io::AsyncWriteExt, sync::Mutex}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::Mutex, +}; -use crate::ReadWrite; +use crate::{ReadWrite, tcp::packets::IpParseError}; use super::packets::{Ipv4Packet, Ipv6Packet, ProtocolNumber, TcpFlags, TcpPacket}; @@ -112,7 +115,7 @@ impl ConnectionState { #[derive(Debug)] pub struct Adapter { /// The underlying transport connection - pub(crate) peer: Box, + peer: Box, /// The local IP address host_ip: IpAddr, /// The remote peer's IP address @@ -121,6 +124,8 @@ pub struct Adapter { /// The states of the connections states: HashMap, // host port by state dropped: Vec, + read_buf: [u8; 4096], + bytes_in_buf: usize, /// Optional PCAP file for packet logging pcap: Option>>, @@ -143,6 +148,8 @@ impl Adapter { peer_ip, states: HashMap::new(), dropped: Vec::new(), + read_buf: [0u8; 4096], + bytes_in_buf: 0, pcap: None, } } @@ -518,22 +525,36 @@ impl Adapter { async fn read_ip_packet(&mut self) -> Result, std::io::Error> { self.write_buffer_flush().await?; Ok(loop { - match self.host_ip { - IpAddr::V4(_) => { - let packet = Ipv4Packet::from_reader(&mut self.peer, &self.pcap).await?; - trace!("IPv4 packet: {packet:#?}"); - if packet.protocol == 6 { - break packet.payload; - } + // try the data we already have + match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf]) { + IpParseError::Ok { + packet, + bytes_consumed, + } => { + // And remove it from the buffer by shifting the remaining bytes + self.read_buf + .copy_within(bytes_consumed..self.bytes_in_buf, 0); + self.bytes_in_buf -= bytes_consumed; + break packet.payload; } - IpAddr::V6(_) => { - let packet = Ipv6Packet::from_reader(&mut self.peer, &self.pcap).await?; - trace!("IPv6 packet: {packet:#?}"); - if packet.next_header == 6 { - break packet.payload; - } + IpParseError::NotEnough => { + // Buffer doesn't have a full packet, wait for the next read + } + IpParseError::Invalid => { + // Corrupted data, close the connection + return Err(std::io::Error::new( + ErrorKind::InvalidData, + "invalid IPv6 parse", + )); } } + // go get more + let s = self + .peer + .read(&mut self.read_buf[self.bytes_in_buf..]) + .await?; + + self.bytes_in_buf += s; }) } diff --git a/idevice/src/tcp/handle.rs b/idevice/src/tcp/handle.rs index b7049a9..7da3aff 100644 --- a/idevice/src/tcp/handle.rs +++ b/idevice/src/tcp/handle.rs @@ -8,16 +8,13 @@ use std::{collections::HashMap, path::PathBuf, sync::Mutex, task::Poll}; use crossfire::{AsyncRx, MTx, Tx, mpsc, spsc, stream::AsyncStream}; use futures::{StreamExt, stream::FuturesUnordered}; -use log::{debug, trace}; +use log::trace; use tokio::{ - io::{AsyncRead, AsyncReadExt, AsyncWrite}, + io::{AsyncRead, AsyncWrite}, sync::oneshot, }; -use crate::tcp::{ - adapter::ConnectionStatus, - packets::{IpParseError, Ipv6Packet}, -}; +use crate::tcp::adapter::ConnectionStatus; pub type ConnectToPortRes = oneshot::Sender, std::io::Error>>), std::io::Error>>; @@ -54,8 +51,6 @@ impl AdapterHandle { let mut handles: HashMap, std::io::Error>>> = HashMap::new(); let mut tick = tokio::time::interval(std::time::Duration::from_millis(1)); - let mut read_buf = [0u8; 4096]; - let mut bytes_in_buf = 0; loop { tokio::select! { // check for messages for us @@ -102,85 +97,53 @@ impl AdapterHandle { } } - result = adapter.peer.read(&mut read_buf[bytes_in_buf..]) => { - match result { - Ok(0) => { - debug!("Underlying stream closed (EOF)"); - break; // Exit the main actor loop - } - Ok(s) => { - bytes_in_buf += s; - loop { - match Ipv6Packet::parse(&read_buf[..bytes_in_buf]) { - IpParseError::Ok { packet, bytes_consumed } => { - // We got a full packet! Process it. - if let Err(e) = adapter.process_tcp_packet_from_payload(&packet.payload).await { - debug!("CRITICAL: Failed to process IP packet: {e:?}"); - } - - // And remove it from the buffer by shifting the remaining bytes - read_buf.copy_within(bytes_consumed..bytes_in_buf, 0); - bytes_in_buf -= bytes_consumed; - // Push any newly available bytes to per-conn channels - let mut dead = Vec::new(); - for (&hp, tx) in &handles { - match adapter.uncache_all(hp) { - Ok(buf) if !buf.is_empty() => { - if tx.send(Ok(buf)).is_err() { - dead.push(hp); - } - } - Err(e) => { - let _ = tx.send(Err(e)); - dead.push(hp); - } - _ => {} - } - } - for hp in dead { - handles.remove(&hp); - let _ = adapter.close(hp).await; - } - - let mut to_close = Vec::new(); - for (&hp, tx) in &handles { - if let Ok(ConnectionStatus::Error(kind)) = adapter.get_status(hp) { - if kind == std::io::ErrorKind::UnexpectedEof { - to_close.push(hp); - } else { - let _ = tx.send(Err(std::io::Error::from(kind))); - to_close.push(hp); - } - } - } - for hp in to_close { - handles.remove(&hp); - // Best-effort close. For RST this just tidies state on our side - let _ = adapter.close(hp).await; - } - } - IpParseError::NotEnough => { - // Buffer doesn't have a full packet, wait for the next read - break; - } - IpParseError::Invalid => { - // Corrupted data, close the connection - // ... (error handling) ... - return; - } - } - } - - } - Err(e) => { - debug!("Failed to read: {e:?}, closing stack"); + r = adapter.process_tcp_packet() => { + if let Err(e) = r { + // propagate error to all streams; close them for (hp, tx) in handles.drain() { let _ = tx.send(Err(e.kind().into())); // or clone/convert let _ = adapter.close(hp).await; } - break; + break; + } + + // Push any newly available bytes to per-conn channels + let mut dead = Vec::new(); + for (&hp, tx) in &handles { + match adapter.uncache_all(hp) { + Ok(buf) if !buf.is_empty() => { + if tx.send(Ok(buf)).is_err() { + dead.push(hp); + } + } + Err(e) => { + let _ = tx.send(Err(e)); + dead.push(hp); + } + _ => {} } } + for hp in dead { + handles.remove(&hp); + let _ = adapter.close(hp).await; + } + + let mut to_close = Vec::new(); + for (&hp, tx) in &handles { + if let Ok(ConnectionStatus::Error(kind)) = adapter.get_status(hp) { + if kind == std::io::ErrorKind::UnexpectedEof { + to_close.push(hp); + } else { + let _ = tx.send(Err(std::io::Error::from(kind))); + to_close.push(hp); + } + } + } + for hp in to_close { + handles.remove(&hp); + // Best-effort close. For RST this just tidies state on our side + let _ = adapter.close(hp).await; + } } _ = tick.tick() => { diff --git a/idevice/src/tcp/packets.rs b/idevice/src/tcp/packets.rs index 0186ebe..251dc14 100644 --- a/idevice/src/tcp/packets.rs +++ b/idevice/src/tcp/packets.rs @@ -110,7 +110,7 @@ impl Ipv4Packet { let ihl = (version_ihl & 0x0F) * 4; if version != 4 || ihl < 20 { - debug!("Got an invalid IPv4 header"); + debug!("Got an invalid IPv4 header from reader"); return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Invalid IPv4 header", @@ -306,7 +306,7 @@ impl Ipv6Packet { let version = header[0] >> 4; if version != 6 { - debug!("Got an invalid IPv6 header"); + debug!("Got an invalid IPv6 header from reader"); return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, "Invalid IPv6 header",