Create concurrent TCP handle and implement RSD for non-lifetime structs

This commit is contained in:
Jackson Coxson
2025-08-11 10:41:55 -06:00
parent 713a2ae0c2
commit 0a0899cd8a
28 changed files with 536 additions and 149 deletions

View File

@@ -14,6 +14,7 @@ keywords = ["lockdownd", "ios"]
tokio = { version = "1.43", features = ["io-util"] }
tokio-rustls = { version = "0.26", default-features = false }
rustls = { version = "0.23", default-features = false }
crossfire = { version = "2.0", optional = true } # TODO: update to 2.1 when it comes out
plist = { version = "1.7" }
serde = { version = "1", features = ["derive"] }
@@ -80,7 +81,13 @@ restore_service = []
rsd = ["xpc"]
syslog_relay = ["dep:bytes"]
tcp = ["tokio/net"]
tunnel_tcp_stack = ["dep:rand", "dep:futures", "tokio/fs", "tokio/sync"]
tunnel_tcp_stack = [
"dep:rand",
"dep:futures",
"tokio/fs",
"tokio/sync",
"dep:crossfire",
]
tss = ["dep:uuid", "dep:reqwest"]
tunneld = ["dep:serde_json", "dep:json", "dep:reqwest"]
usbmuxd = ["tokio/net"]

View File

