mirror of
https://github.com/jkcoxson/idevice.git
synced 2026-03-02 06:26:15 +01:00
Keep buffered IP packet read internal to struct
This commit is contained in:
@@ -63,9 +63,12 @@
|
||||
use std::{collections::HashMap, io::ErrorKind, net::IpAddr, path::Path, sync::Arc};
|
||||
|
||||
use log::{debug, trace, warn};
|
||||
use tokio::{io::AsyncWriteExt, sync::Mutex};
|
||||
use tokio::{
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::Mutex,
|
||||
};
|
||||
|
||||
use crate::ReadWrite;
|
||||
use crate::{ReadWrite, tcp::packets::IpParseError};
|
||||
|
||||
use super::packets::{Ipv4Packet, Ipv6Packet, ProtocolNumber, TcpFlags, TcpPacket};
|
||||
|
||||
@@ -112,7 +115,7 @@ impl ConnectionState {
|
||||
#[derive(Debug)]
|
||||
pub struct Adapter {
|
||||
/// The underlying transport connection
|
||||
pub(crate) peer: Box<dyn ReadWrite>,
|
||||
peer: Box<dyn ReadWrite>,
|
||||
/// The local IP address
|
||||
host_ip: IpAddr,
|
||||
/// The remote peer's IP address
|
||||
@@ -121,6 +124,8 @@ pub struct Adapter {
|
||||
/// The states of the connections
|
||||
states: HashMap<u16, ConnectionState>, // host port by state
|
||||
dropped: Vec<u16>,
|
||||
read_buf: [u8; 4096],
|
||||
bytes_in_buf: usize,
|
||||
|
||||
/// Optional PCAP file for packet logging
|
||||
pcap: Option<Arc<Mutex<tokio::fs::File>>>,
|
||||
@@ -143,6 +148,8 @@ impl Adapter {
|
||||
peer_ip,
|
||||
states: HashMap::new(),
|
||||
dropped: Vec::new(),
|
||||
read_buf: [0u8; 4096],
|
||||
bytes_in_buf: 0,
|
||||
pcap: None,
|
||||
}
|
||||
}
|
||||
@@ -518,22 +525,36 @@ impl Adapter {
|
||||
async fn read_ip_packet(&mut self) -> Result<Vec<u8>, std::io::Error> {
|
||||
self.write_buffer_flush().await?;
|
||||
Ok(loop {
|
||||
match self.host_ip {
|
||||
IpAddr::V4(_) => {
|
||||
let packet = Ipv4Packet::from_reader(&mut self.peer, &self.pcap).await?;
|
||||
trace!("IPv4 packet: {packet:#?}");
|
||||
if packet.protocol == 6 {
|
||||
// try the data we already have
|
||||
match Ipv6Packet::parse(&self.read_buf[..self.bytes_in_buf]) {
|
||||
IpParseError::Ok {
|
||||
packet,
|
||||
bytes_consumed,
|
||||
} => {
|
||||
// And remove it from the buffer by shifting the remaining bytes
|
||||
self.read_buf
|
||||
.copy_within(bytes_consumed..self.bytes_in_buf, 0);
|
||||
self.bytes_in_buf -= bytes_consumed;
|
||||
break packet.payload;
|
||||
}
|
||||
IpParseError::NotEnough => {
|
||||
// Buffer doesn't have a full packet, wait for the next read
|
||||
}
|
||||
IpAddr::V6(_) => {
|
||||
let packet = Ipv6Packet::from_reader(&mut self.peer, &self.pcap).await?;
|
||||
trace!("IPv6 packet: {packet:#?}");
|
||||
if packet.next_header == 6 {
|
||||
break packet.payload;
|
||||
}
|
||||
IpParseError::Invalid => {
|
||||
// Corrupted data, close the connection
|
||||
return Err(std::io::Error::new(
|
||||
ErrorKind::InvalidData,
|
||||
"invalid IPv6 parse",
|
||||
));
|
||||
}
|
||||
}
|
||||
// go get more
|
||||
let s = self
|
||||
.peer
|
||||
.read(&mut self.read_buf[self.bytes_in_buf..])
|
||||
.await?;
|
||||
|
||||
self.bytes_in_buf += s;
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -8,16 +8,13 @@ use std::{collections::HashMap, path::PathBuf, sync::Mutex, task::Poll};
|
||||
|
||||
use crossfire::{AsyncRx, MTx, Tx, mpsc, spsc, stream::AsyncStream};
|
||||
use futures::{StreamExt, stream::FuturesUnordered};
|
||||
use log::{debug, trace};
|
||||
use log::trace;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt, AsyncWrite},
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
sync::oneshot,
|
||||
};
|
||||
|
||||
use crate::tcp::{
|
||||
adapter::ConnectionStatus,
|
||||
packets::{IpParseError, Ipv6Packet},
|
||||
};
|
||||
use crate::tcp::adapter::ConnectionStatus;
|
||||
|
||||
pub type ConnectToPortRes =
|
||||
oneshot::Sender<Result<(u16, AsyncRx<Result<Vec<u8>, std::io::Error>>), std::io::Error>>;
|
||||
@@ -54,8 +51,6 @@ impl AdapterHandle {
|
||||
let mut handles: HashMap<u16, Tx<Result<Vec<u8>, std::io::Error>>> = HashMap::new();
|
||||
let mut tick = tokio::time::interval(std::time::Duration::from_millis(1));
|
||||
|
||||
let mut read_buf = [0u8; 4096];
|
||||
let mut bytes_in_buf = 0;
|
||||
loop {
|
||||
tokio::select! {
|
||||
// check for messages for us
|
||||
@@ -102,25 +97,16 @@ impl AdapterHandle {
|
||||
}
|
||||
}
|
||||
|
||||
result = adapter.peer.read(&mut read_buf[bytes_in_buf..]) => {
|
||||
match result {
|
||||
Ok(0) => {
|
||||
debug!("Underlying stream closed (EOF)");
|
||||
break; // Exit the main actor loop
|
||||
r = adapter.process_tcp_packet() => {
|
||||
if let Err(e) = r {
|
||||
// propagate error to all streams; close them
|
||||
for (hp, tx) in handles.drain() {
|
||||
let _ = tx.send(Err(e.kind().into())); // or clone/convert
|
||||
let _ = adapter.close(hp).await;
|
||||
}
|
||||
Ok(s) => {
|
||||
bytes_in_buf += s;
|
||||
loop {
|
||||
match Ipv6Packet::parse(&read_buf[..bytes_in_buf]) {
|
||||
IpParseError::Ok { packet, bytes_consumed } => {
|
||||
// We got a full packet! Process it.
|
||||
if let Err(e) = adapter.process_tcp_packet_from_payload(&packet.payload).await {
|
||||
debug!("CRITICAL: Failed to process IP packet: {e:?}");
|
||||
break;
|
||||
}
|
||||
|
||||
// And remove it from the buffer by shifting the remaining bytes
|
||||
read_buf.copy_within(bytes_consumed..bytes_in_buf, 0);
|
||||
bytes_in_buf -= bytes_consumed;
|
||||
// Push any newly available bytes to per-conn channels
|
||||
let mut dead = Vec::new();
|
||||
for (&hp, tx) in &handles {
|
||||
@@ -159,29 +145,6 @@ impl AdapterHandle {
|
||||
let _ = adapter.close(hp).await;
|
||||
}
|
||||
}
|
||||
IpParseError::NotEnough => {
|
||||
// Buffer doesn't have a full packet, wait for the next read
|
||||
break;
|
||||
}
|
||||
IpParseError::Invalid => {
|
||||
// Corrupted data, close the connection
|
||||
// ... (error handling) ...
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("Failed to read: {e:?}, closing stack");
|
||||
for (hp, tx) in handles.drain() {
|
||||
let _ = tx.send(Err(e.kind().into())); // or clone/convert
|
||||
let _ = adapter.close(hp).await;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = tick.tick() => {
|
||||
let _ = adapter.write_buffer_flush().await;
|
||||
|
||||
@@ -110,7 +110,7 @@ impl Ipv4Packet {
|
||||
let ihl = (version_ihl & 0x0F) * 4;
|
||||
|
||||
if version != 4 || ihl < 20 {
|
||||
debug!("Got an invalid IPv4 header");
|
||||
debug!("Got an invalid IPv4 header from reader");
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"Invalid IPv4 header",
|
||||
@@ -306,7 +306,7 @@ impl Ipv6Packet {
|
||||
|
||||
let version = header[0] >> 4;
|
||||
if version != 6 {
|
||||
debug!("Got an invalid IPv6 header");
|
||||
debug!("Got an invalid IPv6 header from reader");
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
"Invalid IPv6 header",
|
||||
|
||||
Reference in New Issue
Block a user