Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 10 additions & 57 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 35 additions & 39 deletions api/src/file/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core::{
use axerrno::AxError;
use axio::{Buf, BufMut, Read, Write};
use axpoll::{IoEvents, PollSet, Pollable};
use axtask::future::Poller;
use axtask::future::{block_on, poll_io};

use crate::file::{FileLike, Kstat, SealedBuf, SealedBufMut};

Expand Down Expand Up @@ -40,28 +40,26 @@ impl FileLike for EventFd {
return Err(AxError::InvalidInput);
}

Poller::new(self, IoEvents::IN)
.non_blocking(self.nonblocking())
.poll(|| {
let result =
self.count
.fetch_update(Ordering::Release, Ordering::Acquire, |count| {
if count > 0 {
let dec = if self.semaphore { 1 } else { count };
Some(count - dec)
} else {
None
}
});
match result {
Ok(count) => {
dst.write(&count.to_ne_bytes())?;
self.poll_tx.wake();
Ok(size_of::<u64>())
block_on(poll_io(self, IoEvents::IN, self.nonblocking(), || {
let result = self
.count
.fetch_update(Ordering::Release, Ordering::Acquire, |count| {
if count > 0 {
let dec = if self.semaphore { 1 } else { count };
Some(count - dec)
} else {
None
}
Err(_) => Err(AxError::WouldBlock),
});
match result {
Ok(count) => {
dst.write(&count.to_ne_bytes())?;
self.poll_tx.wake();
Ok(size_of::<u64>())
}
})
Err(_) => Err(AxError::WouldBlock),
}
}))
}

fn write(&self, src: &mut SealedBuf) -> axio::Result<usize> {
Expand All @@ -76,26 +74,24 @@ impl FileLike for EventFd {
return Err(AxError::InvalidInput);
}

Poller::new(self, IoEvents::OUT)
.non_blocking(self.nonblocking())
.poll(|| {
let result =
self.count
.fetch_update(Ordering::Release, Ordering::Acquire, |count| {
if u64::MAX - count > value {
Some(count + value)
} else {
None
}
});
match result {
Ok(_) => {
self.poll_rx.wake();
Ok(size_of::<u64>())
block_on(poll_io(self, IoEvents::OUT, self.nonblocking(), || {
let result = self
.count
.fetch_update(Ordering::Release, Ordering::Acquire, |count| {
if u64::MAX - count > value {
Some(count + value)
} else {
None
}
Err(_) => Err(AxError::WouldBlock),
});
match result {
Ok(_) => {
self.poll_rx.wake();
Ok(size_of::<u64>())
}
})
Err(_) => Err(AxError::WouldBlock),
}
}))
}

fn stat(&self) -> axio::Result<Kstat> {
Expand Down
14 changes: 7 additions & 7 deletions api/src/file/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axfs_ng::{FS_CONTEXT, FsContext};
use axfs_ng_vfs::{Location, Metadata, NodeFlags};
use axpoll::{IoEvents, Pollable};
use axsync::Mutex;
use axtask::future::Poller;
use axtask::future::{block_on, poll_io};
use linux_raw_sys::general::{AT_EMPTY_PATH, AT_FDCWD, AT_SYMLINK_NOFOLLOW};

use super::{FileLike, Kstat, get_file_like};
Expand Down Expand Up @@ -131,9 +131,9 @@ impl FileLike for File {
if likely(self.is_blocking()) {
inner.read(dst)
} else {
Poller::new(self, IoEvents::IN)
.non_blocking(self.nonblocking())
.poll(|| inner.read(dst))
block_on(poll_io(self, IoEvents::IN, self.nonblocking(), || {
inner.read(dst)
}))
}
}

Expand All @@ -142,9 +142,9 @@ impl FileLike for File {
if likely(self.is_blocking()) {
inner.write(src)
} else {
Poller::new(self, IoEvents::OUT)
.non_blocking(self.nonblocking())
.poll(|| inner.write(src))
block_on(poll_io(self, IoEvents::OUT, self.nonblocking(), || {
inner.write(src)
}))
}
}

Expand Down
95 changes: 47 additions & 48 deletions api/src/file/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use axerrno::{AxError, AxResult};
use axio::{Buf, BufMut, Read, Write};
use axpoll::{IoEvents, PollSet, Pollable};
use axsync::Mutex;
use axtask::{current, future::Poller};
use axtask::{
current,
future::{block_on, poll_io},
};
use linux_raw_sys::{general::S_IFIFO, ioctl::FIONREAD};
use memory_addr::PAGE_SIZE_4K;
use ringbuf::{
Expand Down Expand Up @@ -117,28 +120,26 @@ impl FileLike for Pipe {
return Ok(0);
}

Poller::new(self, IoEvents::IN)
.non_blocking(self.nonblocking())
.poll(|| {
let read = {
let cons = self.shared.buffer.lock();
let (left, right) = cons.as_slices();
let mut count = dst.write(left)?;
if count >= left.len() {
count += dst.write(right)?;
}
unsafe { cons.advance_read_index(count) };
count
};
if read > 0 {
self.shared.poll_tx.wake();
Ok(read)
} else if self.closed() {
Ok(0)
} else {
Err(AxError::WouldBlock)
block_on(poll_io(self, IoEvents::IN, self.nonblocking(), || {
let read = {
let cons = self.shared.buffer.lock();
let (left, right) = cons.as_slices();
let mut count = dst.write(left)?;
if count >= left.len() {
count += dst.write(right)?;
}
})
unsafe { cons.advance_read_index(count) };
count
};
if read > 0 {
self.shared.poll_tx.wake();
Ok(read)
} else if self.closed() {
Ok(0)
} else {
Err(AxError::WouldBlock)
}
}))
}

fn write(&self, src: &mut SealedBuf) -> AxResult<usize> {
Expand All @@ -151,34 +152,32 @@ impl FileLike for Pipe {
}

let mut total_written = 0;
let non_blocking = self.nonblocking();
Poller::new(self, IoEvents::OUT)
.non_blocking(non_blocking)
.poll(|| {
if self.closed() {
raise_pipe();
return Err(AxError::BrokenPipe);
}

let written = {
let mut prod = self.shared.buffer.lock();
let (left, right) = prod.vacant_slices_mut();
let mut count = src.read(unsafe { left.assume_init_mut() })?;
if count >= left.len() {
count += src.read(unsafe { right.assume_init_mut() })?;
}
unsafe { prod.advance_write_index(count) };
count
};
if written > 0 {
self.shared.poll_rx.wake();
total_written += written;
if total_written == size || non_blocking {
return Ok(total_written);
}
block_on(poll_io(self, IoEvents::OUT, self.nonblocking(), || {
if self.closed() {
raise_pipe();
return Err(AxError::BrokenPipe);
}

let written = {
let mut prod = self.shared.buffer.lock();
let (left, right) = prod.vacant_slices_mut();
let mut count = src.read(unsafe { left.assume_init_mut() })?;
if count >= left.len() {
count += src.read(unsafe { right.assume_init_mut() })?;
}
Err(AxError::WouldBlock)
})
unsafe { prod.advance_write_index(count) };
count
};
if written > 0 {
self.shared.poll_rx.wake();
total_written += written;
if total_written == size || self.nonblocking() {
return Ok(total_written);
}
}
Err(AxError::WouldBlock)
}))
}

fn stat(&self) -> AxResult<Kstat> {
Expand Down
Loading