@@ -25,7 +25,7 @@ pub use services::*;
#[cfg(feature = "xpc")]
pub use xpc::RemoteXpcClient;
use log::{debug, error, trace};
use log::{debug, error, trace, warn};
use provider::{IdeviceProvider, RsdProvider};
use rustls::{crypto::CryptoProvider, pki_types::ServerName};
use std::{
@@ -70,20 +70,17 @@ pub trait IdeviceService: Sized {
pub trait RsdService: Sized {
fn rsd_service_name() -> std::borrow::Cow<'static, str>;
fn from_stream(
stream: Self::Stream,
stream: Box<dyn ReadWrite>,
) -> impl std::future::Future<Output = Result<Self, IdeviceError>> + Send;
fn connect_rsd<'a, S>(
provider: &'a mut impl RsdProvider<'a, Stream = S>,
fn connect_rsd(
provider: &mut impl RsdProvider,
handshake: &mut rsd::RsdHandshake,
) -> impl std::future::Future<Output = Result<Self, IdeviceError>>
where
Self: crate::RsdService<Stream = S>,
S: ReadWrite,
Self: crate::RsdService,
{
handshake.connect(provider)
}
type Stream: ReadWrite;
}
/// Type alias for boxed device connection sockets
@@ -417,9 +414,12 @@ impl Idevice {
#[cfg(all(feature = "ring", feature = "aws-lc"))]
{
compile_error!(
"Cannot enable both `ring` and `aws-lc` features at the same time"
);
// We can't throw a compile error because it breaks rust-analyzer.
// My sanity while debugging the workspace crates is more important.
debug!("Using ring crypto backend, because both were passed");
warn!("Both ring && aws-lc are selected as idevice crypto backends!");
rustls::crypto::ring::default_provider()
}
};

View File

@@ -42,12 +42,11 @@ pub trait IdeviceProvider: Unpin + Send + Sync + std::fmt::Debug {
) -> Pin<Box<dyn Future<Output = Result<PairingFile, IdeviceError>> + Send>>;
}
pub trait RsdProvider<'a>: Unpin + Send + Sync + std::fmt::Debug {
pub trait RsdProvider: Unpin + Send + Sync + std::fmt::Debug {
fn connect_to_service_port(
&'a mut self,
&mut self,
port: u16,
) -> impl std::future::Future<Output = Result<Self::Stream, IdeviceError>> + Send;
type Stream: ReadWrite;
) -> impl std::future::Future<Output = Result<Box<dyn ReadWrite>, IdeviceError>> + Send;
}
/// TCP-based device connection provider
@@ -159,13 +158,13 @@ impl IdeviceProvider for UsbmuxdProvider {
}
#[cfg(feature = "tcp")]
impl<'a> RsdProvider<'a> for std::net::IpAddr {
impl RsdProvider for std::net::IpAddr {
async fn connect_to_service_port(
&'a mut self,
&mut self,
port: u16,
) -> Result<Self::Stream, IdeviceError> {
Ok(tokio::net::TcpStream::connect((*self, port)).await?)
) -> Result<Box<dyn ReadWrite>, IdeviceError> {
Ok(Box::new(
tokio::net::TcpStream::connect((*self, port)).await?,
))
}
type Stream = tokio::net::TcpStream;
}

View File

@@ -7,18 +7,16 @@ use crate::{obf, IdeviceError, ReadWrite, RsdService};
use super::CoreDeviceServiceClient;
impl<R: ReadWrite> RsdService for AppServiceClient<R> {
impl RsdService for AppServiceClient<Box<dyn ReadWrite>> {
fn rsd_service_name() -> std::borrow::Cow<'static, str> {
obf!("com.apple.coredevice.appservice")
}
async fn from_stream(stream: R) -> Result<Self, IdeviceError> {
async fn from_stream(stream: Box<dyn ReadWrite>) -> Result<Self, IdeviceError> {
Ok(Self {
inner: CoreDeviceServiceClient::new(stream).await?,
})
}
type Stream = R;
}
pub struct AppServiceClient<R: ReadWrite> {
@@ -137,12 +135,6 @@ impl<'a, R: ReadWrite + 'a> AppServiceClient<R> {
})
}
pub fn box_inner(self) -> AppServiceClient<Box<dyn ReadWrite + 'a>> {
AppServiceClient {
inner: self.inner.box_inner(),
}
}
pub async fn list_apps(
&mut self,
app_clips: bool,

View File

@@ -24,12 +24,6 @@ impl<'a, R: ReadWrite + 'a> CoreDeviceServiceClient<R> {
Ok(Self { inner: client })
}
pub fn box_inner(self) -> CoreDeviceServiceClient<Box<dyn ReadWrite + 'a>> {
CoreDeviceServiceClient {
inner: self.inner.box_inner(),
}
}
pub async fn invoke(
&mut self,
feature: impl Into<String>,

View File

@@ -10,19 +10,17 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::{obf, IdeviceError, ReadWrite, RsdService};
impl<R: ReadWrite> RsdService for DebugProxyClient<R> {
impl RsdService for DebugProxyClient<Box<dyn ReadWrite>> {
fn rsd_service_name() -> std::borrow::Cow<'static, str> {
obf!("com.apple.internal.dt.remote.debugproxy")
}
async fn from_stream(stream: R) -> Result<Self, IdeviceError> {
async fn from_stream(stream: Box<dyn ReadWrite>) -> Result<Self, IdeviceError> {
Ok(Self {
socket: stream,
noack_mode: false,
})
}
type Stream = R;
}
/// Client for interacting with the iOS debug proxy service

View File

@@ -8,14 +8,12 @@ pub mod message;
pub mod process_control;
pub mod remote_server;
impl<R: ReadWrite> RsdService for remote_server::RemoteServerClient<R> {
impl RsdService for remote_server::RemoteServerClient<Box<dyn ReadWrite>> {
fn rsd_service_name() -> std::borrow::Cow<'static, str> {
obf!("com.apple.instruments.dtservicehub")
}
async fn from_stream(stream: R) -> Result<Self, IdeviceError> {
async fn from_stream(stream: Box<dyn ReadWrite>) -> Result<Self, IdeviceError> {
Ok(Self::new(stream))
}
type Stream = R;
}

View File

@@ -6,7 +6,7 @@
use log::warn;
use plist::Dictionary;
use crate::{lockdown::LockdownClient, obf, Idevice, IdeviceError, IdeviceService};
use crate::{lockdown::LockdownClient, obf, Idevice, IdeviceError, IdeviceService, RsdService};
/// Client for interacting with the iOS misagent service
///
@@ -19,6 +19,18 @@ pub struct MisagentClient {
pub idevice: Idevice,
}
/// Exposes the misagent service over an RSD (RemoteXPC service discovery) tunnel.
impl RsdService for MisagentClient {
    /// RSD-registered name of the remote misagent shim.
    fn rsd_service_name() -> std::borrow::Cow<'static, str> {
        obf!("com.apple.misagent.shim.remote")
    }

    /// Wraps the raw stream in an [`Idevice`] (with an empty label), performs
    /// the RSD check-in handshake, then builds the client on top of it.
    async fn from_stream(stream: Box<dyn crate::ReadWrite>) -> Result<Self, IdeviceError> {
        let mut idevice = Idevice::new(stream, "");
        idevice.rsd_checkin().await?;
        Ok(Self::new(idevice))
    }
}
impl IdeviceService for MisagentClient {
/// Returns the misagent service name as registered with lockdownd
fn service_name() -> std::borrow::Cow<'static, str> {

View File

@@ -6,41 +6,33 @@ use plist::Dictionary;
use crate::{obf, IdeviceError, ReadWrite, RemoteXpcClient, RsdService};
/// Client for interacting with the Restore Service
pub struct RestoreServiceClient<R: ReadWrite> {
pub struct RestoreServiceClient {
/// The underlying device connection with established Restore Service service
pub stream: RemoteXpcClient<R>,
pub stream: RemoteXpcClient<Box<dyn ReadWrite>>,
}
impl<R: ReadWrite> RsdService for RestoreServiceClient<R> {
impl RsdService for RestoreServiceClient {
fn rsd_service_name() -> std::borrow::Cow<'static, str> {
obf!("com.apple.RestoreRemoteServices.restoreserviced")
}
async fn from_stream(stream: R) -> Result<Self, IdeviceError> {
async fn from_stream(stream: Box<dyn ReadWrite>) -> Result<Self, IdeviceError> {
Self::new(stream).await
}
type Stream = R;
}
impl<'a, R: ReadWrite + 'a> RestoreServiceClient<R> {
impl RestoreServiceClient {
/// Creates a new Restore Service client a socket connection,
/// and connects to the RemoteXPC service.
///
/// # Arguments
/// * `idevice` - Pre-established device connection
pub async fn new(stream: R) -> Result<Self, IdeviceError> {
pub async fn new(stream: Box<dyn ReadWrite>) -> Result<Self, IdeviceError> {
let mut stream = RemoteXpcClient::new(stream).await?;
stream.do_handshake().await?;
Ok(Self { stream })
}
pub fn box_inner(self) -> RestoreServiceClient<Box<dyn ReadWrite + 'a>> {
RestoreServiceClient {
stream: self.stream.box_inner(),
}
}
/// Enter recovery
pub async fn enter_recovery(&mut self) -> Result<(), IdeviceError> {
let mut req = Dictionary::new();

View File

@@ -156,13 +156,9 @@ impl RsdHandshake {
})
}
pub async fn connect<'a, T, S>(
&mut self,
provider: &'a mut impl RsdProvider<'a, Stream = S>,
) -> Result<T, IdeviceError>
pub async fn connect<T>(&mut self, provider: &mut impl RsdProvider) -> Result<T, IdeviceError>
where
T: crate::RsdService<Stream = S>,
S: ReadWrite,
T: crate::RsdService,
{
let service_name = T::rsd_service_name();
let service = match self.services.get(&service_name.to_string()) {

View File

@@ -147,6 +147,13 @@ impl Adapter {
}
}
/// Moves this adapter onto a dedicated background task and returns a handle to it.
///
/// Streams created through the returned handle are thread safe: all data is
/// exchanged with the adapter task over channels, so the handle can be used
/// across threads. The handle also implements the `RsdProvider` trait.
pub fn to_async_handle(self) -> super::handle::AdapterHandle {
    super::handle::AdapterHandle::new(self)
}
/// Initiates a TCP connection to the specified port.
///
/// # Arguments
@@ -435,6 +442,19 @@ impl Adapter {
}
}
/// Drains and returns every byte currently buffered for the connection bound
/// to `host_port`, leaving its read buffer empty.
///
/// # Errors
/// Returns `ErrorKind::NotConnected` if no connection state exists for the
/// given host port.
pub(crate) fn uncache_all(&mut self, host_port: u16) -> Result<Vec<u8>, std::io::Error> {
    if let Some(state) = self.states.get_mut(&host_port) {
        // Copy the buffered bytes out, then clear the buffer so the same
        // data is never handed to a caller twice.
        let res = state.read_buffer[..].to_vec();
        state.read_buffer.clear();
        Ok(res)
    } else {
        Err(std::io::Error::new(
            ErrorKind::NotConnected,
            "not connected",
        ))
    }
}
pub(crate) fn cache_read(
&mut self,
payload: &[u8],
@@ -517,7 +537,7 @@ impl Adapter {
})
}
async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> {
pub(crate) async fn process_tcp_packet(&mut self) -> Result<(), std::io::Error> {
loop {
let ip_packet = self.read_ip_packet().await?;
let res = TcpPacket::parse(&ip_packet)?;

339
idevice/src/tcp/handle.rs Normal file
View File

@@ -0,0 +1,339 @@
// So originally, streams wrote to the adapter via a mutable reference.
// This worked fine for most applications, but the lifetime requirement of the stream
// makes things difficult. This was especially apparent when trying to integrate with lockdown
// services that were swapped on the heap. This will also allow for usage across threads,
// especially in FFI. Judging the tradeoffs, we'll go forward with it.
use std::{collections::HashMap, path::PathBuf, sync::Mutex, task::Poll};
use crossfire::{mpsc, spsc, stream::AsyncStream, AsyncRx, MTx, Tx};
use futures::{stream::FuturesUnordered, StreamExt};
use log::trace;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::oneshot,
};
/// Reply channel for [`HandleMessage::ConnectToPort`]: on success it carries
/// the assigned host port plus the receive side of that connection's inbound
/// data channel.
pub type ConnectToPortRes =
    oneshot::Sender<Result<(u16, AsyncRx<Result<Vec<u8>, std::io::Error>>), std::io::Error>>;

/// Messages sent from `AdapterHandle`/`StreamHandle` front-ends to the
/// background adapter task.
enum HandleMessage {
    /// Open a TCP connection to `target` on the device.
    /// The reply returns the host port and the per-connection receive channel.
    ConnectToPort {
        target: u16,
        res: ConnectToPortRes,
    },
    /// Tear down the connection identified by `host_port`.
    Close {
        host_port: u16,
    },
    /// Queue `data` on the connection and flush; `res` reports the outcome.
    Send {
        host_port: u16,
        data: Vec<u8>,
        res: oneshot::Sender<Result<(), std::io::Error>>,
    },
    /// Start writing a packet capture to the file at `path`.
    Pcap {
        path: PathBuf,
        res: oneshot::Sender<Result<(), std::io::Error>>,
    },
}
/// Thread-safe front-end to an adapter running on its own tokio task.
///
/// Created via [`new`](Self::new) (or `Adapter::to_async_handle`); all
/// communication with the adapter happens through the message channel below.
#[derive(Debug)]
pub struct AdapterHandle {
    // Multi-producer sender to the adapter task; a clone is handed to every
    // stream created from this handle.
    sender: MTx<HandleMessage>,
}
impl AdapterHandle {
    /// Consumes `adapter` and spawns the background task that owns it.
    ///
    /// The task multiplexes three duties in a `select!` loop: handling
    /// messages from handles/streams, pumping received TCP data into
    /// per-connection channels, and a periodic (1 ms) write-buffer flush.
    pub fn new(mut adapter: super::adapter::Adapter) -> Self {
        let (tx, rx) = mpsc::unbounded_async();
        tokio::spawn(async move {
            // host port -> sender for that connection's inbound bytes
            let mut handles: HashMap<u16, Tx<Result<Vec<u8>, std::io::Error>>> = HashMap::new();
            // drives the periodic flush of queued outbound data
            let mut tick = tokio::time::interval(std::time::Duration::from_millis(1));
            loop {
                tokio::select! {
                    // check for messages for us
                    msg = rx.recv() => {
                        match msg {
                            Ok(m) => match m {
                                HandleMessage::ConnectToPort { target, res } => {
                                    let connect_response = match adapter.connect(target).await {
                                        Ok(c) => {
                                            // dedicated channel for this connection's inbound data
                                            let (ptx, prx) = spsc::unbounded_async();
                                            handles.insert(c, ptx);
                                            Ok((c, prx))
                                        }
                                        Err(e) => Err(e),
                                    };
                                    // the requester may have given up; ignore send failure
                                    res.send(connect_response).ok();
                                }
                                HandleMessage::Close { host_port } => {
                                    handles.remove(&host_port);
                                    adapter.close(host_port).await.ok();
                                }
                                HandleMessage::Send {
                                    host_port,
                                    data,
                                    res,
                                } => {
                                    if let Err(e) = adapter.queue_send(&data, host_port) {
                                        res.send(Err(e)).ok();
                                    } else {
                                        // flush right away so writes aren't delayed until the next tick
                                        let response = adapter.write_buffer_flush().await;
                                        res.send(response).ok();
                                    }
                                }
                                HandleMessage::Pcap {
                                    path,
                                    res
                                } => {
                                    res.send(adapter.pcap(path).await).ok();
                                }
                            },
                            Err(_) => {
                                // channel closed (every sender dropped): shut the task down
                                break;
                            },
                        }
                    }
                    r = adapter.process_tcp_packet() => {
                        if let Err(e) = r {
                            // propagate error to all streams; close them
                            for (hp, tx) in handles.drain() {
                                let _ = tx.send(Err(e.kind().into())); // or clone/convert
                                let _ = adapter.close(hp).await;
                            }
                            break;
                        }
                        // Push any newly available bytes to per-conn channels
                        let mut dead = Vec::new();
                        for (&hp, tx) in &handles {
                            match adapter.uncache_all(hp) {
                                Ok(buf) if !buf.is_empty() => {
                                    if tx.send(Ok(buf)).is_err() {
                                        // receiver dropped; clean up after the loop
                                        dead.push(hp);
                                    }
                                }
                                Err(e) => {
                                    let _ = tx.send(Err(e));
                                    dead.push(hp);
                                }
                                _ => {}
                            }
                        }
                        for hp in dead {
                            handles.remove(&hp);
                            let _ = adapter.close(hp).await;
                        }
                    }
                    _ = tick.tick() => {
                        let _ = adapter.write_buffer_flush().await;
                    }
                }
            }
        });
        Self { sender: tx }
    }

    /// Opens a TCP connection to `port` on the device and returns a
    /// [`StreamHandle`] for it.
    ///
    /// # Errors
    /// * `NetworkUnreachable` if the adapter task can no longer receive messages
    /// * `BrokenPipe` if the task dropped the reply channel
    /// * any error the adapter produced while connecting
    pub async fn connect(&mut self, port: u16) -> Result<StreamHandle, std::io::Error> {
        let (res_tx, res_rx) = oneshot::channel();
        if self
            .sender
            .send(HandleMessage::ConnectToPort {
                target: port,
                res: res_tx,
            })
            .is_err()
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NetworkUnreachable,
                "adapter closed",
            ));
        }
        match res_rx.await {
            Ok(r) => {
                let (host_port, recv_channel) = r?;
                Ok(StreamHandle {
                    host_port,
                    // Mutex wrapper: see StreamHandle::recv_channel
                    recv_channel: Mutex::new(recv_channel.into_stream()),
                    send_channel: self.sender.clone(),
                    read_buffer: Vec::new(),
                    pending_writes: FuturesUnordered::new(),
                })
            }
            Err(_) => Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "adapter closed",
            )),
        }
    }

    /// Asks the adapter task to start a pcap capture at `path`.
    ///
    /// # Errors
    /// * `NetworkUnreachable`/`BrokenPipe` if the adapter task has exited
    /// * any error the adapter produced while opening the capture
    pub async fn pcap(&mut self, path: impl Into<PathBuf>) -> Result<(), std::io::Error> {
        let (res_tx, res_rx) = oneshot::channel();
        let path: PathBuf = path.into();
        if self
            .sender
            .send(HandleMessage::Pcap { path, res: res_tx })
            .is_err()
        {
            return Err(std::io::Error::new(
                std::io::ErrorKind::NetworkUnreachable,
                "adapter closed",
            ));
        }
        match res_rx.await {
            Ok(r) => r,
            Err(_) => Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "adapter closed",
            )),
        }
    }
}
/// One TCP connection multiplexed over an `AdapterHandle`.
///
/// Implements `AsyncRead`/`AsyncWrite` by exchanging messages with the
/// background adapter task rather than borrowing the adapter directly.
#[derive(Debug)]
pub struct StreamHandle {
    // host-side port identifying this connection to the adapter task
    host_port: u16,
    // inbound data stream; the Mutex exists only to satisfy the thread-safety
    // bounds of ReadWrite — poll_read is the sole lock holder
    recv_channel: Mutex<AsyncStream<Result<Vec<u8>, std::io::Error>>>,
    // clone of the adapter task's message sender
    send_channel: MTx<HandleMessage>,
    // overflow cache for chunks larger than the caller's read buffer
    read_buffer: Vec<u8>,
    // outstanding write acknowledgements, drained in poll_flush
    pending_writes: FuturesUnordered<oneshot::Receiver<Result<(), std::io::Error>>>,
}
impl AsyncRead for StreamHandle {
    /// Attempts to read from the connection into the provided buffer.
    ///
    /// Uses an internal read buffer to cache any extra received data.
    ///
    /// # Returns
    /// * `Poll::Ready(Ok(()))` if data was read successfully
    /// * `Poll::Ready(Err(e))` if an error occurred
    /// * `Poll::Pending` if operation would block
    ///
    /// # Errors
    /// * Returns `BrokenPipe` if the adapter task has closed the channel
    /// * Propagates any error the adapter task forwarded (e.g. `NotConnected`)
    fn poll_read(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        // 1) Serve from cache first.
        if !self.read_buffer.is_empty() {
            let n = buf.remaining().min(self.read_buffer.len());
            buf.put_slice(&self.read_buffer[..n]);
            self.read_buffer.drain(..n); // fewer allocs than to_vec + reassign
            return Poll::Ready(Ok(()));
        }

        // 2) Poll the channel directly; this registers the waker on Empty.
        let mut lock = self
            .recv_channel
            .lock()
            .expect("somehow the mutex was poisoned");
        // this should always return, since we're the only owner of the mutex. The mutex is only
        // used to satisfy the `Send` bounds of ReadWrite.
        let mut extend_slice = Vec::new();
        let res = match lock.poll_item(cx) {
            Poll::Pending => Poll::Pending,
            // Disconnected/ended: map to BrokenPipe
            Poll::Ready(None) => Poll::Ready(Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "channel closed",
            ))),
            // Got a chunk: copy what fits; cache the tail.
            Poll::Ready(Some(res)) => match res {
                Ok(data) => {
                    let n = buf.remaining().min(data.len());
                    buf.put_slice(&data[..n]);
                    if n < data.len() {
                        // tail stashed here and appended to the cache below,
                        // after the lock (a borrow of self) is released
                        extend_slice = data[n..].to_vec();
                    }
                    Poll::Ready(Ok(()))
                }
                Err(e) => Poll::Ready(Err(e)),
            },
        };
        std::mem::drop(lock);
        // No-op on Pending/error paths (extend_slice is empty there).
        self.read_buffer.extend(extend_slice);
        res
    }
}
impl AsyncWrite for StreamHandle {
    /// Attempts to write data to the connection.
    ///
    /// The data is forwarded to the adapter task immediately; the write's
    /// acknowledgement is collected later in `poll_flush`, so transport
    /// errors for this write may only surface there.
    ///
    /// # Returns
    /// * `Poll::Ready(Ok(n))` with number of bytes accepted for sending
    /// * `Poll::Ready(Err(e))` if an error occurred
    ///
    /// # Errors
    /// * Returns `BrokenPipe` if the adapter task is no longer reachable
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<Result<usize, std::io::Error>> {
        trace!("poll psh {}", buf.len());
        let (tx, rx) = oneshot::channel();
        self.send_channel
            .send(HandleMessage::Send {
                host_port: self.host_port,
                data: buf.to_vec(),
                res: tx,
            })
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed"))?;
        // Remember the ack receiver; poll_flush drains these.
        self.pending_writes.push(rx);
        Poll::Ready(Ok(buf.len()))
    }

