diff --git a/idevice/src/tcp/adapter.rs b/idevice/src/tcp/adapter.rs index 2f06326..f1a59ef 100644 --- a/idevice/src/tcp/adapter.rs +++ b/idevice/src/tcp/adapter.rs @@ -11,8 +11,7 @@ //! - Optional PCAP packet capture //! - Implements `AsyncRead` and `AsyncWrite` for Tokio compatibility //! -//! # Limitations -//! - Only supports one connection at a time +//! # Limitations (unecessary for CDTunnel) //! - No proper sequence number tracking //! - No retransmission or congestion control //! - Requires 100% reliable underlying transport @@ -22,8 +21,8 @@ //! ```rust,no_run //! use std::net::{IpAddr, Ipv4Addr}; //! use tokio::io::{AsyncReadExt, AsyncWriteExt}; -//! use your_crate::tcp::Adapter; -//! use your_crate::ReadWrite; // Assuming you have a ReadWrite trait +//! use idevice::tcp::{adapter::Adapter, stream::AdapterStream}; +//! use idevice::ReadWrite; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -39,19 +38,19 @@ //! adapter.pcap("capture.pcap").await?; //! //! // Connect to remote server -//! adapter.connect(80).await?; +//! let stream = AdapterStream::new(&mut adapter, 80).await?; //! //! // Send HTTP request -//! adapter.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await?; -//! adapter.flush().await?; +//! stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await?; +//! stream.flush().await?; //! //! // Read response //! let mut buf = vec![0; 1024]; -//! let n = adapter.read(&mut buf).await?; +//! let n = stream.read(&mut buf).await?; //! println!("Received: {}", String::from_utf8_lossy(&buf[..n])); //! //! // Close connection -//! adapter.close().await?; +//! stream.close().await?; //! //! Ok(()) //! } @@ -59,7 +58,7 @@ //! //! # Warning //! This implementation makes significant simplifications and should not be used -//! in production environments or with unreliable network transports. +//! with unreliable network transports. use std::{collections::HashMap, io::ErrorKind, net::IpAddr, path::Path, sync::Arc}; @@ -106,11 +105,8 @@ impl ConnectionState { /// /// This is an extremely naive, limited, and dangerous TCP stack implementation. /// Key limitations: -/// - Only one connection can be active at a time /// - ACKs aren't properly tracked and are silently ignored /// - Should only be used when the underlying transport is 100% reliable -/// -/// The adapter implements `AsyncRead` and `AsyncWrite` for convenient IO operations. #[derive(Debug)] pub struct Adapter { /// The underlying transport connection @@ -120,9 +116,10 @@ pub struct Adapter { /// The remote peer's IP address peer_ip: IpAddr, + /// The states of the connections states: HashMap, // host port by state + dropped: Vec, - // Logging /// Optional PCAP file for packet logging pcap: Option>>, } @@ -136,13 +133,14 @@ impl Adapter { /// * `peer_ip` - The remote IP address to connect to /// /// # Returns - /// A new unconnected `Adapter` instance + /// A new `Adapter` instance pub fn new(peer: Box, host_ip: IpAddr, peer_ip: IpAddr) -> Self { Self { peer, host_ip, peer_ip, states: HashMap::new(), + dropped: Vec::new(), pcap: None, } } @@ -153,7 +151,7 @@ impl Adapter { /// * `port` - The remote port number to connect to /// /// # Returns - /// * `Ok(())` if connection was successful + /// * `Ok(u16)` the chosen host port if successful /// * `Err(std::io::Error)` if connection failed /// /// # Errors @@ -362,6 +360,10 @@ impl Adapter { Ok(()) } + pub(crate) fn connection_drop(&mut self, host_port: u16) { + self.dropped.push(host_port); + } + /// Flushes the packets pub(crate) async fn write_buffer_flush(&mut self) -> Result<(), std::io::Error> { for (_, state) in self.states.clone() { @@ -377,6 +379,18 @@ impl Adapter { state.write_buffer.clear(); } } + + // Since we have extra clocks and we haven't been cancelled by the runtime, let's reap the + // dropped connections + for d in self.dropped.clone() { + if let Some(state) = self.states.remove(&d) { + self.close(state.host_port).await.ok(); + } + } + // We can't clear until it's all done, since we can get cancelled by the runtime at any + // point. + self.dropped.clear(); + Ok(()) } diff --git a/idevice/src/tcp/stream.rs b/idevice/src/tcp/stream.rs index 2ada436..54dfde7 100644 --- a/idevice/src/tcp/stream.rs +++ b/idevice/src/tcp/stream.rs @@ -1,4 +1,4 @@ -// Jackson Coxson +//! A stream for the adapter use std::{future::Future, task::Poll}; @@ -17,6 +17,7 @@ pub struct AdapterStream<'a> { } impl<'a> AdapterStream<'a> { + /// Connect to the specified port pub async fn connect(adapter: &'a mut Adapter, port: u16) -> Result { let host_port = adapter.connect(port).await?; Ok(Self { @@ -26,6 +27,7 @@ impl<'a> AdapterStream<'a> { }) } + /// Gracefully closes the stream pub async fn close(&mut self) -> Result<(), std::io::Error> { self.adapter.close(self.host_port).await } @@ -156,3 +158,9 @@ impl AsyncWrite for AdapterStream<'_> { future.poll(cx) } } + +impl Drop for AdapterStream<'_> { + fn drop(&mut self) { + self.adapter.connection_drop(self.host_port); + } +}