Implement file stream XPC messages

This commit is contained in:
Jackson Coxson
2025-08-15 20:13:03 -06:00
parent 2d528ae21c
commit e881d6ef07
3 changed files with 114 additions and 2 deletions

View File

@@ -18,6 +18,9 @@ pub enum XPCFlag {
WantingReply,
InitHandshake,
FileTxStreamRequest,
FileTxStreamResponse,
Custom(u32),
}
@@ -28,6 +31,8 @@ impl From<XPCFlag> for u32 {
XPCFlag::DataFlag => 0x00000100,
XPCFlag::WantingReply => 0x00010000,
XPCFlag::InitHandshake => 0x00400000,
XPCFlag::FileTxStreamRequest => 0x00100000,
XPCFlag::FileTxStreamResponse => 0x00200000,
XPCFlag::Custom(inner) => inner,
}
}
@@ -68,6 +73,7 @@ pub enum XPCType {
String = 0x00009000,
Data = 0x00008000,
Uuid = 0x0000a000,
FileTransfer = 0x0001a000,
}
impl TryFrom<u32> for XPCType {
@@ -85,6 +91,7 @@ impl TryFrom<u32> for XPCType {
0x00009000 => Ok(Self::String),
0x00008000 => Ok(Self::Data),
0x0000a000 => Ok(Self::Uuid),
0x0001a000 => Ok(Self::FileTransfer),
_ => Err(IdeviceError::UnknownXpcType(value))?,
}
}
@@ -107,6 +114,8 @@ pub enum XPCObject {
String(String),
Data(Vec<u8>),
Uuid(uuid::Uuid),
FileTransfer { msg_id: u64, data: Box<XPCObject> },
}
impl From<plist::Value> for XPCObject {
@@ -153,6 +162,12 @@ impl XPCObject {
}
plist::Value::Dictionary(dict)
}
Self::FileTransfer { msg_id, data } => {
crate::plist!({
"msg_id": *msg_id,
"data": data.to_plist(),
})
}
}
}
@@ -240,6 +255,11 @@ impl XPCObject {
buf.extend_from_slice(&16_u32.to_le_bytes());
buf.extend_from_slice(uuid.as_bytes());
}
XPCObject::FileTransfer { msg_id, data } => {
buf.extend_from_slice(&(XPCType::FileTransfer as u32).to_le_bytes());
buf.extend_from_slice(&msg_id.to_le_bytes());
data.encode_object(buf)?;
}
}
Ok(())
}
@@ -370,6 +390,18 @@ impl XPCObject {
cursor.read_exact(&mut data)?;
Ok(XPCObject::Uuid(uuid::Builder::from_bytes(data).into_uuid()))
}
XPCType::FileTransfer => {
let mut id_buf = [0u8; 8];
cursor.read_exact(&mut id_buf)?;
let msg_id = u64::from_le_bytes(id_buf);
// The next thing in the stream is a full XPC object
let inner = Self::decode_object(cursor)?;
Ok(XPCObject::FileTransfer {
msg_id,
data: Box::new(inner),
})
}
}
}

View File

@@ -60,7 +60,8 @@ impl<R: ReadWrite> Http2Client<R> {
}
pub async fn open_stream(&mut self, stream_id: u32) -> Result<(), IdeviceError> {
self.cache.insert(stream_id, VecDeque::new());
// Sometimes Apple is silly and sends data to a stream that isn't open
self.cache.entry(stream_id).or_default();
let frame = frame::HeadersFrame { stream_id }.serialize();
self.inner.write_all(&frame).await?;
self.inner.flush().await?;
@@ -124,11 +125,14 @@ impl<R: ReadWrite> Http2Client<R> {
let c = match self.cache.get_mut(&data_frame.stream_id) {
Some(c) => c,
None => {
// Sometimes Apple is a little silly and sends data before the
// stream is open.
warn!(
"Received message for stream ID {} not in cache",
data_frame.stream_id
);
continue;
self.cache.insert(data_frame.stream_id, VecDeque::new());
self.cache.get_mut(&data_frame.stream_id).unwrap()
}
};
c.push_back(data_frame.payload);

View File

@@ -1,5 +1,7 @@
// Jackson Coxson
use async_stream::try_stream;
use futures::Stream;
use http2::Setting;
use log::debug;
@@ -133,4 +135,78 @@ impl<R: ReadWrite> RemoteXpcClient<R> {
.await?;
Ok(())
}
pub fn iter_file_chunks<'a>(
&'a mut self,
total_size: usize,
file_idx: u32,
) -> impl Stream<Item = Result<Vec<u8>, IdeviceError>> + 'a {
let stream_id = (file_idx + 1) * 2;
try_stream! {
fn strip_xpc_wrapper_prefix(buf: &[u8]) -> (&[u8], bool) {
// Returns (data_after_wrapper, stripped_anything)
const MAGIC: u32 = 0x29b00b92;
if buf.len() < 24 {
return (buf, false);
}
let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
if magic != MAGIC {
return (buf, false);
}
// flags at [4..8] not needed to compute size
let body_len = u64::from_le_bytes([
buf[8], buf[9], buf[10], buf[11], buf[12], buf[13], buf[14], buf[15],
]) as usize;
let wrapper_len = 24 + body_len;
if buf.len() < wrapper_len {
// Incomplete wrapper (shouldnt happen with your read API), keep as-is.
return (buf, false);
}
(&buf[wrapper_len..], true)
}
self.open_file_stream_for_response(stream_id).await?;
let mut got = 0usize;
while got < total_size {
let bytes = self.h2_client.read(stream_id).await?;
let (after, stripped) = strip_xpc_wrapper_prefix(&bytes);
if stripped && after.is_empty() {
continue; // pure control wrapper, don't count
}
let data = if stripped { after.to_vec() } else { bytes };
if data.is_empty() {
continue;
}
got += data.len();
yield data;
}
}
}
pub async fn open_file_stream_for_response(
&mut self,
stream_id: u32,
) -> Result<(), IdeviceError> {
// 1) Open the HTTP/2 stream
self.h2_client.open_stream(stream_id).await?;
// 2) Send an empty XPC wrapper on that same stream with FILE_TX_STREAM_RESPONSE
let flags = XPCFlag::AlwaysSet | XPCFlag::FileTxStreamResponse;
let msg = XPCMessage::new(Some(flags), None, Some(0));
// IMPORTANT: send on `stream_id`, not ROOT/REPLY
let bytes = msg.encode(0)?;
self.h2_client.send(bytes, stream_id).await?;
Ok(())
}
}