Add drop to AdapterStream

This commit is contained in:
Jackson Coxson
2025-05-23 02:01:52 -06:00
parent f4ba4210fa
commit 22034edcab
2 changed files with 39 additions and 17 deletions

View File

@@ -11,8 +11,7 @@
//! - Optional PCAP packet capture
//! - Implements `AsyncRead` and `AsyncWrite` for Tokio compatibility
//!
//! # Limitations
//! - Only supports one connection at a time
//! # Limitations (unecessary for CDTunnel)
//! - No proper sequence number tracking
//! - No retransmission or congestion control
//! - Requires 100% reliable underlying transport
@@ -22,8 +21,8 @@
//! ```rust,no_run
//! use std::net::{IpAddr, Ipv4Addr};
//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
//! use your_crate::tcp::Adapter;
//! use your_crate::ReadWrite; // Assuming you have a ReadWrite trait
//! use idevice::tcp::{adapter::Adapter, stream::AdapterStream};
//! use idevice::ReadWrite;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -39,19 +38,19 @@
//! adapter.pcap("capture.pcap").await?;
//!
//! // Connect to remote server
//! adapter.connect(80).await?;
//! let stream = AdapterStream::new(&mut adapter, 80).await?;
//!
//! // Send HTTP request
//! adapter.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await?;
//! adapter.flush().await?;
//! stream.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await?;
//! stream.flush().await?;
//!
//! // Read response
//! let mut buf = vec![0; 1024];
//! let n = adapter.read(&mut buf).await?;
//! let n = stream.read(&mut buf).await?;
//! println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
//!
//! // Close connection
//! adapter.close().await?;
//! stream.close().await?;
//!
//! Ok(())
//! }
@@ -59,7 +58,7 @@
//!
//! # Warning
//! This implementation makes significant simplifications and should not be used
//! in production environments or with unreliable network transports.
//! with unreliable network transports.
use std::{collections::HashMap, io::ErrorKind, net::IpAddr, path::Path, sync::Arc};
@@ -106,11 +105,8 @@ impl ConnectionState {
///
/// This is an extremely naive, limited, and dangerous TCP stack implementation.
/// Key limitations:
/// - Only one connection can be active at a time
/// - ACKs aren't properly tracked and are silently ignored
/// - Should only be used when the underlying transport is 100% reliable
///
/// The adapter implements `AsyncRead` and `AsyncWrite` for convenient IO operations.
#[derive(Debug)]
pub struct Adapter {
/// The underlying transport connection
@@ -120,9 +116,10 @@ pub struct Adapter {
/// The remote peer's IP address
peer_ip: IpAddr,
/// The states of the connections
states: HashMap<u16, ConnectionState>, // host port by state
dropped: Vec<u16>,
// Logging
/// Optional PCAP file for packet logging
pcap: Option<Arc<Mutex<tokio::fs::File>>>,
}
@@ -136,13 +133,14 @@ impl Adapter {
/// * `peer_ip` - The remote IP address to connect to
///
/// # Returns
/// A new unconnected `Adapter` instance
/// A new `Adapter` instance
pub fn new(peer: Box<dyn ReadWrite>, host_ip: IpAddr, peer_ip: IpAddr) -> Self {
Self {
peer,
host_ip,
peer_ip,
states: HashMap::new(),
dropped: Vec::new(),
pcap: None,
}
}
@@ -153,7 +151,7 @@ impl Adapter {
/// * `port` - The remote port number to connect to
///
/// # Returns
/// * `Ok(())` if connection was successful
/// * `Ok(u16)` the chosen host port if successful
/// * `Err(std::io::Error)` if connection failed
///
/// # Errors
@@ -362,6 +360,10 @@ impl Adapter {
Ok(())
}
pub(crate) fn connection_drop(&mut self, host_port: u16) {
self.dropped.push(host_port);
}
/// Flushes the packets
pub(crate) async fn write_buffer_flush(&mut self) -> Result<(), std::io::Error> {
for (_, state) in self.states.clone() {
@@ -377,6 +379,18 @@ impl Adapter {
state.write_buffer.clear();
}
}
// Since we have extra clocks and we haven't been cancelled by the runtime, let's reap the
// dropped connections
for d in self.dropped.clone() {
if let Some(state) = self.states.remove(&d) {
self.close(state.host_port).await.ok();
}
}
// We can't clear until it's all done, since we can get cancelled by the runtime at any
// point.
self.dropped.clear();
Ok(())
}

View File

@@ -1,4 +1,4 @@
// Jackson Coxson
//! A stream for the adapter
use std::{future::Future, task::Poll};
@@ -17,6 +17,7 @@ pub struct AdapterStream<'a> {
}
impl<'a> AdapterStream<'a> {
/// Connect to the specified port
pub async fn connect(adapter: &'a mut Adapter, port: u16) -> Result<Self, std::io::Error> {
let host_port = adapter.connect(port).await?;
Ok(Self {
@@ -26,6 +27,7 @@ impl<'a> AdapterStream<'a> {
})
}
/// Gracefully closes the stream
pub async fn close(&mut self) -> Result<(), std::io::Error> {
self.adapter.close(self.host_port).await
}
@@ -156,3 +158,9 @@ impl AsyncWrite for AdapterStream<'_> {
future.poll(cx)
}
}
impl Drop for AdapterStream<'_> {
fn drop(&mut self) {
self.adapter.connection_drop(self.host_port);
}
}