    /// Completes once every previously queued write has been acknowledged by
    /// the adapter task, surfacing the first error encountered.
    fn poll_flush(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        while let Poll::Ready(maybe) = self.pending_writes.poll_next_unpin(cx) {
            match maybe {
                Some(Ok(Ok(()))) => {}
                Some(Ok(Err(e))) => return Poll::Ready(Err(e)),
                Some(Err(_canceled)) => {
                    // adapter task dropped the ack sender
                    return Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::BrokenPipe,
                        "channel closed",
                    )))
                }
                None => break, // nothing pending
            }
        }
        if self.pending_writes.is_empty() {
            Poll::Ready(Ok(()))
        } else {
            // poll_next_unpin registered the waker for the remaining acks
            Poll::Pending
        }
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), std::io::Error>> {
        // Just a drop will close the channel, which will trigger a close
        std::task::Poll::Ready(Ok(()))
    }
}
impl Drop for StreamHandle {
    /// Notifies the adapter task that this connection is going away so it can
    /// release the port's state. Send failures are ignored — the adapter task
    /// may already have exited.
    fn drop(&mut self) {
        let close_msg = HandleMessage::Close {
            host_port: self.host_port,
        };
        let _ = self.send_channel.send(close_msg);
    }
}

View File

@@ -6,12 +6,12 @@ use std::{
};
use log::debug;
use stream::AdapterStream;
use tokio::io::AsyncWriteExt;
use crate::provider::RsdProvider;
use crate::{provider::RsdProvider, ReadWrite};
pub mod adapter;
pub mod handle;
pub mod packets;
pub mod stream;
@@ -39,16 +39,14 @@ pub(crate) fn log_packet(file: &Arc<tokio::sync::Mutex<tokio::fs::File>>, packet
});
}
impl<'a> RsdProvider<'a> for adapter::Adapter {
impl RsdProvider for handle::AdapterHandle {
async fn connect_to_service_port(
&'a mut self,
&mut self,
port: u16,
) -> Result<stream::AdapterStream<'a>, crate::IdeviceError> {
let s = stream::AdapterStream::connect(self, port).await?;
Ok(s)
) -> Result<Box<dyn ReadWrite>, crate::IdeviceError> {
let s = self.connect(port).await?;
Ok(Box::new(s))
}
type Stream = AdapterStream<'a>;
}
#[cfg(test)]

View File

@@ -28,13 +28,6 @@ impl<'a, R: ReadWrite + 'a> Http2Client<R> {
})
}
pub fn box_inner(self) -> Http2Client<Box<dyn ReadWrite + 'a>> {
Http2Client {
inner: Box::new(self.inner),
cache: self.cache,
}
}
pub async fn set_settings(
&mut self,
settings: Vec<frame::Setting>,

View File

@@ -29,14 +29,6 @@ impl<'a, R: ReadWrite + 'a> RemoteXpcClient<R> {
})
}
pub fn box_inner(self) -> RemoteXpcClient<Box<dyn ReadWrite + 'a>> {
RemoteXpcClient {
h2_client: self.h2_client.box_inner(),
root_id: self.root_id,
reply_id: self.reply_id,
}
}
pub async fn do_handshake(&mut self) -> Result<(), IdeviceError> {
self.h2_client
.set_settings(