diff --git a/ffi/src/afc.rs b/ffi/src/afc.rs index 4f1d3ab..2f9a243 100644 --- a/ffi/src/afc.rs +++ b/ffi/src/afc.rs @@ -578,7 +578,7 @@ pub unsafe extern "C" fn afc_file_read( } let fd = unsafe { &mut *(handle as *mut idevice::afc::file::FileDescriptor) }; - let res: Result, IdeviceError> = run_sync(async move { fd.read().await }); + let res: Result, IdeviceError> = run_sync(async move { fd.read_entire().await }); match res { Ok(bytes) => { @@ -620,7 +620,7 @@ pub unsafe extern "C" fn afc_file_write( let fd = unsafe { &mut *(handle as *mut idevice::afc::file::FileDescriptor) }; let data_slice = unsafe { std::slice::from_raw_parts(data, length) }; - let res: Result<(), IdeviceError> = run_sync(async move { fd.write(data_slice).await }); + let res: Result<(), IdeviceError> = run_sync(async move { fd.write_entire(data_slice).await }); match res { Ok(_) => null_mut(), diff --git a/idevice/src/services/afc/file.rs b/idevice/src/services/afc/file.rs index 4ce7a4b..34432ae 100644 --- a/idevice/src/services/afc/file.rs +++ b/idevice/src/services/afc/file.rs @@ -1,183 +1,98 @@ // Jackson Coxson -use std::io::SeekFrom; +use std::{io::SeekFrom, pin::Pin}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; + +use super::inner_file::InnerFileDescriptor; use crate::IdeviceError; -use super::{ - opcode::AfcOpcode, - packet::{AfcPacket, AfcPacketHeader}, -}; - -/// Maximum transfer size for file operations (64KB) -const MAX_TRANSFER: u64 = 64 * 1024; // this is what go-ios uses - -/// Handle for an open file on the device. -/// Call close before dropping pub struct FileDescriptor<'a> { - pub(crate) client: &'a mut super::AfcClient, - pub(crate) fd: u64, - pub(crate) path: String, + inner: Pin>>, } -impl FileDescriptor<'_> { - /// Generic helper to send an AFC packet and read the response - async fn send_packet( - &mut self, - opcode: AfcOpcode, - header_payload: Vec, - payload: Vec, - ) -> Result { - let header_len = header_payload.len() as u64 + AfcPacketHeader::LEN; - let header = AfcPacketHeader { - magic: super::MAGIC, - entire_len: header_len + payload.len() as u64, - header_payload_len: header_len, - packet_num: self.client.package_number, - operation: opcode, - }; - self.client.package_number += 1; - - let packet = AfcPacket { - header, - header_payload, - payload, - }; - - self.client.send(packet).await?; - self.client.read().await +impl<'a> FileDescriptor<'a> { + pub(crate) fn new(inner: Pin>>) -> Self { + Self { inner } } - +} +impl FileDescriptor<'_> { /// Returns the current cursor position for the file pub async fn seek_tell(&mut self) -> Result { - let header_payload = self.fd.to_le_bytes().to_vec(); - let res = self - .send_packet(AfcOpcode::FileTell, header_payload, Vec::new()) - .await?; - - let cur_pos = res - .header_payload - .get(..8) - .ok_or(IdeviceError::UnexpectedResponse)? - .try_into() - .map(u64::from_le_bytes) - .map_err(|_| IdeviceError::UnexpectedResponse)?; - - Ok(cur_pos) - } - - /// Moves the file cursor - pub async fn seek(&mut self, pos: SeekFrom) -> Result<(), IdeviceError> { - let (offset, whence) = match pos { - SeekFrom::Start(off) => (off as i64, 0), - SeekFrom::Current(off) => (off, 1), - SeekFrom::End(off) => (off, 2), - }; - - let mut header_payload = Vec::new(); - header_payload.extend(self.fd.to_le_bytes()); - header_payload.extend((whence as u64).to_le_bytes()); - header_payload.extend(offset.to_le_bytes()); - - self.send_packet(AfcOpcode::FileSeek, header_payload, Vec::new()) - .await?; - - Ok(()) + self.inner.as_mut().seek_tell().await } /// Closes the file descriptor - pub async fn close(mut self) -> Result<(), IdeviceError> { - let header_payload = self.fd.to_le_bytes().to_vec(); - - self.send_packet(AfcOpcode::FileClose, header_payload, Vec::new()) - .await?; - Ok(()) + pub async fn close(self) -> Result<(), IdeviceError> { + self.inner.close().await } /// Reads the entire contents of the file /// /// # Returns /// A vector containing the file's data - pub async fn read(&mut self) -> Result, IdeviceError> { - let seek_pos = self.seek_tell().await? as usize; - let file_info = self.client.get_file_info(&self.path).await?; - let mut bytes_left = file_info.size.saturating_sub(seek_pos); - let mut collected_bytes = Vec::with_capacity(bytes_left); - - while bytes_left > 0 { - let mut header_payload = self.fd.to_le_bytes().to_vec(); - header_payload.extend_from_slice(&MAX_TRANSFER.to_le_bytes()); - let res = self - .send_packet(AfcOpcode::Read, header_payload, Vec::new()) - .await?; - - bytes_left -= res.payload.len(); - collected_bytes.extend(res.payload); - } - - Ok(collected_bytes) - } - - pub async fn read_with_callback( - &mut self, - callback: impl Fn(((usize, usize), S)) -> Fut, - state: S, - ) -> Result, IdeviceError> - where - Fut: std::future::Future, - S: Clone, - { - let seek_pos = self.seek_tell().await? as usize; - let file_info = self.client.get_file_info(&self.path).await?; - let mut bytes_left = file_info.size.saturating_sub(seek_pos); - let mut collected_bytes = Vec::with_capacity(bytes_left); - - while bytes_left > 0 { - let mut header_payload = self.fd.to_le_bytes().to_vec(); - header_payload.extend_from_slice(&MAX_TRANSFER.to_le_bytes()); - let res = self - .send_packet(AfcOpcode::Read, header_payload, Vec::new()) - .await?; - - bytes_left -= res.payload.len(); - collected_bytes.extend(res.payload); - callback(((file_info.size - bytes_left, file_info.size), state.clone())).await; - } - - Ok(collected_bytes) + pub async fn read_entire(&mut self) -> Result, IdeviceError> { + self.inner.as_mut().read().await } /// Writes data to the file /// /// # Arguments /// * `bytes` - Data to write to the file - pub async fn write(&mut self, bytes: &[u8]) -> Result<(), IdeviceError> { - for chunk in bytes.chunks(MAX_TRANSFER as usize) { - let header_payload = self.fd.to_le_bytes().to_vec(); - self.send_packet(AfcOpcode::Write, header_payload, chunk.to_vec()) - .await?; - } - Ok(()) - } - - pub async fn write_with_callback( - &mut self, - bytes: &[u8], - callback: impl Fn(((usize, usize), S)) -> Fut, - state: S, - ) -> Result<(), IdeviceError> - where - Fut: std::future::Future, - S: Clone, - { - let chunks = bytes.chunks(MAX_TRANSFER as usize); - let chunks_len = chunks.len(); - for (i, chunk) in chunks.enumerate() { - let header_payload = self.fd.to_le_bytes().to_vec(); - self.send_packet(AfcOpcode::Write, header_payload, chunk.to_vec()) - .await?; - callback(((i, chunks_len), state.clone())).await; - } - Ok(()) + pub async fn write_entire(&mut self, bytes: &[u8]) -> Result<(), IdeviceError> { + self.inner.as_mut().write(bytes).await + } +} + +impl AsyncRead for FileDescriptor<'_> { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let inner = self.inner.as_mut(); + inner.poll_read(cx, buf) + } +} + +impl AsyncWrite for FileDescriptor<'_> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let inner = self.inner.as_mut(); + inner.poll_write(cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let inner = self.inner.as_mut(); + inner.poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let inner = self.inner.as_mut(); + inner.poll_shutdown(cx) + } +} + +impl AsyncSeek for FileDescriptor<'_> { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { + let this = self.inner.as_mut(); + this.start_seek(position) + } + + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.inner.as_mut(); + this.poll_complete(cx) } } diff --git a/idevice/src/services/afc/inner_file.rs b/idevice/src/services/afc/inner_file.rs new file mode 100644 index 0000000..a1a4acb --- /dev/null +++ b/idevice/src/services/afc/inner_file.rs @@ -0,0 +1,745 @@ +// Jackson Coxson + +use std::{io::SeekFrom, pin::Pin}; + +use futures::{FutureExt, future::BoxFuture}; +use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; + +use crate::IdeviceError; + +use super::{ + opcode::AfcOpcode, + packet::{AfcPacket, AfcPacketHeader}, +}; + +/// Maximum transfer size for file operations (64KB) +const MAX_TRANSFER: u64 = 64 * 1024; // this is what go-ios uses + +fn chunk_number(n: usize, chunk_size: usize) -> impl Iterator { + (0..n) + .step_by(chunk_size) + .map(move |i| (n - i).min(chunk_size)) +} + +// Used to descripe what the future returns +pub(crate) enum PendingResult { + // writing + Empty, + // seeking + SeekPos(u64), + // reading + Bytes(Vec), +} + +/// Handle for an open file on the device. +/// Call close before dropping +pub(crate) struct InnerFileDescriptor<'a> { + pub(crate) client: &'a mut super::AfcClient, + pub(crate) fd: u64, + pub(crate) path: String, + + pub(crate) pending_fut: Option>>, + + pub(crate) _m: std::marker::PhantomPinned, +} + +impl<'a> InnerFileDescriptor<'a> { + pub(crate) fn new(client: &'a mut super::AfcClient, fd: u64, path: String) -> Pin> { + Box::pin(Self { + client, + fd, + path, + pending_fut: None, + _m: std::marker::PhantomPinned, + }) + } +} +impl InnerFileDescriptor<'_> { + /// Generic helper to send an AFC packet and read the response + pub async fn send_packet( + self: Pin<&mut Self>, + opcode: AfcOpcode, + header_payload: Vec, + payload: Vec, + ) -> Result { + // SAFETY: we don't modify pinned fileds, it's ok + let this = unsafe { self.get_unchecked_mut() }; + + let header_len = header_payload.len() as u64 + AfcPacketHeader::LEN; + let header = AfcPacketHeader { + magic: super::MAGIC, + entire_len: header_len + payload.len() as u64, + header_payload_len: header_len, + packet_num: this.client.package_number, + operation: opcode, + }; + this.client.package_number += 1; + + let packet = AfcPacket { + header, + header_payload, + payload, + }; + + this.client.send(packet).await?; + this.client.read().await + } + + /// Returns the current cursor position for the file + pub async fn seek_tell(self: Pin<&mut Self>) -> Result { + let header_payload = self.fd.to_le_bytes().to_vec(); + let res = self + .send_packet(AfcOpcode::FileTell, header_payload, Vec::new()) + .await?; + + let cur_pos = res + .header_payload + .get(..8) + .ok_or(IdeviceError::UnexpectedResponse)? + .try_into() + .map(u64::from_le_bytes) + .map_err(|_| IdeviceError::UnexpectedResponse)?; + + Ok(cur_pos) + } + + /// Moves the file cursor + async fn seek(mut self: Pin<&mut Self>, pos: SeekFrom) -> Result { + let (offset, whence) = match pos { + SeekFrom::Start(off) => (off as i64, 0), + SeekFrom::Current(off) => (off, 1), + SeekFrom::End(off) => (off, 2), + }; + + let header_payload = [ + self.fd.to_le_bytes(), + (whence as u64).to_le_bytes(), + offset.to_le_bytes(), + ] + .concat(); + + self.as_mut() + .send_packet(AfcOpcode::FileSeek, header_payload, Vec::new()) + .await?; + + self.as_mut().seek_tell().await + } + + /// Closes the file descriptor + pub async fn close(mut self: Pin>) -> Result<(), IdeviceError> { + let header_payload = self.fd.to_le_bytes().to_vec(); + + self.as_mut() + .send_packet(AfcOpcode::FileClose, header_payload, Vec::new()) + .await?; + Ok(()) + } + + /// Reads n size of contents from the file + /// + /// # Arguments + /// * `n` - amount of bytes to read + /// # Returns + /// A vector containing the file's data + pub async fn read_n(mut self: Pin<&mut Self>, n: usize) -> Result, IdeviceError> { + let mut collected_bytes = Vec::with_capacity(n); + + for chunk in chunk_number(n, MAX_TRANSFER as usize) { + let header_payload = [self.fd.to_le_bytes(), chunk.to_le_bytes()].concat(); + let res = self + .as_mut() + .send_packet(AfcOpcode::Read, header_payload, Vec::new()) + .await?; + + collected_bytes.extend(res.payload); + } + Ok(collected_bytes) + } + + /// Reads the entire contents of the file + /// + /// # Returns + /// A vector containing the file's data + pub async fn read(mut self: Pin<&mut Self>) -> Result, IdeviceError> { + let seek_pos = self.as_mut().seek_tell().await? as usize; + + let file_info = unsafe { + let this = self.as_mut().get_unchecked_mut(); + + this.client.get_file_info(&this.path).await? + }; + + let mut bytes_left = file_info.size.saturating_sub(seek_pos); + let mut collected_bytes = Vec::with_capacity(bytes_left); + + while bytes_left > 0 { + let bytes = self.as_mut().read_n(MAX_TRANSFER as usize).await?; + + bytes_left -= bytes.len(); + collected_bytes.extend(bytes); + } + + Ok(collected_bytes) + } + + /// Writes data to the file + /// + /// # Arguments + /// * `bytes` - Data to write to the file + pub async fn write(mut self: Pin<&mut Self>, bytes: &[u8]) -> Result<(), IdeviceError> { + for chunk in bytes.chunks(MAX_TRANSFER as usize) { + let header_payload = self.as_ref().fd.to_le_bytes().to_vec(); + self.as_mut() + .send_packet(AfcOpcode::Write, header_payload, chunk.to_vec()) + .await?; + } + Ok(()) + } + + fn store_pending_read(mut self: Pin<&mut Self>, buf_rem: usize) { + unsafe { + let this = self.as_mut().get_unchecked_mut() as *mut InnerFileDescriptor; + + let fut = Some( + // SAFETY: we already know that self is pinned + Pin::new_unchecked(&mut *this) + .read_n(buf_rem) + .map(|r| r.map(PendingResult::Bytes)) + .boxed(), + ); + + (&mut *this).pending_fut = fut; + } + } + + fn store_pending_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) { + unsafe { + let this = self.as_mut().get_unchecked_mut() as *mut InnerFileDescriptor; + + let fut = Some( + Pin::new_unchecked(&mut *this) + .seek(position) + .map(|r| r.map(PendingResult::SeekPos)) + .boxed(), + ); + + (&mut *this).pending_fut = fut; + } + } + + fn store_pending_write(mut self: Pin<&mut Self>, buf: &'_ [u8]) { + unsafe { + let this = self.as_mut().get_unchecked_mut(); + + let this = this as *mut InnerFileDescriptor; + + // move the entire buffer into the future so we don't have to store it somewhere + let pined_this = Pin::new_unchecked(&mut *this); + let buf = buf.to_vec(); + let fut = + async move { pined_this.write(&buf).await.map(|_| PendingResult::Empty) }.boxed(); + + (&mut *this).pending_fut = Some(fut); + } + } +} + +impl<'a> InnerFileDescriptor<'a> { + fn get_or_init_read_fut( + mut self: Pin<&mut Self>, + buf_rem: usize, + ) -> &mut BoxFuture<'a, Result> { + if self.as_ref().pending_fut.is_none() { + self.as_mut().store_pending_read(buf_rem); + } + + unsafe { self.get_unchecked_mut().pending_fut.as_mut().unwrap() } + } + + fn get_or_init_write_fut( + mut self: Pin<&mut Self>, + buf: &'_ [u8], + ) -> &mut BoxFuture<'a, Result> { + if self.as_ref().pending_fut.is_none() { + self.as_mut().store_pending_write(buf); + } + + unsafe { self.get_unchecked_mut().pending_fut.as_mut().unwrap() } + } + + fn get_seek_fut( + self: Pin<&mut Self>, + ) -> Option<&mut BoxFuture<'a, Result>> { + unsafe { self.get_unchecked_mut().pending_fut.as_mut() } + } + + fn remove_pending_fut(mut self: Pin<&mut Self>) { + unsafe { + self.as_mut().get_unchecked_mut().pending_fut.take(); + } + } +} + +impl AsyncRead for InnerFileDescriptor<'_> { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let contents = { + let read_func = self.as_mut().get_or_init_read_fut(buf.remaining()); + match std::task::ready!(read_func.as_mut().poll(cx)) { + Ok(PendingResult::Bytes(c)) => { + self.as_mut().remove_pending_fut(); + c + } + Err(e) => return std::task::Poll::Ready(Err(std::io::Error::other(e.to_string()))), + + _ => unreachable!("a non read future was stored, this shouldn't happen"), + } + }; + + buf.put_slice(&contents); + + std::task::Poll::Ready(Ok(())) + } +} + +impl AsyncWrite for InnerFileDescriptor<'_> { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let write_func = self.as_mut().get_or_init_write_fut(buf); + + match std::task::ready!(write_func.as_mut().poll(cx)) { + Ok(PendingResult::Empty) => self.as_mut().remove_pending_fut(), + Err(e) => { + println!("error: {e}"); + return std::task::Poll::Ready(Err(std::io::Error::other(e.to_string()))); + } + + _ => unreachable!("a non write future was stored, this shouldn't happen"), + } + + std::task::Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } +} + +impl AsyncSeek for InnerFileDescriptor<'_> { + fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> { + self.store_pending_seek(position); + + Ok(()) + } + + fn poll_complete( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let Some(fut) = self.as_mut().get_seek_fut() else { + // tokio runs the `poll_complete` before the `start_seek` to ensure no previous seek is in progress + return std::task::Poll::Ready(Ok(0)); + }; + + match std::task::ready!(fut.as_mut().poll(cx)) { + Ok(PendingResult::SeekPos(pos)) => { + self.as_mut().remove_pending_fut(); + std::task::Poll::Ready(Ok(pos)) + } + Err(e) => std::task::Poll::Ready(Err(std::io::Error::other(e.to_string()))), + _ => unreachable!("a non seek future was stored, this shouldn't happen"), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + + use crate::usbmuxd::{UsbmuxdAddr, UsbmuxdConnection}; + + use super::super::*; + use super::*; + + async fn make_client() -> super::super::AfcClient { + let mut u = UsbmuxdConnection::default() + .await + .expect("failed to connect to usbmuxd"); + let d = u + .get_devices() + .await + .expect("no devices") + .into_iter() + .next() + .expect("no devices connected") + .to_provider(UsbmuxdAddr::default(), "idevice_afc_file_inner_tests"); + + let mut ac = AfcClient::connect(&d) + .await + .expect("failed to connect to afc"); + ac.mk_dir("/tmp").await.unwrap(); + ac + } + + #[tokio::test] + async fn write_and_read_large_file() { + let mut client = make_client().await; + let path = "/tmp/large_file.txt"; + let data = vec![b'x'; 10_000_000]; // 10mb + + { + let mut file = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + file.write_all(&data).await.unwrap(); + } + + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf.len(), data.len()); + + drop(file); + client.remove(path).await.unwrap(); + } + + #[should_panic] + #[tokio::test] + async fn panic_safety() { + let mut client = make_client().await; + client.list_dir("/invalid").await.unwrap(); + } + + #[tokio::test] + async fn file_seek_and_append() { + let mut client = make_client().await; + let path = "/tmp/seek_append.txt"; + + let mut f = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + f.write_all(b"start").await.unwrap(); + f.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + f.write_all(b"over").await.unwrap(); + drop(f); + + let mut f = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + f.read_to_end(&mut buf).await.unwrap(); + assert_eq!(&buf, b"overt"); // “over” overwrites start + + drop(f); + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn borrow_check_works() { + let mut client = make_client().await; + let fut = client.list_dir("/Downloads"); + // // This line should fail to compile if uncommented: + // let fut2 = client.list_dir("/bar"); + fut.await.unwrap(); + } + + #[tokio::test] + async fn not_send_across_threads() { + let mut client = make_client().await; + // // This should fail to compile if uncommented: + // tokio::spawn(async move { client.list_dir("/").await }); + let _ = client.list_dir("/").await; + } + + #[tokio::test] + async fn write_and_read_roundtrip() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let mut client = make_client().await; + + // Create a test file in /tmp (AFC should allow this) + let path = "/tmp/afc_test_file.txt"; + let contents = b"hello async afc world"; + + { + let mut file = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + file.write_all(contents).await.unwrap(); + } + + { + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(buf, contents); + } + + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn write_multiple_chunks() { + use tokio::io::AsyncWriteExt; + + let mut client = make_client().await; + let path = "/tmp/afc_chunk_test.txt"; + let mut file = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + + for i in 0..10 { + let data = format!("chunk{}\n", i); + file.write_all(data.as_bytes()).await.unwrap(); + } + + drop(file); + + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + tokio::io::AsyncReadExt::read_to_end(&mut file, &mut buf) + .await + .unwrap(); + let s = String::from_utf8_lossy(&buf); + + for i in 0..10 { + assert!(s.contains(&format!("chunk{}", i))); + } + drop(file); + + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn read_partial_and_resume() { + use tokio::io::AsyncReadExt; + + let mut client = make_client().await; + let path = "/tmp/afc_partial_read.txt"; + let contents = b"abcdefghijklmnopqrstuvwxyz"; + + { + let mut file = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + file.write_all(contents).await.unwrap(); + } + + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = [0u8; 5]; + let n = file.read(&mut buf).await.unwrap(); + assert_eq!(&buf[..n], b"abcde"); + + let mut rest = Vec::new(); + file.read_to_end(&mut rest).await.unwrap(); + assert_eq!(rest, b"fghijklmnopqrstuvwxyz"); + drop(file); + + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn zero_length_file() { + use tokio::io::AsyncReadExt; + + let mut client = make_client().await; + let path = "/tmp/afc_empty.txt"; + + { + let _ = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + } + + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + let n = file.read_to_end(&mut buf).await.unwrap(); + assert_eq!(n, 0); + drop(file); + + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn write_then_append() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let mut client = make_client().await; + let path = "/tmp/afc_append.txt"; + + { + let mut file = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + file.write_all(b"first\n").await.unwrap(); + file.flush().await.unwrap(); + } + + { + let mut file = client.open(path, AfcFopenMode::Append).await.unwrap(); + file.write_all(b"second\n").await.unwrap(); + file.flush().await.unwrap(); + } + + let mut file = client.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + file.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(String::from_utf8_lossy(&buf), "first\nsecond\n"); + drop(file); + + client.remove(path).await.unwrap(); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn concurrent_file_access_should_not_ub() { + use std::sync::Arc; + use tokio::task; + + let client = Arc::new(tokio::sync::Mutex::new(make_client().await)); + let path = "/tmp/afc_threaded.txt"; + + let tasks: Vec<_> = (0..10) + .map(|i| { + let client = Arc::clone(&client); + task::spawn(async move { + let mut guard = client.lock().await; + let mut f = guard.open(path, AfcFopenMode::Append).await.unwrap(); + f.write_all(format!("{}\n", i).as_bytes()).await.unwrap(); + f.flush().await.unwrap(); + }) + }) + .collect(); + + for t in tasks { + let _ = t.await; + } + + let mut guard = client.lock().await; + let mut f = guard.open(path, AfcFopenMode::RdOnly).await.unwrap(); + let mut buf = Vec::new(); + tokio::io::AsyncReadExt::read_to_end(&mut f, &mut buf) + .await + .unwrap(); + let s = String::from_utf8_lossy(&buf); + for i in 0..10 { + assert!(s.contains(&i.to_string())); + } + drop(f); + guard.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn panic_during_write_does_not_leak() { + use std::sync::atomic::{AtomicUsize, Ordering}; + + static COUNT: AtomicUsize = AtomicUsize::new(0); + + let mut client = make_client().await; + let path = "/tmp/afc_panic.txt"; + + let result = std::panic::AssertUnwindSafe(async { + let _f = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + COUNT.fetch_add(1, Ordering::SeqCst); + panic!("simulate crash mid-write"); + }) + .catch_unwind() + .await; + + assert!(result.is_err()); + // Reopen to ensure no handles leaked + let _ = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + assert_eq!(COUNT.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn open_close_stress() { + let mut client = make_client().await; + let path = "/tmp/afc_stress.txt"; + + for _ in 0..100 { + let mut f = client.open(path, AfcFopenMode::WrOnly).await.unwrap(); + f.write_all(b"hi").await.unwrap(); + drop(f); + } + + // Make sure handle cleanup didn’t break internal state + client.remove(path).await.unwrap(); + } + + #[tokio::test] + async fn concurrent_access_stress() { + let client = Arc::new(tokio::sync::Mutex::new(make_client().await)); + + let mut handles = vec![]; + for i in 0..10 { + let client = client.clone(); + handles.push(tokio::spawn(async move { + let mut client = client.lock().await; + let path = format!("/tmp/testfile_{}", i); + client.mk_dir(&path).await.ok(); + let _ = client.list_dir("/tmp").await; + client.remove(&path).await.ok(); + })); + } + + for h in handles { + let _ = h.await; + } + } + + #[tokio::test] + async fn read_write_mode_works() { + let mut client = make_client().await; + + // Clean up from previous runs + let _ = client.remove("/tmp/rw_test.txt").await; + + // Open for read/write + let mut file = client + .open("/tmp/rw_test.txt", AfcFopenMode::Rw) + .await + .expect("failed to open file in rw mode"); + + // Write some data + let data = b"hello world"; + file.write_all(data).await.expect("failed to write"); + + // Seek back to start + file.seek(std::io::SeekFrom::Start(0)) + .await + .expect("seek failed"); + + // Read it back + let mut buf = vec![0u8; data.len()]; + file.read_exact(&mut buf).await.expect("failed to read"); + assert_eq!(&buf, data); + + // Write again at end + file.seek(std::io::SeekFrom::End(0)).await.unwrap(); + file.write_all(b"!").await.unwrap(); + + // Verify new content + file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + let mut final_buf = Vec::new(); + file.read_to_end(&mut final_buf).await.unwrap(); + assert_eq!(&final_buf, b"hello world!"); + + file.close().await.expect("failed to close"); + + // Double check via list/read + let contents = client + .open("/tmp/rw_test.txt", AfcFopenMode::RdOnly) + .await + .unwrap() + .read_entire() + .await + .unwrap(); + assert_eq!(contents, b"hello world!"); + + // Clean up + client.remove("/tmp/rw_test.txt").await.unwrap(); + } +} diff --git a/idevice/src/services/afc/mod.rs b/idevice/src/services/afc/mod.rs index f26742e..c650dd8 100644 --- a/idevice/src/services/afc/mod.rs +++ b/idevice/src/services/afc/mod.rs @@ -15,6 +15,7 @@ use crate::{Idevice, IdeviceError, IdeviceService, obf}; pub mod errors; pub mod file; +mod inner_file; pub mod opcode; pub mod packet; @@ -408,11 +409,9 @@ impl AfcClient { return Err(IdeviceError::UnexpectedResponse); } let fd = u64::from_le_bytes(res.header_payload[..8].try_into().unwrap()); - Ok(FileDescriptor { - client: self, - fd, - path, - }) + Ok(FileDescriptor::new(inner_file::InnerFileDescriptor::new( + self, fd, path, + ))) } /// Creates a hard or symbolic link diff --git a/idevice/src/services/crashreportcopymobile.rs b/idevice/src/services/crashreportcopymobile.rs index 4c97220..c7259a2 100644 --- a/idevice/src/services/crashreportcopymobile.rs +++ b/idevice/src/services/crashreportcopymobile.rs @@ -84,7 +84,7 @@ impl CrashReportCopyMobileClient { .open(format!("/{log}"), crate::afc::opcode::AfcFopenMode::RdOnly) .await?; - f.read().await + f.read_entire().await } /// Removes a specified crash log file from the device. diff --git a/idevice/src/utils/installation/helpers.rs b/idevice/src/utils/installation/helpers.rs index bf69c48..f88ce0d 100644 --- a/idevice/src/utils/installation/helpers.rs +++ b/idevice/src/utils/installation/helpers.rs @@ -142,7 +142,7 @@ pub async fn afc_upload_file>( remote_path: &str, ) -> Result<(), IdeviceError> { let mut fd = afc.open(remote_path, AfcFopenMode::WrOnly).await?; - fd.write(file.as_ref()).await?; + fd.write_entire(file.as_ref()).await?; fd.close().await } diff --git a/tools/src/afc.rs b/tools/src/afc.rs index 6cd3c61..e1fbb5f 100644 --- a/tools/src/afc.rs +++ b/tools/src/afc.rs @@ -154,7 +154,7 @@ async fn main() { .await .expect("Failed to open"); - let res = file.read().await.expect("Failed to read"); + let res = file.read_entire().await.expect("Failed to read"); tokio::fs::write(save, res) .await .expect("Failed to write to file"); @@ -168,7 +168,9 @@ async fn main() { .await .expect("Failed to open"); - file.write(&bytes).await.expect("Failed to upload bytes"); + file.write_entire(&bytes) + .await + .expect("Failed to upload bytes"); } else if let Some(matches) = matches.subcommand_matches("remove") { let path = matches.get_one::("path").expect("No path passed"); afc_client.remove(path).await.expect("Failed to remove");