diff --git a/chardev_backend/src/chardev.rs b/chardev_backend/src/chardev.rs index d13d306e20858c90b345ffbb6c234f103583d99f..b8ae9fe0bad29cb19cbb2e7864e3a80acc863350 100644 --- a/chardev_backend/src/chardev.rs +++ b/chardev_backend/src/chardev.rs @@ -35,7 +35,8 @@ use machine_manager::{ }; use util::file::clear_file; use util::loop_context::{ - gen_delete_notifiers, EventNotifier, EventNotifierHelper, NotifierCallback, NotifierOperation, + create_new_eventfd, gen_delete_notifiers, read_fd, EventNotifier, EventNotifierHelper, + NotifierCallback, NotifierOperation, }; use util::set_termi_raw_mode; use util::socket::{SocketListener, SocketStream}; @@ -91,13 +92,15 @@ pub struct Chardev { unpause_timer: Option, /// output listener to notify when output stream fd can be written output_listener_fd: Option>, + /// Event to kick output + kick_out_evt: Arc, /// output buffer queue outbuf: VecDeque>, } impl Chardev { - pub fn new(chardev_cfg: ChardevConfig) -> Self { - Chardev { + pub fn new(chardev_cfg: ChardevConfig) -> Result { + Ok(Chardev { id: chardev_cfg.id(), backend: chardev_cfg.classtype, listener: None, @@ -109,8 +112,9 @@ impl Chardev { wait_port: false, unpause_timer: None, output_listener_fd: None, + kick_out_evt: Arc::new(create_new_eventfd()?), outbuf: VecDeque::with_capacity(BUF_QUEUE_SIZE), - } + }) } pub fn realize(&mut self) -> Result<()> { @@ -265,6 +269,10 @@ impl Chardev { self.outbuf.len() == self.outbuf.capacity() } + pub fn outbuf_free_size(&self) -> usize { + self.outbuf.capacity() - self.outbuf.len() + } + pub fn fill_outbuf(&mut self, buf: Vec, listener_fd: Option>) -> Result<()> { match self.backend { ChardevType::File { .. } | ChardevType::Pty { .. } | ChardevType::Stdio { .. } => { @@ -273,14 +281,10 @@ impl Chardev { } return write_buffer_sync(self.output.as_ref().unwrap().clone(), buf); } - ChardevType::Socket { .. } => { - if self.output.is_none() { - return Ok(()); - } - if listener_fd.is_none() { - return write_buffer_sync(self.output.as_ref().unwrap().clone(), buf); - } - } + ChardevType::Socket { .. } => (), + } + if self.output.is_none() { + return Ok(()); } if self.outbuf_is_full() { @@ -288,18 +292,15 @@ impl Chardev { } self.outbuf.push_back(buf); self.output_listener_fd = listener_fd; + let _ = self.kick_out_evt.as_ref().write(1); - let event_notifier = EventNotifier::new( - NotifierOperation::AddEvents, - self.stream_fd.unwrap(), - None, - EventSet::OUT, - Vec::new(), - ); - EventLoop::update_event(vec![event_notifier], None)?; Ok(()) } + pub fn set_outbuf_listener(&mut self, listener_fd: Option>) { + self.output_listener_fd = listener_fd; + } + fn consume_outbuf(&mut self) -> Result<()> { if self.output.is_none() { bail!("no output interface"); @@ -490,6 +491,7 @@ fn get_socket_notifier(chardev: Arc>) -> Option { let stream_fd = stream.as_raw_fd(); let stream_arc = Arc::new(Mutex::new(stream)); let listener_fd = locked_chardev.listener.as_ref().unwrap().as_raw_fd(); + let kick_out_fd = locked_chardev.kick_out_evt.as_ref().as_raw_fd(); let notify_dev = locked_chardev.dev.clone(); locked_chardev.stream_fd = Some(stream_fd); @@ -509,6 +511,11 @@ fn get_socket_notifier(chardev: Arc>) -> Option { locked_chardev.output = None; locked_chardev.stream_fd = None; locked_chardev.cancel_unpause_timer(); + locked_chardev.outbuf.clear(); + if locked_chardev.output_listener_fd.is_some() { + let _ = locked_chardev.output_listener_fd.as_ref().unwrap().write(1); + locked_chardev.output_listener_fd = None; + } info!( "Chardev \'{}\' event, connection closed: {}", &locked_chardev.id, connection_info @@ -589,53 +596,78 @@ fn get_socket_notifier(chardev: Arc>) -> Option { }); let handling_chardev = cloned_chardev.clone(); - let output_handler = Rc::new(move |event, fd| { - if event & EventSet::OUT != EventSet::OUT { - return None; - } + let send_buffers = Rc::new(move || { + let mut locked_chardev = handling_chardev.lock().unwrap(); - let mut locked_cdev = handling_chardev.lock().unwrap(); - if let Err(e) = locked_cdev.consume_outbuf() { + if let Err(e) = locked_chardev.consume_outbuf() { error!("Failed to consume outbuf with error {:?}", e); - locked_cdev.clear_outbuf(); + locked_chardev.clear_outbuf(); return Some(vec![EventNotifier::new( NotifierOperation::DeleteEvents, - fd, + stream_fd, None, EventSet::OUT, Vec::new(), )]); } - if locked_cdev.output_listener_fd.is_some() { - let fd = locked_cdev.output_listener_fd.as_ref().unwrap(); + if locked_chardev.output_listener_fd.is_some() { + let fd = locked_chardev.output_listener_fd.as_ref().unwrap(); if let Err(e) = fd.write(1) { error!("Failed to write eventfd with error {:?}", e); return None; } - locked_cdev.output_listener_fd = None; + locked_chardev.output_listener_fd = None; } - if locked_cdev.outbuf.is_empty() { + if locked_chardev.outbuf.is_empty() { Some(vec![EventNotifier::new( NotifierOperation::DeleteEvents, - fd, + stream_fd, None, EventSet::OUT, Vec::new(), )]) } else { - None + Some(vec![EventNotifier::new( + NotifierOperation::AddEvents, + stream_fd, + None, + EventSet::OUT, + Vec::new(), + )]) } }); - Some(vec![EventNotifier::new( - NotifierOperation::AddShared, - stream_fd, - Some(listener_fd), - EventSet::IN | EventSet::HANG_UP, - vec![input_handler, output_handler], - )]) + let send_buffers_cb = send_buffers.clone(); + let outavail_handler = Rc::new(move |event, _| { + if event & EventSet::OUT != EventSet::OUT { + return None; + } + send_buffers_cb() + }); + + let send_handler = Rc::new(move |_event, fd| { + read_fd(fd); + send_buffers() + }); + + Some(vec![ + EventNotifier::new( + NotifierOperation::AddShared, + stream_fd, + Some(listener_fd), + EventSet::IN | EventSet::HANG_UP, + vec![input_handler, outavail_handler], + ), + EventNotifier::new( + NotifierOperation::AddShared, + kick_out_fd, + None, + EventSet::IN, + vec![send_handler], + ), + ]) }); let listener_fd = listener.as_ref().unwrap().as_raw_fd(); diff --git a/devices/src/legacy/pl011.rs b/devices/src/legacy/pl011.rs index b656b875df1e0d9ef79d6dcf14440dc95985abd6..d710281722f7561007d89d771427c58deaebd451 100644 --- a/devices/src/legacy/pl011.rs +++ b/devices/src/legacy/pl011.rs @@ -10,6 +10,8 @@ // NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. // See the Mulan PSL v2 for more details. +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; @@ -34,12 +36,18 @@ use migration::{ use migration_derive::{ByteCode, Desc}; use util::byte_code::ByteCode; use util::gen_base_func; -use util::loop_context::{create_new_eventfd, EventNotifierHelper}; +use util::loop_context::{ + create_new_eventfd, read_fd, EventNotifier, EventNotifierHelper, NotifierCallback, + NotifierOperation, +}; use util::num_ops::read_data_u32; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; -const PL011_FLAG_TXFE: u8 = 0x80; -const PL011_FLAG_RXFF: u8 = 0x40; -const PL011_FLAG_RXFE: u8 = 0x10; +const PL011_FLAG_TXFE: u32 = 0x80; +const PL011_FLAG_RXFF: u32 = 0x40; +const PL011_FLAG_TXFF: u32 = 0x20; +const PL011_FLAG_RXFE: u32 = 0x10; // Interrupt bits in UARTRIS, UARTMIS and UARTIMSC // Receive timeout interrupt bit @@ -53,6 +61,7 @@ const INT_E: u32 = 1 << 7 | 1 << 8 | 1 << 9 | 1 << 10; // nUARTRI/nUARTCTS/nUARTDCD/nUARTDSR modem interrupt bits, bits 0~3. const INT_MS: u32 = 1 | 1 << 1 | 1 << 2 | 1 << 3; +// Keep in sync with PL011::id! Fifo size depends on revision and affects a lot of things. const PL011_FIFO_SIZE: usize = 16; /// Device state of PL011. @@ -125,6 +134,8 @@ pub struct PL011 { state: PL011State, /// Character device for redirection. chardev: Arc>, + /// Event to register for tx-ready signal. + tx_ready_evt: Arc, } impl PL011 { @@ -143,7 +154,8 @@ impl PL011 { }, paused: false, state: PL011State::new(), - chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev))), + chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev)?)), + tx_ready_evt: Arc::new(create_new_eventfd()?), }; pl011 .set_sys_resource(sysbus, region_base, region_size, "PL011") @@ -163,6 +175,31 @@ impl PL011 { } } + fn register_tx_ready_handler(dev: Arc>) { + let fd = dev.lock().unwrap().tx_ready_evt.as_raw_fd(); + + let tx_ready_handler: Rc = Rc::new(move |_, fd: RawFd| { + read_fd(fd); + + let mut locked_dev = dev.lock().unwrap(); + // protect from spurious event + if !locked_dev.chardev.lock().unwrap().outbuf_is_full() { + locked_dev.state.int_level |= INT_TX; + locked_dev.interrupt(); + } + None + }); + + let notifier = EventNotifier::new( + NotifierOperation::AddShared, + fd, + None, + EventSet::IN, + vec![tx_ready_handler], + ); + let _ = EventLoop::update_event(vec![notifier], None); + } + fn unpause_rx(&mut self) { if self.paused { trace::pl011_unpause_rx(); @@ -227,6 +264,9 @@ impl Device for PL011 { dev.clone(), PL011_SNAPSHOT_ID, ); + + Self::register_tx_ready_handler(dev.clone()); + let locked_dev = dev.lock().unwrap(); locked_dev.chardev.lock().unwrap().set_receiver(&dev); EventLoop::update_event( @@ -279,7 +319,15 @@ impl SysBusDevOps for PL011 { ret = self.state.rsr; } 6 => { - ret = self.state.flags; + let mut val = self.state.flags & !(PL011_FLAG_TXFE | PL011_FLAG_TXFF); + + let free_space = self.chardev.lock().unwrap().outbuf_free_size(); + if free_space >= PL011_FIFO_SIZE { + val |= PL011_FLAG_TXFE; + } else if free_space == 0 { + val |= PL011_FLAG_TXFF; + } + ret = val; } 8 => { ret = self.state.ilpr; @@ -340,12 +388,22 @@ impl SysBusDevOps for PL011 { match offset >> 2 { 0 => { let ch = value as u8; - if let Err(e) = self.chardev.lock().unwrap().fill_outbuf(vec![ch], None) { + // We must not get here if outbuf is full. + // So, it is safe to drop the char in case of error. + let mut locked_chardev = self.chardev.lock().unwrap(); + if let Err(e) = locked_chardev.fill_outbuf(vec![ch], None) { error!("Failed to append pl011 data to outbuf of chardev, {:?}", e); - return false; + // Fallback to signal INT_TX always. Otherwise guest will stop using port forever. + } + // If outbuf is not full, signal INT_TX. Otherwise, enable listener callback. + if locked_chardev.outbuf_is_full() { + self.state.int_level &= !INT_TX; + locked_chardev.set_outbuf_listener(Some(self.tx_ready_evt.clone())); + } else { + drop(locked_chardev); + self.state.int_level |= INT_TX; + self.interrupt(); } - self.state.int_level |= INT_TX; - self.interrupt(); } 1 => { self.state.rsr = 0; @@ -413,6 +471,7 @@ impl StateTransfer for PL011 { self.state = *PL011State::from_bytes(state) .with_context(|| MigrationError::FromBytesError("PL011"))?; + let _ = self.tx_ready_evt.write(1); // signal resume TX, if it was paused self.unpause_rx(); Ok(()) } diff --git a/devices/src/legacy/serial.rs b/devices/src/legacy/serial.rs index e21112b3cf214dcfa7beb85c9e34c253af509b2b..3442be7aeb0e98a6d5241420cdf34d76877cdb8e 100644 --- a/devices/src/legacy/serial.rs +++ b/devices/src/legacy/serial.rs @@ -11,6 +11,8 @@ // See the Mulan PSL v2 for more details. use std::collections::VecDeque; +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use anyhow::{bail, Context, Result}; @@ -34,7 +36,12 @@ use migration::{ use migration_derive::{ByteCode, Desc}; use util::byte_code::ByteCode; use util::gen_base_func; -use util::loop_context::{create_new_eventfd, EventNotifierHelper}; +use util::loop_context::{ + create_new_eventfd, read_fd, EventNotifier, EventNotifierHelper, NotifierCallback, + NotifierOperation, +}; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; pub const SERIAL_ADDR: u64 = 0x3f8; @@ -121,6 +128,8 @@ pub struct Serial { state: SerialState, /// Character device for redirection. chardev: Arc>, + /// Event to register for tx-ready signal. + tx_ready_evt: Arc, } impl Serial { @@ -135,7 +144,8 @@ impl Serial { paused: false, rbr: VecDeque::new(), state: SerialState::new(), - chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev))), + chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev)?)), + tx_ready_evt: Arc::new(create_new_eventfd()?), }; serial.base.interrupt_evt = Some(Arc::new(create_new_eventfd()?)); serial @@ -145,6 +155,32 @@ impl Serial { Ok(serial) } + fn register_tx_ready_handler(dev: Arc>) { + let fd = dev.lock().unwrap().tx_ready_evt.as_raw_fd(); + + let tx_ready_handler: Rc = Rc::new(move |_, fd: RawFd| { + read_fd(fd); + + let mut locked_dev = dev.lock().unwrap(); + // protect from spurious event + if !locked_dev.chardev.lock().unwrap().outbuf_is_full() { + locked_dev.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + locked_dev.state.thr_pending = 1; + locked_dev.update_iir(); + } + None + }); + + let notifier = EventNotifier::new( + NotifierOperation::AddShared, + fd, + None, + EventSet::IN, + vec![tx_ready_handler], + ); + let _ = EventLoop::update_event(vec![notifier], None); + } + fn unpause_rx(&mut self) { if self.paused { trace::serial_unpause_rx(); @@ -257,14 +293,15 @@ impl Serial { // * fail to flush serial. fn write_internal(&mut self, offset: u64, data: u8) -> Result<()> { trace::serial_write(offset, data); + let mut maybe_err = Ok(()); match offset { 0 => { if self.state.lcr & UART_LCR_DLAB != 0 { self.state.div = (self.state.div & 0xff00) | u16::from(data); } else { - self.state.thr_pending = 1; - if self.state.mcr & UART_MCR_LOOP != 0 { + self.state.thr_pending = 1; + // loopback mode let len = self.rbr.len(); if len >= RECEIVER_BUFF_SIZE { @@ -276,10 +313,25 @@ impl Serial { self.rbr.push_back(data); self.state.lsr |= UART_LSR_DR; - } else if let Err(e) = - self.chardev.lock().unwrap().fill_outbuf(vec![data], None) - { - bail!("Failed to append data to output buffer of chardev, {:?}", e); + } else { + let mut locked_chardev = self.chardev.lock().unwrap(); + if let Err(e) = locked_chardev.fill_outbuf(vec![data], None) { + maybe_err = Err(e).with_context(|| { + "Failed to append rs232 data to outbuf of chardev" + }); + // We need signal INT_TX anyway. Otherwise guest will stop using port forever. + // Fallback to signalling + } + + // If outbuf is not full, signal INT_TX. Otherwise, enable listener callback. + if locked_chardev.outbuf_is_full() { + self.state.lsr &= !(UART_LSR_TEMT | UART_LSR_THRE); + locked_chardev.set_outbuf_listener(Some(self.tx_ready_evt.clone())); + } else { + drop(locked_chardev); + self.state.thr_pending = 1; + self.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + } } self.update_iir(); @@ -304,6 +356,10 @@ impl Serial { if data & UART_MCR_LOOP == 0 { // loopback turned off. Unpause rx self.unpause_rx(); + } else if self.state.lsr & UART_LSR_TEMT == 0 { + self.state.thr_pending = 1; + self.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + self.update_iir(); } self.state.mcr = data; } @@ -313,7 +369,7 @@ impl Serial { _ => {} } - Ok(()) + maybe_err } } @@ -369,6 +425,9 @@ impl Device for Serial { dev.clone(), SERIAL_SNAPSHOT_ID, ); + + Self::register_tx_ready_handler(dev.clone()); + let locked_dev = dev.lock().unwrap(); locked_dev.chardev.lock().unwrap().set_receiver(&dev); EventLoop::update_event( @@ -450,6 +509,7 @@ impl StateTransfer for Serial { } self.rbr = rbr; self.state = serial_state; + let _ = self.tx_ready_evt.write(1); // signal resume TX, if it was paused self.unpause_rx(); Ok(()) @@ -507,7 +567,7 @@ mod test { // for write_internal with first argument to work, // you need to set output at first assert!(usart.write_internal(0, 0x03).is_err()); - let mut chardev = Chardev::new(chardev_cfg); + let mut chardev = Chardev::new(chardev_cfg).unwrap(); chardev.output = Some(Arc::new(Mutex::new(std::io::stdout()))); usart.chardev = Arc::new(Mutex::new(chardev)); diff --git a/machine/src/lib.rs b/machine/src/lib.rs index 8a47faacd8272355fede333bc5ea0d3b8fa1baf4..4e054a4c52cf842394bbe5f4e6be5a9032573b3b 100644 --- a/machine/src/lib.rs +++ b/machine/src/lib.rs @@ -837,7 +837,7 @@ pub trait MachineOps: MachineLifecycle { ) })?; - let mut serial_port = SerialPort::new(serialport_cfg, chardev_cfg); + let mut serial_port = SerialPort::new(serialport_cfg, chardev_cfg)?; let port = Arc::new(Mutex::new(serial_port.clone())); serial_port.realize()?; if !is_console { diff --git a/virtio/src/device/serial.rs b/virtio/src/device/serial.rs index f8a1e35feb85133ac20839a8f702ec79041d7b82..e901bc66e68d57dc9ea3e28214e9daecb9b9e0fd 100644 --- a/virtio/src/device/serial.rs +++ b/virtio/src/device/serial.rs @@ -350,7 +350,7 @@ pub struct SerialPort { } impl SerialPort { - pub fn new(port_cfg: VirtioSerialPortCfg, chardev_cfg: ChardevConfig) -> Self { + pub fn new(port_cfg: VirtioSerialPortCfg, chardev_cfg: ChardevConfig) -> Result { // Console is default host connected. And pty chardev has opened by default in realize() // function. let is_console = matches!(port_cfg.classtype.as_str(), "virtconsole"); @@ -359,16 +359,16 @@ impl SerialPort { host_connected = true; } - SerialPort { + Ok(SerialPort { name: Some(port_cfg.id), paused: false, - chardev: Arc::new(Mutex::new(Chardev::new(chardev_cfg))), + chardev: Arc::new(Mutex::new(Chardev::new(chardev_cfg)?)), nr: port_cfg.nr.unwrap(), is_console, guest_connected: false, host_connected, ctrl_handler: None, - } + }) } pub fn realize(&mut self) -> Result<()> { @@ -459,7 +459,17 @@ impl SerialPortHandler { let locked_port = port.lock().unwrap(); let locked_cdev = locked_port.chardev.lock().unwrap(); if locked_cdev.outbuf_is_full() { + // disable further notifications until space appears + queue_lock + .vring + .suppress_queue_notify(&self.mem_space, self.driver_features, true) + .with_context(|| "Failed to disable tx queue notify")?; break; + } else { + queue_lock + .vring + .suppress_queue_notify(&self.mem_space, self.driver_features, false) + .with_context(|| "Failed to enable tx queue notify")?; } }