diff --git a/chardev_backend/src/chardev.rs b/chardev_backend/src/chardev.rs index d13d306e20858c90b345ffbb6c234f103583d99f..6009c481285eeab8a49600a1b092eb9323149069 100644 --- a/chardev_backend/src/chardev.rs +++ b/chardev_backend/src/chardev.rs @@ -19,7 +19,7 @@ use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::time::Duration; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use log::{error, info, warn}; use nix::fcntl::{fcntl, FcntlArg, OFlag}; use nix::pty::openpty; @@ -35,13 +35,15 @@ use machine_manager::{ }; use util::file::clear_file; use util::loop_context::{ - gen_delete_notifiers, EventNotifier, EventNotifierHelper, NotifierCallback, NotifierOperation, + create_new_eventfd, gen_delete_notifiers, read_fd, EventNotifier, EventNotifierHelper, + NotifierCallback, NotifierOperation, }; use util::set_termi_raw_mode; use util::socket::{SocketListener, SocketStream}; use util::unix::limit_permission; const BUF_QUEUE_SIZE: usize = 128; +const CHAR_BUF_QUEUE_SIZE: usize = 1024; /// Provide the trait that helps handle the input data. pub trait InputReceiver: Send { @@ -91,13 +93,23 @@ pub struct Chardev { unpause_timer: Option, /// output listener to notify when output stream fd can be written output_listener_fd: Option>, + /// Event to kick output + kick_out_evt: Arc, /// output buffer queue outbuf: VecDeque>, + /// output ring buffer queue for single char devices (e.g. pl011) + char_outbuf: Vec, + /// char_outbuf status + write_ptr: usize, + read_ptr: usize, + is_charbuf_full: bool, + /// Whether device sends data char-by-char or by blocks + is_sendchar_dev: bool, } impl Chardev { - pub fn new(chardev_cfg: ChardevConfig) -> Self { - Chardev { + pub fn new(chardev_cfg: ChardevConfig) -> Result { + Ok(Chardev { id: chardev_cfg.id(), backend: chardev_cfg.classtype, listener: None, @@ -109,8 +121,14 @@ impl Chardev { wait_port: false, unpause_timer: None, output_listener_fd: None, + kick_out_evt: Arc::new(create_new_eventfd()?), outbuf: VecDeque::with_capacity(BUF_QUEUE_SIZE), - } + char_outbuf: vec![0; CHAR_BUF_QUEUE_SIZE], + write_ptr: 0, + read_ptr: 0, + is_charbuf_full: false, + is_sendchar_dev: false, + }) } pub fn realize(&mut self) -> Result<()> { @@ -185,8 +203,13 @@ impl Chardev { Ok(()) } - pub fn set_receiver(&mut self, dev: &Arc>) { + pub fn set_receiver( + &mut self, + dev: &Arc>, + is_sendchar_dev: bool, + ) { self.receiver = Some(dev.clone()); + self.is_sendchar_dev = is_sendchar_dev; if self.wait_port { warn!("Serial port for chardev \'{}\' appeared.", &self.id); self.wait_port = false; @@ -259,58 +282,135 @@ impl Chardev { fn clear_outbuf(&mut self) { self.outbuf.clear(); + self.write_ptr = 0; + self.read_ptr = 0; + self.is_charbuf_full = false; + } + + fn char_outbuf_busy_len(&self) -> usize { + if self.is_charbuf_full { + CHAR_BUF_QUEUE_SIZE + } else if self.write_ptr < self.read_ptr { + self.write_ptr + CHAR_BUF_QUEUE_SIZE - self.read_ptr + } else { + self.write_ptr - self.read_ptr + } } pub fn outbuf_is_full(&self) -> bool { - self.outbuf.len() == self.outbuf.capacity() + self.is_charbuf_full || self.outbuf.len() == self.outbuf.capacity() } - pub fn fill_outbuf(&mut self, buf: Vec, listener_fd: Option>) -> Result<()> { + pub fn outbuf_free_size(&self) -> usize { + if self.is_sendchar_dev { + CHAR_BUF_QUEUE_SIZE - self.char_outbuf_busy_len() + } else { + self.outbuf.capacity() - self.outbuf.len() + } + } + + fn write_buf_check(&self) -> Result { match self.backend { ChardevType::File { .. } | ChardevType::Pty { .. } | ChardevType::Stdio { .. } => { - if self.output.is_none() { - bail!("chardev has no output"); - } - return write_buffer_sync(self.output.as_ref().unwrap().clone(), buf); - } - ChardevType::Socket { .. } => { - if self.output.is_none() { - return Ok(()); - } - if listener_fd.is_none() { - return write_buffer_sync(self.output.as_ref().unwrap().clone(), buf); - } + return Ok(true); } + ChardevType::Socket { .. } => (), } if self.outbuf_is_full() { - bail!("Failed to append buffer because output buffer queue is full"); + Err(anyhow!( + "Failed to append buffer because output buffer queue is full" + )) + } else { + Ok(false) } + } + + pub fn write_char(&mut self, ch: u8) -> Result<()> { + if !self.is_sendchar_dev { + bail!("Device can only send data char-by-char"); + } + + if self.output.is_none() { + return Ok(()); + } + if self.write_buf_check()? { + return write_buffer_sync(self.output.as_ref().unwrap().clone(), vec![ch]); + } + + if self.is_charbuf_full { + bail!("Charbuf is full!"); + } + // push value to ring buffer + self.char_outbuf[self.write_ptr] = ch; + self.write_ptr = (self.write_ptr + 1) % CHAR_BUF_QUEUE_SIZE; + if self.write_ptr == self.read_ptr { + self.is_charbuf_full = true; + } + + let _ = self.kick_out_evt.as_ref().write(1); + + Ok(()) + } + + pub fn fill_outbuf(&mut self, buf: Vec, listener_fd: Option>) -> Result<()> { + if self.is_sendchar_dev { + bail!("Device can only send data by blocks"); + } + + if self.output.is_none() { + return Ok(()); + } + if self.write_buf_check()? { + return write_buffer_sync(self.output.as_ref().unwrap().clone(), buf); + } + self.outbuf.push_back(buf); self.output_listener_fd = listener_fd; + let _ = self.kick_out_evt.as_ref().write(1); - let event_notifier = EventNotifier::new( - NotifierOperation::AddEvents, - self.stream_fd.unwrap(), - None, - EventSet::OUT, - Vec::new(), - ); - EventLoop::update_event(vec![event_notifier], None)?; Ok(()) } + pub fn set_outbuf_listener(&mut self, listener_fd: Option>) { + self.output_listener_fd = listener_fd; + } + fn consume_outbuf(&mut self) -> Result<()> { if self.output.is_none() { bail!("no output interface"); } let output = self.output.as_ref().unwrap(); - while !self.outbuf.is_empty() { - if write_buffer_async(output.clone(), self.outbuf.front_mut().unwrap())? { - break; + + if !self.is_sendchar_dev { + while !self.outbuf.is_empty() { + let front = self.outbuf.front_mut().unwrap(); + let outbuf_len = front.len(); + if write_buffer_async(output.clone(), front)? != outbuf_len { + break; + } + self.outbuf.pop_front(); } - self.outbuf.pop_front(); + + return Ok(()); } + + let mut data_vector; + + if self.write_ptr < self.read_ptr || self.is_charbuf_full { + data_vector = self.char_outbuf[self.read_ptr..].to_vec(); + data_vector.extend_from_slice(&self.char_outbuf[0..self.write_ptr]); + } else { + data_vector = self.char_outbuf[self.read_ptr..self.write_ptr].to_vec(); + } + + let written_cnt = write_buffer_async(output.clone(), &mut data_vector)?; + + if written_cnt != 0 { + self.read_ptr = (self.read_ptr + written_cnt) % CHAR_BUF_QUEUE_SIZE; + self.is_charbuf_full = false; + } + Ok(()) } } @@ -336,7 +436,7 @@ fn write_buffer_sync(writer: Arc>, buf: Vec>, buf: &mut Vec, -) -> Result { +) -> Result { let len = buf.len(); let mut locked_writer = writer.lock().unwrap(); let mut written = 0; @@ -358,11 +458,10 @@ fn write_buffer_async( .flush() .with_context(|| "chardev failed to flush")?; - if written == len { - return Ok(false); + if written != len { + buf.drain(0..written); } - buf.drain(0..written); - Ok(true) + Ok(written) } fn set_pty_raw_mode() -> Result<(i32, PathBuf)> { @@ -490,6 +589,7 @@ fn get_socket_notifier(chardev: Arc>) -> Option { let stream_fd = stream.as_raw_fd(); let stream_arc = Arc::new(Mutex::new(stream)); let listener_fd = locked_chardev.listener.as_ref().unwrap().as_raw_fd(); + let kick_out_fd = locked_chardev.kick_out_evt.as_ref().as_raw_fd(); let notify_dev = locked_chardev.dev.clone(); locked_chardev.stream_fd = Some(stream_fd); @@ -509,6 +609,11 @@ fn get_socket_notifier(chardev: Arc>) -> Option { locked_chardev.output = None; locked_chardev.stream_fd = None; locked_chardev.cancel_unpause_timer(); + locked_chardev.outbuf.clear(); + if locked_chardev.output_listener_fd.is_some() { + let _ = locked_chardev.output_listener_fd.as_ref().unwrap().write(1); + locked_chardev.output_listener_fd = None; + } info!( "Chardev \'{}\' event, connection closed: {}", &locked_chardev.id, connection_info @@ -589,53 +694,78 @@ fn get_socket_notifier(chardev: Arc>) -> Option { }); let handling_chardev = cloned_chardev.clone(); - let output_handler = Rc::new(move |event, fd| { - if event & EventSet::OUT != EventSet::OUT { - return None; - } + let send_buffers = Rc::new(move || { + let mut locked_chardev = handling_chardev.lock().unwrap(); - let mut locked_cdev = handling_chardev.lock().unwrap(); - if let Err(e) = locked_cdev.consume_outbuf() { + if let Err(e) = locked_chardev.consume_outbuf() { error!("Failed to consume outbuf with error {:?}", e); - locked_cdev.clear_outbuf(); + locked_chardev.clear_outbuf(); return Some(vec![EventNotifier::new( NotifierOperation::DeleteEvents, - fd, + stream_fd, None, EventSet::OUT, Vec::new(), )]); } - if locked_cdev.output_listener_fd.is_some() { - let fd = locked_cdev.output_listener_fd.as_ref().unwrap(); + if locked_chardev.output_listener_fd.is_some() { + let fd = locked_chardev.output_listener_fd.as_ref().unwrap(); if let Err(e) = fd.write(1) { error!("Failed to write eventfd with error {:?}", e); return None; } - locked_cdev.output_listener_fd = None; + locked_chardev.output_listener_fd = None; } - if locked_cdev.outbuf.is_empty() { + if locked_chardev.outbuf.is_empty() { Some(vec![EventNotifier::new( NotifierOperation::DeleteEvents, - fd, + stream_fd, None, EventSet::OUT, Vec::new(), )]) } else { - None + Some(vec![EventNotifier::new( + NotifierOperation::AddEvents, + stream_fd, + None, + EventSet::OUT, + Vec::new(), + )]) } }); - Some(vec![EventNotifier::new( - NotifierOperation::AddShared, - stream_fd, - Some(listener_fd), - EventSet::IN | EventSet::HANG_UP, - vec![input_handler, output_handler], - )]) + let send_buffers_cb = send_buffers.clone(); + let outavail_handler = Rc::new(move |event, _| { + if event & EventSet::OUT != EventSet::OUT { + return None; + } + send_buffers_cb() + }); + + let send_handler = Rc::new(move |_event, fd| { + read_fd(fd); + send_buffers() + }); + + Some(vec![ + EventNotifier::new( + NotifierOperation::AddShared, + stream_fd, + Some(listener_fd), + EventSet::IN | EventSet::HANG_UP, + vec![input_handler, outavail_handler], + ), + EventNotifier::new( + NotifierOperation::AddShared, + kick_out_fd, + None, + EventSet::IN, + vec![send_handler], + ), + ]) }); let listener_fd = listener.as_ref().unwrap().as_raw_fd(); diff --git a/devices/src/legacy/pl011.rs b/devices/src/legacy/pl011.rs index b656b875df1e0d9ef79d6dcf14440dc95985abd6..5cd31b1a20ee8ecc653459bc63471d164a7a4a79 100644 --- a/devices/src/legacy/pl011.rs +++ b/devices/src/legacy/pl011.rs @@ -10,6 +10,8 @@ // NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. // See the Mulan PSL v2 for more details. +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use anyhow::{Context, Result}; @@ -34,12 +36,18 @@ use migration::{ use migration_derive::{ByteCode, Desc}; use util::byte_code::ByteCode; use util::gen_base_func; -use util::loop_context::{create_new_eventfd, EventNotifierHelper}; +use util::loop_context::{ + create_new_eventfd, read_fd, EventNotifier, EventNotifierHelper, NotifierCallback, + NotifierOperation, +}; use util::num_ops::read_data_u32; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; -const PL011_FLAG_TXFE: u8 = 0x80; -const PL011_FLAG_RXFF: u8 = 0x40; -const PL011_FLAG_RXFE: u8 = 0x10; +const PL011_FLAG_TXFE: u32 = 0x80; +const PL011_FLAG_RXFF: u32 = 0x40; +const PL011_FLAG_TXFF: u32 = 0x20; +const PL011_FLAG_RXFE: u32 = 0x10; // Interrupt bits in UARTRIS, UARTMIS and UARTIMSC // Receive timeout interrupt bit @@ -53,6 +61,7 @@ const INT_E: u32 = 1 << 7 | 1 << 8 | 1 << 9 | 1 << 10; // nUARTRI/nUARTCTS/nUARTDCD/nUARTDSR modem interrupt bits, bits 0~3. const INT_MS: u32 = 1 | 1 << 1 | 1 << 2 | 1 << 3; +// Keep in sync with PL011::id! Fifo size depends on revision and affects a lot of things. const PL011_FIFO_SIZE: usize = 16; /// Device state of PL011. @@ -125,6 +134,8 @@ pub struct PL011 { state: PL011State, /// Character device for redirection. chardev: Arc>, + /// Event to register for tx-ready signal. + tx_ready_evt: Arc, } impl PL011 { @@ -143,7 +154,8 @@ impl PL011 { }, paused: false, state: PL011State::new(), - chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev))), + chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev)?)), + tx_ready_evt: Arc::new(create_new_eventfd()?), }; pl011 .set_sys_resource(sysbus, region_base, region_size, "PL011") @@ -163,6 +175,31 @@ impl PL011 { } } + fn register_tx_ready_handler(dev: Arc>) { + let fd = dev.lock().unwrap().tx_ready_evt.as_raw_fd(); + + let tx_ready_handler: Rc = Rc::new(move |_, fd: RawFd| { + read_fd(fd); + + let mut locked_dev = dev.lock().unwrap(); + // protect from spurious event + if !locked_dev.chardev.lock().unwrap().outbuf_is_full() { + locked_dev.state.int_level |= INT_TX; + locked_dev.interrupt(); + } + None + }); + + let notifier = EventNotifier::new( + NotifierOperation::AddShared, + fd, + None, + EventSet::IN, + vec![tx_ready_handler], + ); + let _ = EventLoop::update_event(vec![notifier], None); + } + fn unpause_rx(&mut self) { if self.paused { trace::pl011_unpause_rx(); @@ -227,8 +264,11 @@ impl Device for PL011 { dev.clone(), PL011_SNAPSHOT_ID, ); + + Self::register_tx_ready_handler(dev.clone()); + let locked_dev = dev.lock().unwrap(); - locked_dev.chardev.lock().unwrap().set_receiver(&dev); + locked_dev.chardev.lock().unwrap().set_receiver(&dev, true); EventLoop::update_event( EventNotifierHelper::internal_notifiers(locked_dev.chardev.clone()), None, @@ -279,7 +319,15 @@ impl SysBusDevOps for PL011 { ret = self.state.rsr; } 6 => { - ret = self.state.flags; + let mut val = self.state.flags & !(PL011_FLAG_TXFE | PL011_FLAG_TXFF); + + let free_space = self.chardev.lock().unwrap().outbuf_free_size(); + if free_space >= PL011_FIFO_SIZE { + val |= PL011_FLAG_TXFE; + } else if free_space == 0 { + val |= PL011_FLAG_TXFF; + } + ret = val; } 8 => { ret = self.state.ilpr; @@ -340,12 +388,22 @@ impl SysBusDevOps for PL011 { match offset >> 2 { 0 => { let ch = value as u8; - if let Err(e) = self.chardev.lock().unwrap().fill_outbuf(vec![ch], None) { + // We must not get here if outbuf is full. + // So, it is safe to drop the char in case of error. + let mut locked_chardev = self.chardev.lock().unwrap(); + if let Err(e) = locked_chardev.write_char(ch) { error!("Failed to append pl011 data to outbuf of chardev, {:?}", e); - return false; + // Fallback to signal INT_TX always. Otherwise guest will stop using port forever. + } + // If outbuf is not full, signal INT_TX. Otherwise, enable listener callback. + if locked_chardev.outbuf_is_full() { + self.state.int_level &= !INT_TX; + locked_chardev.set_outbuf_listener(Some(self.tx_ready_evt.clone())); + } else { + drop(locked_chardev); + self.state.int_level |= INT_TX; + self.interrupt(); } - self.state.int_level |= INT_TX; - self.interrupt(); } 1 => { self.state.rsr = 0; @@ -413,6 +471,7 @@ impl StateTransfer for PL011 { self.state = *PL011State::from_bytes(state) .with_context(|| MigrationError::FromBytesError("PL011"))?; + let _ = self.tx_ready_evt.write(1); // signal resume TX, if it was paused self.unpause_rx(); Ok(()) } diff --git a/devices/src/legacy/serial.rs b/devices/src/legacy/serial.rs index e21112b3cf214dcfa7beb85c9e34c253af509b2b..44b158d2bbdb4d1aa89a39d74d50abc24a9e4f98 100644 --- a/devices/src/legacy/serial.rs +++ b/devices/src/legacy/serial.rs @@ -11,6 +11,8 @@ // See the Mulan PSL v2 for more details. use std::collections::VecDeque; +use std::os::unix::prelude::{AsRawFd, RawFd}; +use std::rc::Rc; use std::sync::{Arc, Mutex}; use anyhow::{bail, Context, Result}; @@ -34,7 +36,12 @@ use migration::{ use migration_derive::{ByteCode, Desc}; use util::byte_code::ByteCode; use util::gen_base_func; -use util::loop_context::{create_new_eventfd, EventNotifierHelper}; +use util::loop_context::{ + create_new_eventfd, read_fd, EventNotifier, EventNotifierHelper, NotifierCallback, + NotifierOperation, +}; +use vmm_sys_util::epoll::EventSet; +use vmm_sys_util::eventfd::EventFd; pub const SERIAL_ADDR: u64 = 0x3f8; @@ -121,6 +128,8 @@ pub struct Serial { state: SerialState, /// Character device for redirection. chardev: Arc>, + /// Event to register for tx-ready signal. + tx_ready_evt: Arc, } impl Serial { @@ -135,7 +144,8 @@ impl Serial { paused: false, rbr: VecDeque::new(), state: SerialState::new(), - chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev))), + chardev: Arc::new(Mutex::new(Chardev::new(cfg.chardev)?)), + tx_ready_evt: Arc::new(create_new_eventfd()?), }; serial.base.interrupt_evt = Some(Arc::new(create_new_eventfd()?)); serial @@ -145,6 +155,32 @@ impl Serial { Ok(serial) } + fn register_tx_ready_handler(dev: Arc>) { + let fd = dev.lock().unwrap().tx_ready_evt.as_raw_fd(); + + let tx_ready_handler: Rc = Rc::new(move |_, fd: RawFd| { + read_fd(fd); + + let mut locked_dev = dev.lock().unwrap(); + // protect from spurious event + if !locked_dev.chardev.lock().unwrap().outbuf_is_full() { + locked_dev.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + locked_dev.state.thr_pending = 1; + locked_dev.update_iir(); + } + None + }); + + let notifier = EventNotifier::new( + NotifierOperation::AddShared, + fd, + None, + EventSet::IN, + vec![tx_ready_handler], + ); + let _ = EventLoop::update_event(vec![notifier], None); + } + fn unpause_rx(&mut self) { if self.paused { trace::serial_unpause_rx(); @@ -262,9 +298,9 @@ impl Serial { if self.state.lcr & UART_LCR_DLAB != 0 { self.state.div = (self.state.div & 0xff00) | u16::from(data); } else { - self.state.thr_pending = 1; - if self.state.mcr & UART_MCR_LOOP != 0 { + self.state.thr_pending = 1; + // loopback mode let len = self.rbr.len(); if len >= RECEIVER_BUFF_SIZE { @@ -276,10 +312,23 @@ impl Serial { self.rbr.push_back(data); self.state.lsr |= UART_LSR_DR; - } else if let Err(e) = - self.chardev.lock().unwrap().fill_outbuf(vec![data], None) - { - bail!("Failed to append data to output buffer of chardev, {:?}", e); + } else { + let mut locked_chardev = self.chardev.lock().unwrap(); + if let Err(e) = locked_chardev.write_char(data) { + error!("Failed to append rs232 data to outbuf of chardev, {:?}", e); + // We need signal INT_TX anyway. Otherwise guest will stop using port forever. + // Fallback to signalling + } + + // If outbuf is not full, signal INT_TX. Otherwise, enable listener callback. + if locked_chardev.outbuf_is_full() { + self.state.lsr &= !(UART_LSR_TEMT | UART_LSR_THRE); + locked_chardev.set_outbuf_listener(Some(self.tx_ready_evt.clone())); + } else { + drop(locked_chardev); + self.state.thr_pending = 1; + self.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + } } self.update_iir(); @@ -304,6 +353,10 @@ impl Serial { if data & UART_MCR_LOOP == 0 { // loopback turned off. Unpause rx self.unpause_rx(); + } else if self.state.lsr & UART_LSR_TEMT == 0 { + self.state.thr_pending = 1; + self.state.lsr |= UART_LSR_TEMT | UART_LSR_THRE; + self.update_iir(); } self.state.mcr = data; } @@ -369,8 +422,11 @@ impl Device for Serial { dev.clone(), SERIAL_SNAPSHOT_ID, ); + + Self::register_tx_ready_handler(dev.clone()); + let locked_dev = dev.lock().unwrap(); - locked_dev.chardev.lock().unwrap().set_receiver(&dev); + locked_dev.chardev.lock().unwrap().set_receiver(&dev, true); EventLoop::update_event( EventNotifierHelper::internal_notifiers(locked_dev.chardev.clone()), None, @@ -450,6 +506,7 @@ impl StateTransfer for Serial { } self.rbr = rbr; self.state = serial_state; + let _ = self.tx_ready_evt.write(1); // signal resume TX, if it was paused self.unpause_rx(); Ok(()) diff --git a/machine/src/lib.rs b/machine/src/lib.rs index 8a47faacd8272355fede333bc5ea0d3b8fa1baf4..4e054a4c52cf842394bbe5f4e6be5a9032573b3b 100644 --- a/machine/src/lib.rs +++ b/machine/src/lib.rs @@ -837,7 +837,7 @@ pub trait MachineOps: MachineLifecycle { ) })?; - let mut serial_port = SerialPort::new(serialport_cfg, chardev_cfg); + let mut serial_port = SerialPort::new(serialport_cfg, chardev_cfg)?; let port = Arc::new(Mutex::new(serial_port.clone())); serial_port.realize()?; if !is_console { diff --git a/virtio/src/device/serial.rs b/virtio/src/device/serial.rs index f8a1e35feb85133ac20839a8f702ec79041d7b82..5e90bfccf1ec153c949837580ff2867c0cb09efb 100644 --- a/virtio/src/device/serial.rs +++ b/virtio/src/device/serial.rs @@ -350,7 +350,7 @@ pub struct SerialPort { } impl SerialPort { - pub fn new(port_cfg: VirtioSerialPortCfg, chardev_cfg: ChardevConfig) -> Self { + pub fn new(port_cfg: VirtioSerialPortCfg, chardev_cfg: ChardevConfig) -> Result { // Console is default host connected. And pty chardev has opened by default in realize() // function. let is_console = matches!(port_cfg.classtype.as_str(), "virtconsole"); @@ -359,16 +359,16 @@ impl SerialPort { host_connected = true; } - SerialPort { + Ok(SerialPort { name: Some(port_cfg.id), paused: false, - chardev: Arc::new(Mutex::new(Chardev::new(chardev_cfg))), + chardev: Arc::new(Mutex::new(Chardev::new(chardev_cfg)?)), nr: port_cfg.nr.unwrap(), is_console, guest_connected: false, host_connected, ctrl_handler: None, - } + }) } pub fn realize(&mut self) -> Result<()> { @@ -393,7 +393,7 @@ impl SerialPort { } fn activate(&mut self, handler: &Arc>) { - self.chardev.lock().unwrap().set_receiver(handler); + self.chardev.lock().unwrap().set_receiver(handler, false); } fn deactivate(&mut self) { @@ -459,7 +459,17 @@ impl SerialPortHandler { let locked_port = port.lock().unwrap(); let locked_cdev = locked_port.chardev.lock().unwrap(); if locked_cdev.outbuf_is_full() { + // disable further notifications until space appears + queue_lock + .vring + .suppress_queue_notify(&self.mem_space, self.driver_features, true) + .with_context(|| "Failed to disable tx queue notify")?; break; + } else { + queue_lock + .vring + .suppress_queue_notify(&self.mem_space, self.driver_features, false) + .with_context(|| "Failed to enable tx queue notify")?; } }