From 0a0899cd8a10b98adc4c60c86a0fdf8617895e91 Mon Sep 17 00:00:00 2001 From: Jackson Coxson Date: Mon, 11 Aug 2025 10:41:55 -0600 Subject: [PATCH] Create concurrent TCP handle and implement RSD for non-lifetime structs --- Cargo.lock | 75 ++++ ffi/libplist | 1 - ffi/src/adapter.rs | 2 +- ffi/src/core_device/app_service.rs | 14 +- ffi/src/core_device_proxy.rs | 5 +- ffi/src/debug_proxy.rs | 14 +- ffi/src/remote_server.rs | 3 +- idevice/Cargo.toml | 9 +- idevice/src/lib.rs | 22 +- idevice/src/provider.rs | 19 +- .../src/services/core_device/app_service.rs | 12 +- idevice/src/services/core_device/mod.rs | 6 - idevice/src/services/debug_proxy.rs | 6 +- idevice/src/services/dvt/mod.rs | 6 +- idevice/src/services/misagent.rs | 14 +- idevice/src/services/restore_service.rs | 20 +- idevice/src/services/rsd.rs | 8 +- idevice/src/tcp/adapter.rs | 22 +- idevice/src/tcp/handle.rs | 339 ++++++++++++++++++ idevice/src/tcp/mod.rs | 16 +- idevice/src/xpc/http2/mod.rs | 7 - idevice/src/xpc/mod.rs | 8 - tools/src/app_service.rs | 9 +- tools/src/common.rs | 4 +- tools/src/debug_proxy.rs | 9 +- tools/src/location_simulation.rs | 12 +- tools/src/process_control.rs | 12 +- tools/src/restore_service.rs | 11 +- 28 files changed, 536 insertions(+), 149 deletions(-) delete mode 160000 ffi/libplist create mode 100644 idevice/src/tcp/handle.rs diff --git a/Cargo.lock b/Cargo.lock index b059b5b..ccb56be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -476,12 +476,74 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossfire" +version = "2.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3881655583172e9ff313a26f910f262271e331c4af54e22ebd395f8d97da3629" +dependencies = [ + "crossbeam", + "enum_dispatch", + "futures", + "parking_lot", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -568,6 +630,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -1073,6 +1147,7 @@ dependencies = [ "byteorder", "bytes", "chrono", + "crossfire", "env_logger", "futures", "indexmap", diff --git a/ffi/libplist b/ffi/libplist deleted file mode 160000 index cf5897a..0000000 --- a/ffi/libplist +++ /dev/null @@ -1 +0,0 @@ -Subproject commit cf5897a71ea412ea2aeb1e2f6b5ea74d4fabfd8c diff --git a/ffi/src/adapter.rs b/ffi/src/adapter.rs index 553162a..79c7a8d 100644 --- a/ffi/src/adapter.rs +++ b/ffi/src/adapter.rs @@ -35,7 +35,7 @@ pub unsafe extern "C" fn adapter_connect( } let adapter = unsafe { &mut (*adapter_handle).0 }; - let res = RUNTIME.block_on(async move { AdapterStream::connect(adapter, port).await }); + let res = RUNTIME.block_on(async move { adapter.connect(port).await }); match res { Ok(r) => { diff --git a/ffi/src/core_device/app_service.rs b/ffi/src/core_device/app_service.rs index a877485..5aac7a4 100644 --- a/ffi/src/core_device/app_service.rs +++ b/ffi/src/core_device/app_service.rs @@ -5,7 +5,6 @@ use std::os::raw::{c_float, c_int}; use std::ptr::{self, null_mut}; use idevice::core_device::AppServiceClient; -use idevice::tcp::stream::AdapterStream; use idevice::{IdeviceError, ReadWrite, RsdService}; use crate::core_device_proxy::AdapterHandle; @@ -91,16 +90,17 @@ pub unsafe extern "C" fn app_service_connect_rsd( return ffi_err!(IdeviceError::FfiInvalidArg); } - let res: Result, IdeviceError> = RUNTIME.block_on(async move { - let provider_ref = unsafe { &mut (*provider).0 }; - let handshake_ref = unsafe { &mut (*handshake).0 }; + let res: Result>, IdeviceError> = + RUNTIME.block_on(async move { + let provider_ref = unsafe { &mut (*provider).0 }; + let handshake_ref = unsafe { &mut (*handshake).0 }; - AppServiceClient::connect_rsd(provider_ref, handshake_ref).await - }); + AppServiceClient::connect_rsd(provider_ref, handshake_ref).await + }); match res { Ok(client) => { - let boxed = Box::new(AppServiceHandle(client.box_inner())); + let boxed = Box::new(AppServiceHandle(client)); unsafe { *handle = Box::into_raw(boxed) }; null_mut() } diff --git a/ffi/src/core_device_proxy.rs b/ffi/src/core_device_proxy.rs index 2d9960e..8282681 100644 --- a/ffi/src/core_device_proxy.rs +++ b/ffi/src/core_device_proxy.rs @@ -7,13 +7,12 @@ use std::{ use idevice::{ IdeviceError, IdeviceService, core_device_proxy::CoreDeviceProxy, provider::IdeviceProvider, - tcp::adapter::Adapter, }; use crate::{IdeviceFfiError, IdeviceHandle, RUNTIME, ffi_err, provider::IdeviceProviderHandle}; pub struct CoreDeviceProxyHandle(pub CoreDeviceProxy); -pub struct AdapterHandle(pub Adapter); +pub struct AdapterHandle(pub idevice::tcp::handle::AdapterHandle); /// Automatically creates and connects to Core Device Proxy, returning a client handle /// @@ -312,7 +311,7 @@ pub unsafe extern "C" fn core_device_proxy_create_tcp_adapter( match result { Ok(adapter_obj) => { - let boxed = Box::new(AdapterHandle(adapter_obj)); + let boxed = Box::new(AdapterHandle(adapter_obj.to_async_handle())); unsafe { *adapter = Box::into_raw(boxed) }; null_mut() } diff --git a/ffi/src/debug_proxy.rs b/ffi/src/debug_proxy.rs index 1ab061e..3bf7eef 100644 --- a/ffi/src/debug_proxy.rs +++ b/ffi/src/debug_proxy.rs @@ -5,7 +5,6 @@ use std::os::raw::c_int; use std::ptr::{self, null_mut}; use idevice::debug_proxy::{DebugProxyClient, DebugserverCommand}; -use idevice::tcp::stream::AdapterStream; use idevice::{IdeviceError, ReadWrite, RsdService}; use crate::core_device_proxy::AdapterHandle; @@ -136,13 +135,14 @@ pub unsafe extern "C" fn debug_proxy_connect_rsd( if provider.is_null() || handshake.is_null() || handshake.is_null() { return ffi_err!(IdeviceError::FfiInvalidArg); } - let res: Result, IdeviceError> = RUNTIME.block_on(async move { - let provider_ref = unsafe { &mut (*provider).0 }; - let handshake_ref = unsafe { &mut (*handshake).0 }; + let res: Result>, IdeviceError> = + RUNTIME.block_on(async move { + let provider_ref = unsafe { &mut (*provider).0 }; + let handshake_ref = unsafe { &mut (*handshake).0 }; - // Connect using the reference - DebugProxyClient::connect_rsd(provider_ref, handshake_ref).await - }); + // Connect using the reference + DebugProxyClient::connect_rsd(provider_ref, handshake_ref).await + }); match res { Ok(d) => { diff --git a/ffi/src/remote_server.rs b/ffi/src/remote_server.rs index 0646d11..a9d3c54 100644 --- a/ffi/src/remote_server.rs +++ b/ffi/src/remote_server.rs @@ -6,7 +6,6 @@ use crate::core_device_proxy::AdapterHandle; use crate::rsd::RsdHandshakeHandle; use crate::{IdeviceFfiError, RUNTIME, ReadWriteOpaque, ffi_err}; use idevice::dvt::remote_server::RemoteServerClient; -use idevice::tcp::stream::AdapterStream; use idevice::{IdeviceError, ReadWrite, RsdService}; /// Opaque handle to a RemoteServerClient @@ -77,7 +76,7 @@ pub unsafe extern "C" fn remote_server_connect_rsd( if provider.is_null() || handshake.is_null() || handshake.is_null() { return ffi_err!(IdeviceError::FfiInvalidArg); } - let res: Result, IdeviceError> = + let res: Result>, IdeviceError> = RUNTIME.block_on(async move { let provider_ref = unsafe { &mut (*provider).0 }; let handshake_ref = unsafe { &mut (*handshake).0 }; diff --git a/idevice/Cargo.toml b/idevice/Cargo.toml index e28e7ef..765a3b5 100644 --- a/idevice/Cargo.toml +++ b/idevice/Cargo.toml @@ -14,6 +14,7 @@ keywords = ["lockdownd", "ios"] tokio = { version = "1.43", features = ["io-util"] } tokio-rustls = { version = "0.26", default-features = false } rustls = { version = "0.23", default-features = false } +crossfire = { version = "2.0", optional = true } # TODO: update to 2.1 when it comes out plist = { version = "1.7" } serde = { version = "1", features = ["derive"] } @@ -80,7 +81,13 @@ restore_service = [] rsd = ["xpc"] syslog_relay = ["dep:bytes"] tcp = ["tokio/net"] -tunnel_tcp_stack = ["dep:rand", "dep:futures", "tokio/fs", "tokio/sync"] +tunnel_tcp_stack = [ + "dep:rand", + "dep:futures", + "tokio/fs", + "tokio/sync", + "dep:crossfire", +] tss = ["dep:uuid", "dep:reqwest"] tunneld = ["dep:serde_json", "dep:json", "dep:reqwest"] usbmuxd = ["tokio/net"] diff --git a/idevice/src/lib.rs b/idevice/src/lib.rs index 76a3034..b7bbe13 100644 --- a/idevice/src/lib.rs +++ b/idevice/src/lib.rs @@ -25,7 +25,7 @@ pub use services::*; #[cfg(feature = "xpc")] pub use xpc::RemoteXpcClient; -use log::{debug, error, trace}; +use log::{debug, error, trace, warn}; use provider::{IdeviceProvider, RsdProvider}; use rustls::{crypto::CryptoProvider, pki_types::ServerName}; use std::{ @@ -70,20 +70,17 @@ pub trait IdeviceService: Sized { pub trait RsdService: Sized { fn rsd_service_name() -> std::borrow::Cow<'static, str>; fn from_stream( - stream: Self::Stream, + stream: Box, ) -> impl std::future::Future> + Send; - fn connect_rsd<'a, S>( - provider: &'a mut impl RsdProvider<'a, Stream = S>, + fn connect_rsd( + provider: &mut impl RsdProvider, handshake: &mut rsd::RsdHandshake, ) -> impl std::future::Future> where - Self: crate::RsdService, - S: ReadWrite, + Self: crate::RsdService, { handshake.connect(provider) } - - type Stream: ReadWrite; } /// Type alias for boxed device connection sockets @@ -417,9 +414,12 @@ impl Idevice { #[cfg(all(feature = "ring", feature = "aws-lc"))] { - compile_error!( - "Cannot enable both `ring` and `aws-lc` features at the same time" - ); + // We can't throw a compile error because it breaks rust-analyzer. + // My sanity while debugging the workspace crates are more important. + + debug!("Using ring crypto backend, because both were passed"); + warn!("Both ring && aws-lc are selected as idevice crypto backends!"); + rustls::crypto::ring::default_provider() } }; diff --git a/idevice/src/provider.rs b/idevice/src/provider.rs index 917fa18..bc0250e 100644 --- a/idevice/src/provider.rs +++ b/idevice/src/provider.rs @@ -42,12 +42,11 @@ pub trait IdeviceProvider: Unpin + Send + Sync + std::fmt::Debug { ) -> Pin> + Send>>; } -pub trait RsdProvider<'a>: Unpin + Send + Sync + std::fmt::Debug { +pub trait RsdProvider: Unpin + Send + Sync + std::fmt::Debug { fn connect_to_service_port( - &'a mut self, + &mut self, port: u16, - ) -> impl std::future::Future> + Send; - type Stream: ReadWrite; + ) -> impl std::future::Future, IdeviceError>> + Send; } /// TCP-based device connection provider @@ -159,13 +158,13 @@ impl IdeviceProvider for UsbmuxdProvider { } #[cfg(feature = "tcp")] -impl<'a> RsdProvider<'a> for std::net::IpAddr { +impl RsdProvider for std::net::IpAddr { async fn connect_to_service_port( - &'a mut self, + &mut self, port: u16, - ) -> Result { - Ok(tokio::net::TcpStream::connect((*self, port)).await?) + ) -> Result, IdeviceError> { + Ok(Box::new( + tokio::net::TcpStream::connect((*self, port)).await?, + )) } - - type Stream = tokio::net::TcpStream; } diff --git a/idevice/src/services/core_device/app_service.rs b/idevice/src/services/core_device/app_service.rs index 5d276ab..fc74960 100644 --- a/idevice/src/services/core_device/app_service.rs +++ b/idevice/src/services/core_device/app_service.rs @@ -7,18 +7,16 @@ use crate::{obf, IdeviceError, ReadWrite, RsdService}; use super::CoreDeviceServiceClient; -impl RsdService for AppServiceClient { +impl RsdService for AppServiceClient> { fn rsd_service_name() -> std::borrow::Cow<'static, str> { obf!("com.apple.coredevice.appservice") } - async fn from_stream(stream: R) -> Result { + async fn from_stream(stream: Box) -> Result { Ok(Self { inner: CoreDeviceServiceClient::new(stream).await?, }) } - - type Stream = R; } pub struct AppServiceClient { @@ -137,12 +135,6 @@ impl<'a, R: ReadWrite + 'a> AppServiceClient { }) } - pub fn box_inner(self) -> AppServiceClient> { - AppServiceClient { - inner: self.inner.box_inner(), - } - } - pub async fn list_apps( &mut self, app_clips: bool, diff --git a/idevice/src/services/core_device/mod.rs b/idevice/src/services/core_device/mod.rs index b5151d3..e11fd76 100644 --- a/idevice/src/services/core_device/mod.rs +++ b/idevice/src/services/core_device/mod.rs @@ -24,12 +24,6 @@ impl<'a, R: ReadWrite + 'a> CoreDeviceServiceClient { Ok(Self { inner: client }) } - pub fn box_inner(self) -> CoreDeviceServiceClient> { - CoreDeviceServiceClient { - inner: self.inner.box_inner(), - } - } - pub async fn invoke( &mut self, feature: impl Into, diff --git a/idevice/src/services/debug_proxy.rs b/idevice/src/services/debug_proxy.rs index 8fa2a28..ab3237f 100644 --- a/idevice/src/services/debug_proxy.rs +++ b/idevice/src/services/debug_proxy.rs @@ -10,19 +10,17 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::{obf, IdeviceError, ReadWrite, RsdService}; -impl RsdService for DebugProxyClient { +impl RsdService for DebugProxyClient> { fn rsd_service_name() -> std::borrow::Cow<'static, str> { obf!("com.apple.internal.dt.remote.debugproxy") } - async fn from_stream(stream: R) -> Result { + async fn from_stream(stream: Box) -> Result { Ok(Self { socket: stream, noack_mode: false, }) } - - type Stream = R; } /// Client for interacting with the iOS debug proxy service diff --git a/idevice/src/services/dvt/mod.rs b/idevice/src/services/dvt/mod.rs index fced62f..904f584 100644 --- a/idevice/src/services/dvt/mod.rs +++ b/idevice/src/services/dvt/mod.rs @@ -8,14 +8,12 @@ pub mod message; pub mod process_control; pub mod remote_server; -impl RsdService for remote_server::RemoteServerClient { +impl RsdService for remote_server::RemoteServerClient> { fn rsd_service_name() -> std::borrow::Cow<'static, str> { obf!("com.apple.instruments.dtservicehub") } - async fn from_stream(stream: R) -> Result { + async fn from_stream(stream: Box) -> Result { Ok(Self::new(stream)) } - - type Stream = R; } diff --git a/idevice/src/services/misagent.rs b/idevice/src/services/misagent.rs index ab9b4b1..4f2f03d 100644 --- a/idevice/src/services/misagent.rs +++ b/idevice/src/services/misagent.rs @@ -6,7 +6,7 @@ use log::warn; use plist::Dictionary; -use crate::{lockdown::LockdownClient, obf, Idevice, IdeviceError, IdeviceService}; +use crate::{lockdown::LockdownClient, obf, Idevice, IdeviceError, IdeviceService, RsdService}; /// Client for interacting with the iOS misagent service /// @@ -19,6 +19,18 @@ pub struct MisagentClient { pub idevice: Idevice, } +impl RsdService for MisagentClient { + fn rsd_service_name() -> std::borrow::Cow<'static, str> { + obf!("com.apple.misagent.shim.remote") + } + + async fn from_stream(stream: Box) -> Result { + let mut stream = Idevice::new(stream, ""); + stream.rsd_checkin().await?; + Ok(Self::new(stream)) + } +} + impl IdeviceService for MisagentClient { /// Returns the misagent service name as registered with lockdownd fn service_name() -> std::borrow::Cow<'static, str> { diff --git a/idevice/src/services/restore_service.rs b/idevice/src/services/restore_service.rs index aa23568..2942e71 100644 --- a/idevice/src/services/restore_service.rs +++ b/idevice/src/services/restore_service.rs @@ -6,41 +6,33 @@ use plist::Dictionary; use crate::{obf, IdeviceError, ReadWrite, RemoteXpcClient, RsdService}; /// Client for interacting with the Restore Service -pub struct RestoreServiceClient { +pub struct RestoreServiceClient { /// The underlying device connection with established Restore Service service - pub stream: RemoteXpcClient, + pub stream: RemoteXpcClient>, } -impl RsdService for RestoreServiceClient { +impl RsdService for RestoreServiceClient { fn rsd_service_name() -> std::borrow::Cow<'static, str> { obf!("com.apple.RestoreRemoteServices.restoreserviced") } - async fn from_stream(stream: R) -> Result { + async fn from_stream(stream: Box) -> Result { Self::new(stream).await } - - type Stream = R; } -impl<'a, R: ReadWrite + 'a> RestoreServiceClient { +impl RestoreServiceClient { /// Creates a new Restore Service client a socket connection, /// and connects to the RemoteXPC service. /// /// # Arguments /// * `idevice` - Pre-established device connection - pub async fn new(stream: R) -> Result { + pub async fn new(stream: Box) -> Result { let mut stream = RemoteXpcClient::new(stream).await?; stream.do_handshake().await?; Ok(Self { stream }) } - pub fn box_inner(self) -> RestoreServiceClient> { - RestoreServiceClient { - stream: self.stream.box_inner(), - } - } - /// Enter recovery pub async fn enter_recovery(&mut self) -> Result<(), IdeviceError> { let mut req = Dictionary::new(); diff --git a/idevice/src/services/rsd.rs b/idevice/src/services/rsd.rs index 1e78196..58566b9 100644 --- a/idevice/src/services/rsd.rs +++ b/idevice/src/services/rsd.rs @@ -156,13 +156,9 @@ impl RsdHandshake { }) } - pub async fn connect<'a, T, S>( - &mut self, - provider: &'a mut impl RsdProvider<'a, Stream = S>, - ) -> Result + pub async fn connect(&mut self, provider: &mut impl RsdProvider) -> Result where - T: crate::RsdService, - S: ReadWrite, + T: crate::RsdService, { let service_name = T::rsd_service_name(); let service = match self.services.get(&service_name.to_string()) { diff --git a/idevice/src/tcp/adapter.rs b/idevice/src/tcp/adapter.rs index 0724fd0..4bcc943 100644 --- a/idevice/src/tcp/adapter.rs +++ b/idevice/src/tcp/adapter.rs @@ -147,6 +147,13 @@ impl Adapter { } } + /// Wraps this handle in a new thread. + /// Streams from this handle will be thread safe, with data sent through channels. + /// The handle supports the trait for RSD provider. + pub fn to_async_handle(self) -> super::handle::AdapterHandle { + super::handle::AdapterHandle::new(self) + } + /// Initiates a TCP connection to the specified port. /// /// # Arguments @@ -435,6 +442,19 @@ impl Adapter { } } + pub(crate) fn uncache_all(&mut self, host_port: u16) -> Result, std::io::Error> { + if let Some(state) = self.states.get_mut(&host_port) { + let res = state.read_buffer[..].to_vec(); + state.read_buffer.clear(); + Ok(res) + } else { + Err(std::io::Error::new( + ErrorKind::NotConnected, + "not connected", + )) + } + } + pub(crate) fn cache_read( &mut self, payload: &[u8], @@ -517,7 +537,7 @@ impl Adapter { }) } - async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> { + pub(crate) async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> { loop { let ip_packet = self.read_ip_packet().await?; let res = TcpPacket::parse(&ip_packet)?; diff --git a/idevice/src/tcp/handle.rs b/idevice/src/tcp/handle.rs new file mode 100644 index 0000000..86c08ac --- /dev/null +++ b/idevice/src/tcp/handle.rs @@ -0,0 +1,339 @@ +// So originally, streams wrote to the adapter via a mutable reference. +// This worked fine for most applications, but the lifetime requirement of the stream +// makes things difficult. This was especially apparent when trying to integrate with lockdown +// services that were swapped on the heap. This will also allow for usage across threads, +// especially in FFI. Judging the tradeoffs, we'll go forward with it. + +use std::{collections::HashMap, path::PathBuf, sync::Mutex, task::Poll}; + +use crossfire::{mpsc, spsc, stream::AsyncStream, AsyncRx, MTx, Tx}; +use futures::{stream::FuturesUnordered, StreamExt}; +use log::trace; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::oneshot, +}; + +pub type ConnectToPortRes = + oneshot::Sender, std::io::Error>>), std::io::Error>>; + +enum HandleMessage { + /// Returns the host port + ConnectToPort { + target: u16, + res: ConnectToPortRes, + }, + Close { + host_port: u16, + }, + Send { + host_port: u16, + data: Vec, + res: oneshot::Sender>, + }, + Pcap { + path: PathBuf, + res: oneshot::Sender>, + }, +} + +#[derive(Debug)] +pub struct AdapterHandle { + sender: MTx, +} + +impl AdapterHandle { + pub fn new(mut adapter: super::adapter::Adapter) -> Self { + let (tx, rx) = mpsc::unbounded_async(); + tokio::spawn(async move { + let mut handles: HashMap, std::io::Error>>> = HashMap::new(); + let mut tick = tokio::time::interval(std::time::Duration::from_millis(1)); + loop { + tokio::select! { + // check for messages for us + msg = rx.recv() => { + match msg { + Ok(m) => match m { + HandleMessage::ConnectToPort { target, res } => { + let connect_response = match adapter.connect(target).await { + Ok(c) => { + let (ptx, prx) = spsc::unbounded_async(); + handles.insert(c, ptx); + Ok((c, prx)) + } + Err(e) => Err(e), + }; + res.send(connect_response).ok(); + } + HandleMessage::Close { host_port } => { + handles.remove(&host_port); + adapter.close(host_port).await.ok(); + } + HandleMessage::Send { + host_port, + data, + res, + } => { + if let Err(e) = adapter.queue_send(&data, host_port) { + res.send(Err(e)).ok(); + } else { + let response = adapter.write_buffer_flush().await; + res.send(response).ok(); + } + } + HandleMessage::Pcap { + path, + res + } => { + res.send(adapter.pcap(path).await).ok(); + } + }, + Err(_) => { + break; + }, + } + } + + r = adapter.process_tcp_packet() => { + if let Err(e) = r { + // propagate error to all streams; close them + for (hp, tx) in handles.drain() { + let _ = tx.send(Err(e.kind().into())); // or clone/convert + let _ = adapter.close(hp).await; + } + break; + } + + // Push any newly available bytes to per-conn channels + let mut dead = Vec::new(); + for (&hp, tx) in &handles { + match adapter.uncache_all(hp) { + Ok(buf) if !buf.is_empty() => { + if tx.send(Ok(buf)).is_err() { + dead.push(hp); + } + } + Err(e) => { + let _ = tx.send(Err(e)); + dead.push(hp); + } + _ => {} + } + } + for hp in dead { + handles.remove(&hp); + let _ = adapter.close(hp).await; + } + } + + _ = tick.tick() => { + let _ = adapter.write_buffer_flush().await; + } + } + } + }); + + Self { sender: tx } + } + + pub async fn connect(&mut self, port: u16) -> Result { + let (res_tx, res_rx) = oneshot::channel(); + if self + .sender + .send(HandleMessage::ConnectToPort { + target: port, + res: res_tx, + }) + .is_err() + { + return Err(std::io::Error::new( + std::io::ErrorKind::NetworkUnreachable, + "adapter closed", + )); + } + + match res_rx.await { + Ok(r) => { + let (host_port, recv_channel) = r?; + Ok(StreamHandle { + host_port, + recv_channel: Mutex::new(recv_channel.into_stream()), + send_channel: self.sender.clone(), + read_buffer: Vec::new(), + pending_writes: FuturesUnordered::new(), + }) + } + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "adapter closed", + )), + } + } + + pub async fn pcap(&mut self, path: impl Into) -> Result<(), std::io::Error> { + let (res_tx, res_rx) = oneshot::channel(); + let path: PathBuf = path.into(); + + if self + .sender + .send(HandleMessage::Pcap { path, res: res_tx }) + .is_err() + { + return Err(std::io::Error::new( + std::io::ErrorKind::NetworkUnreachable, + "adapter closed", + )); + } + + match res_rx.await { + Ok(r) => r, + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "adapter closed", + )), + } + } +} + +#[derive(Debug)] +pub struct StreamHandle { + host_port: u16, + recv_channel: Mutex, std::io::Error>>>, + send_channel: MTx, + + read_buffer: Vec, + pending_writes: FuturesUnordered>>, +} + +impl AsyncRead for StreamHandle { + /// Attempts to read from the connection into the provided buffer. + /// + /// Uses an internal read buffer to cache any extra received data. + /// + /// # Returns + /// * `Poll::Ready(Ok(()))` if data was read successfully + /// * `Poll::Ready(Err(e))` if an error occurred + /// * `Poll::Pending` if operation would block + /// + /// # Errors + /// * Returns `NotConnected` if adapter isn't connected + /// * Propagates any underlying transport errors + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + // 1) Serve from cache first. + if !self.read_buffer.is_empty() { + let n = buf.remaining().min(self.read_buffer.len()); + buf.put_slice(&self.read_buffer[..n]); + self.read_buffer.drain(..n); // fewer allocs than to_vec + reassign + return Poll::Ready(Ok(())); + } + + // 2) Poll the channel directly; this registers the waker on Empty. + let mut lock = self + .recv_channel + .lock() + .expect("somehow the mutex was poisoned"); + // this should always return, since we're the only owner of the mutex. The mutex is only + // used to satisfy the `Send` bounds of ReadWrite. + let mut extend_slice = Vec::new(); + let res = match lock.poll_item(cx) { + Poll::Pending => Poll::Pending, + + // Disconnected/ended: map to BrokenPipe + Poll::Ready(None) => Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "channel closed", + ))), + + // Got a chunk: copy what fits; cache the tail. + Poll::Ready(Some(res)) => match res { + Ok(data) => { + let n = buf.remaining().min(data.len()); + buf.put_slice(&data[..n]); + if n < data.len() { + extend_slice = data[n..].to_vec(); + } + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(e)), + }, + }; + std::mem::drop(lock); + self.read_buffer.extend(extend_slice); + res + } +} + +impl AsyncWrite for StreamHandle { + /// Attempts to write data to the connection. + /// + /// Data is buffered internally until flushed. + /// + /// # Returns + /// * `Poll::Ready(Ok(n))` with number of bytes written + /// * `Poll::Ready(Err(e))` if an error occurred + /// * `Poll::Pending` if operation would block + /// + /// # Errors + /// * Returns `NotConnected` if adapter isn't connected + fn poll_write( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + trace!("poll psh {}", buf.len()); + let (tx, rx) = oneshot::channel(); + self.send_channel + .send(HandleMessage::Send { + host_port: self.host_port, + data: buf.to_vec(), + res: tx, + }) + .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed"))?; + self.pending_writes.push(rx); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + while let Poll::Ready(maybe) = self.pending_writes.poll_next_unpin(cx) { + match maybe { + Some(Ok(Ok(()))) => {} + Some(Ok(Err(e))) => return Poll::Ready(Err(e)), + Some(Err(_canceled)) => { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "channel closed", + ))) + } + None => break, // nothing pending + } + } + if self.pending_writes.is_empty() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // Just a drop will close the channel, which will trigger a close + std::task::Poll::Ready(Ok(())) + } +} + +impl Drop for StreamHandle { + fn drop(&mut self) { + let _ = self.send_channel.send(HandleMessage::Close { + host_port: self.host_port, + }); + } +} diff --git a/idevice/src/tcp/mod.rs b/idevice/src/tcp/mod.rs index 0c4424d..64302be 100644 --- a/idevice/src/tcp/mod.rs +++ b/idevice/src/tcp/mod.rs @@ -6,12 +6,12 @@ use std::{ }; use log::debug; -use stream::AdapterStream; use tokio::io::AsyncWriteExt; -use crate::provider::RsdProvider; +use crate::{provider::RsdProvider, ReadWrite}; pub mod adapter; +pub mod handle; pub mod packets; pub mod stream; @@ -39,16 +39,14 @@ pub(crate) fn log_packet(file: &Arc>, packet }); } -impl<'a> RsdProvider<'a> for adapter::Adapter { +impl RsdProvider for handle::AdapterHandle { async fn connect_to_service_port( - &'a mut self, + &mut self, port: u16, - ) -> Result, crate::IdeviceError> { - let s = stream::AdapterStream::connect(self, port).await?; - Ok(s) + ) -> Result, crate::IdeviceError> { + let s = self.connect(port).await?; + Ok(Box::new(s)) } - - type Stream = AdapterStream<'a>; } #[cfg(test)] diff --git a/idevice/src/xpc/http2/mod.rs b/idevice/src/xpc/http2/mod.rs index 7e233e3..5679c66 100644 --- a/idevice/src/xpc/http2/mod.rs +++ b/idevice/src/xpc/http2/mod.rs @@ -28,13 +28,6 @@ impl<'a, R: ReadWrite + 'a> Http2Client { }) } - pub fn box_inner(self) -> Http2Client> { - Http2Client { - inner: Box::new(self.inner), - cache: self.cache, - } - } - pub async fn set_settings( &mut self, settings: Vec, diff --git a/idevice/src/xpc/mod.rs b/idevice/src/xpc/mod.rs index 0931c00..aac5308 100644 --- a/idevice/src/xpc/mod.rs +++ b/idevice/src/xpc/mod.rs @@ -29,14 +29,6 @@ impl<'a, R: ReadWrite + 'a> RemoteXpcClient { }) } - pub fn box_inner(self) -> RemoteXpcClient> { - RemoteXpcClient { - h2_client: self.h2_client.box_inner(), - root_id: self.root_id, - reply_id: self.reply_id, - } - } - pub async fn do_handshake(&mut self) -> Result<(), IdeviceError> { self.h2_client .set_settings( diff --git a/tools/src/app_service.rs b/tools/src/app_service.rs index a90f72d..27f95df 100644 --- a/tools/src/app_service.rs +++ b/tools/src/app_service.rs @@ -3,7 +3,7 @@ use clap::{Arg, Command}; use idevice::{ core_device::AppServiceClient, core_device_proxy::CoreDeviceProxy, rsd::RsdHandshake, - tcp::stream::AdapterStream, IdeviceService, RsdService, + IdeviceService, RsdService, }; mod common; @@ -109,11 +109,10 @@ async fn main() { .expect("no core proxy"); let rsd_port = proxy.handshake.server_rsd_port; - let mut adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let mut adapter = adapter.to_async_handle(); - let stream = AdapterStream::connect(&mut adapter, rsd_port) - .await - .expect("no RSD connect"); + let stream = adapter.connect(rsd_port).await.expect("no RSD connect"); // Make the connection to RemoteXPC let mut handshake = RsdHandshake::new(stream).await.unwrap(); diff --git a/tools/src/common.rs b/tools/src/common.rs index ed9ecfd..8e54744 100644 --- a/tools/src/common.rs +++ b/tools/src/common.rs @@ -18,9 +18,7 @@ pub async fn get_provider( pairing_file: Option<&String>, label: &str, ) -> Result, String> { - let provider: Box = if udid.is_some() { - let udid = udid.unwrap(); - + let provider: Box = if let Some(udid) = udid { let mut usbmuxd = if let Ok(var) = std::env::var("USBMUXD_SOCKET_ADDRESS") { let socket = SocketAddr::from_str(&var).expect("Bad USBMUXD_SOCKET_ADDRESS"); let socket = tokio::net::TcpStream::connect(socket) diff --git a/tools/src/debug_proxy.rs b/tools/src/debug_proxy.rs index ba3be4d..7aa983e 100644 --- a/tools/src/debug_proxy.rs +++ b/tools/src/debug_proxy.rs @@ -5,7 +5,7 @@ use std::io::Write; use clap::{Arg, Command}; use idevice::{ core_device_proxy::CoreDeviceProxy, debug_proxy::DebugProxyClient, rsd::RsdHandshake, - tcp::stream::AdapterStream, IdeviceService, RsdService, + IdeviceService, RsdService, }; mod common; @@ -71,10 +71,9 @@ async fn main() { .expect("no core proxy"); let rsd_port = proxy.handshake.server_rsd_port; - let mut adapter = proxy.create_software_tunnel().expect("no software tunnel"); - let stream = AdapterStream::connect(&mut adapter, rsd_port) - .await - .expect("no RSD connect"); + let adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let mut adapter = adapter.to_async_handle(); + let stream = adapter.connect(rsd_port).await.expect("no RSD connect"); // Make the connection to RemoteXPC let mut handshake = RsdHandshake::new(stream).await.unwrap(); diff --git a/tools/src/location_simulation.rs b/tools/src/location_simulation.rs index 0ee50f5..02c0a6d 100644 --- a/tools/src/location_simulation.rs +++ b/tools/src/location_simulation.rs @@ -2,10 +2,7 @@ // Just lists apps for now use clap::{Arg, Command}; -use idevice::{ - core_device_proxy::CoreDeviceProxy, rsd::RsdHandshake, tcp::stream::AdapterStream, - IdeviceService, RsdService, -}; +use idevice::{core_device_proxy::CoreDeviceProxy, rsd::RsdHandshake, IdeviceService, RsdService}; mod common; @@ -71,10 +68,9 @@ async fn main() { .expect("no core proxy"); let rsd_port = proxy.handshake.server_rsd_port; - let mut adapter = proxy.create_software_tunnel().expect("no software tunnel"); - let stream = AdapterStream::connect(&mut adapter, rsd_port) - .await - .expect("no RSD connect"); + let adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let mut adapter = adapter.to_async_handle(); + let stream = adapter.connect(rsd_port).await.expect("no RSD connect"); // Make the connection to RemoteXPC let mut handshake = RsdHandshake::new(stream).await.unwrap(); diff --git a/tools/src/process_control.rs b/tools/src/process_control.rs index 173633d..511df79 100644 --- a/tools/src/process_control.rs +++ b/tools/src/process_control.rs @@ -1,10 +1,7 @@ // Jackson Coxson use clap::{Arg, Command}; -use idevice::{ - core_device_proxy::CoreDeviceProxy, rsd::RsdHandshake, tcp::stream::AdapterStream, - IdeviceService, RsdService, -}; +use idevice::{core_device_proxy::CoreDeviceProxy, rsd::RsdHandshake, IdeviceService, RsdService}; mod common; @@ -79,10 +76,9 @@ async fn main() { .expect("no core proxy"); let rsd_port = proxy.handshake.server_rsd_port; - let mut adapter = proxy.create_software_tunnel().expect("no software tunnel"); - let stream = AdapterStream::connect(&mut adapter, rsd_port) - .await - .expect("no RSD connect"); + let adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let mut adapter = adapter.to_async_handle(); + let stream = adapter.connect(rsd_port).await.expect("no RSD connect"); // Make the connection to RemoteXPC let mut handshake = RsdHandshake::new(stream).await.unwrap(); diff --git a/tools/src/restore_service.rs b/tools/src/restore_service.rs index b1d1e85..e2dcd8a 100644 --- a/tools/src/restore_service.rs +++ b/tools/src/restore_service.rs @@ -3,8 +3,7 @@ use clap::{Arg, Command}; use idevice::{ core_device_proxy::CoreDeviceProxy, pretty_print_dictionary, - restore_service::RestoreServiceClient, rsd::RsdHandshake, tcp::stream::AdapterStream, - IdeviceService, RsdService, + restore_service::RestoreServiceClient, rsd::RsdHandshake, IdeviceService, RsdService, }; mod common; @@ -75,11 +74,9 @@ async fn main() { .expect("no core proxy"); let rsd_port = proxy.handshake.server_rsd_port; - let mut adapter = proxy.create_software_tunnel().expect("no software tunnel"); - - let stream = AdapterStream::connect(&mut adapter, rsd_port) - .await - .expect("no RSD connect"); + let adapter = proxy.create_software_tunnel().expect("no software tunnel"); + let mut adapter = adapter.to_async_handle(); + let stream = adapter.connect(rsd_port).await.expect("no RSD connect"); // Make the connection to RemoteXPC let mut handshake = RsdHandshake::new(stream).await.unwrap();