diff --git a/apply-patches b/apply-patches new file mode 100755 index 0000000000000000000000000000000000000000..9afd70bba2b849b9f46ea3fd58b284be24959379 --- /dev/null +++ b/apply-patches @@ -0,0 +1,14 @@ +#!/bin/bash + +set -e + +tar -xzvf kuasar-0.0.1.tar.gz +cp -rf ./kuasar-0.0.1/* ./ +cat ./series.conf | while read line +do + if [[ $line == '' || $line =~ ^\s*# ]]; then + continue + fi + echo "====patch $line======" + patch -p1 -F1 -s < ./patch/$line +done diff --git a/initrd/make-initrd-rpm.list b/initrd/make-initrd-rpm.list new file mode 100755 index 0000000000000000000000000000000000000000..4bf74f3ada9e58cdcc558b81c10f4ef54e533110 --- /dev/null +++ b/initrd/make-initrd-rpm.list @@ -0,0 +1,40 @@ +# nfs-utils +nfs-utils +# tcp_wrappers-libs +tcp_wrappers-libs +# rpcbind +rpcbind +# libverto-tevent +libverto-tevent +# libtirpc +libtirpc +# libtevent +libtevent +# libtalloc +libtalloc +# libref_array +libref_array +# libpath_utils +libpath_utils +# libnfsidmap +libnfsidmap +# libini_config +libini_config +# libevent +libevent +# libcom_err +libcom_err +# libcollection +libcollection +# libbasicobjects +libbasicobjects +# keyutils +keyutils +# gssproxy +gssproxy +# e2fsprogs-libs +e2fsprogs-libs +# docker-runc +docker-runc +# bash +bash diff --git a/initrd/make_kuasar_initrd.sh b/initrd/make_kuasar_initrd.sh new file mode 100755 index 0000000000000000000000000000000000000000..2aaeaab4137f49c457e8e8c0b7c8d25efb1b346c --- /dev/null +++ b/initrd/make_kuasar_initrd.sh @@ -0,0 +1,96 @@ +#!/bin/bash + +# Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. +# kata_integration is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +#     http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. +# Description: make kuasar initrd + +script_dir="$(dirname $(readlink -f $0))" +rpmlist=${script_dir}/make-initrd-rpm.list + +IMAGE_NAME=${IMAGE_NAME:-kuasar.initrd} +ROOTFS_DIR=${ROOTFS_DIR:-/tmp/kuasar-rootfs} + +# create a temp dir to store rootfs +rm -rf ${ROOTFS_DIR} +mkdir -p ${ROOTFS_DIR}/lib \ + ${ROOTFS_DIR}/lib64 \ + ${ROOTFS_DIR}/lib/modules + +mkdir -m 0755 -p ${ROOTFS_DIR}/dev \ + ${ROOTFS_DIR}/sys \ + ${ROOTFS_DIR}/sbin \ + ${ROOTFS_DIR}/bin \ + ${ROOTFS_DIR}/tmp \ + ${ROOTFS_DIR}/proc + +if [ ! -f "${script_dir}/vmm-task" ];then + echo "vmm-task doesn't exist!" + exit 1 +fi + +# busybox +cp /sbin/busybox ${ROOTFS_DIR}/sbin/ +cp ${script_dir}/vmm-task ${ROOTFS_DIR}/init +# glibc-devel glibc +cp /lib64/libnss_dns* ${ROOTFS_DIR}/lib64 +cp /lib64/libnss_files* ${ROOTFS_DIR}/lib64 + +# cp run request files in initrd +cat $rpmlist | while read rpm +do + if [ "${rpm:0:1}" != "#" ]; then + rpm -ql $rpm > /dev/null 2>&1 + if [ $? -ne 0 ]; then + continue + fi + array=($(rpm -ql $rpm| grep -v "share" | grep -v ".build-id")) + echo array + for file in ${array[@]}; + do + source=$file + dts_file=${ROOTFS_DIR}$file + dts_folder=${dts_file%/*} + if [ ! -d "$dts_folder" ];then + mkdir -p $dts_folder + fi + cp -r -f -d $source $dts_folder + done + fi +done + +#create symlinks to busybox +BUSYBOX_BINARIES=(/bin/sh /bin/mount /bin/umount /bin/ls /bin/ps /bin/file /bin/ldd /bin/tar /bin/hwclock /sbin/modprobe /sbin/depmod /bin/ip /bin/modinfo /bin/insmod /bin/rmmod) +for bin in ${BUSYBOX_BINARIES[@]} +do + mkdir -p ${ROOTFS_DIR}/`dirname ${bin}` + ln -sf /sbin/busybox ${ROOTFS_DIR}/${bin} +done + +LDD_BINARIES=(/init /sbin/busybox /usr/bin/bash) +for bin in ${LDD_BINARIES[@]} + do + ldd ${ROOTFS_DIR}${bin} | while read line + do + arr=(${line// / }) + + for lib in ${arr[@]} + do + echo $lib + if [ "${lib:0:1}" = "/" ]; then + dir=${ROOTFS_DIR}`dirname $lib` + mkdir -p "${dir}" + cp -f $lib $dir + fi + done + done + done + +cp ${ROOTFS_DIR}/usr/bin/bash ${ROOTFS_DIR}/bin/bash +(cd ${ROOTFS_DIR} && find . | cpio -H newc -o | gzip -9 ) > ${script_dir}/${IMAGE_NAME} diff --git a/kuasar-0.0.1.tar.gz b/kuasar-0.0.1.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..4d210a06a911b405e0073970862332008130de5d Binary files /dev/null and b/kuasar-0.0.1.tar.gz differ diff --git a/kuasar.spec b/kuasar.spec new file mode 100644 index 0000000000000000000000000000000000000000..92fad6a26d4a77cabab8224e91390f650deb0062 --- /dev/null +++ b/kuasar.spec @@ -0,0 +1,85 @@ +%global debug_package %{nil} + +Name: kuasar +Version: 0.0.1 +Release: 1 +Summary: Kuasar is an efficient container runtime that supports multiple sandbox techniques. +License: Apache License 2.0 +URL: https://github.com/kuasar-io/kuasar +Source0: kuasar.tar.gz +Source1: kernel.tar.gz +BuildRequires: automake golang bc glibc-devel glibc-static busybox glib2-devel glib2 ipvsadm conntrack-tools nfs-utils bash gcc cmake gcc-c++ +BuildRequires: patch elfutils-libelf-devel openssl-devel bison flex rust cargo rust-packaging libgcc dtc-devel docker-runc + +%define _cargo /usr/bin/env CARGO_HOME=.cargo /usr/bin/cargo +%define hypervisor stratovirt +%define localbindir /usr/local/bin +%define kuasarconfdir /var/lib/kuasar + +%description +Kuasar is an efficient container runtime that provides cloud-native, all-scenario container solutions by supporting multiple sandbox techniques. + +%prep +%setup -T -c -a 0 -n kuasar-%{version} +%setup -T -c -a 1 -n kernel + +cd %{_builddir}/kuasar-%{version} +sh -x apply-patches + +cd %{_builddir}/kernel +mv kernel linux +cd %{_builddir}/kernel/linux +%ifarch %{ix86} x86_64 +cp %{_builddir}/kuasar-%{version}/vmm/scripts/kernel/stratovirt/kuasar-openeuler-kernel-x86_64.config ./.config +%else +cp %{_builddir}/kuasar-%{version}/vmm/scripts/kernel/stratovirt/kuasar-openeuler-kernel-aarch64.config ./.config +%endif + +%build +# build vmm-sandboxer and vmm-task +cd %{_builddir}/kuasar-%{version} +HYPERVISOR=%{hypervisor} make bin/vmm-sandboxer +%ifarch %{ix86} x86_64 +ARCH=x86_64 HYPERVISOR=%{hypervisor} make bin/vmm-task +%else +ARCH=aarch64 HYPERVISOR=%{hypervisor} make bin/vmm-task +%endif + +# build initrd +cp %{_builddir}/kuasar-%{version}/bin/vmm-task ./initrd +cd initrd && sh -x make_kuasar_initrd.sh + +# build kernel +cd %{_builddir}/kernel/linux/ +make olddefconfig +make %{?_smp_mflags} + +%install +mkdir -p -m 750 %{buildroot}/%{kuasarconfdir} +%ifarch %{ix86} x86_64 +install -p -m 750 -D %{_builddir}/kernel/linux/arch/x86/boot/bzImage %{buildroot}/%{kuasarconfdir}/vmlinux.bin +%else +install -p -m 750 -D %{_builddir}/kernel/linux/arch/arm64/boot/Image %{buildroot}/%{kuasarconfdir}/vmlinux.bin +%endif +install -p -m 750 -D %{_builddir}/kuasar-%{version}/initrd/kuasar.initrd %{buildroot}/%{kuasarconfdir}/kuasar.initrd +install -p -m 640 -D %{_builddir}/kuasar-%{version}/vmm/sandbox/config_%{hypervisor}.toml %{buildroot}/%{kuasarconfdir}/config_%{hypervisor}.toml + +install -p -m 750 -D %{_builddir}/kuasar-%{version}/bin/vmm-sandboxer %{buildroot}/%{localbindir}/vmm-sandboxer +install -p -m 640 -D %{_builddir}/kuasar-%{version}/vmm/service/kuasar-vmm.service %{buildroot}/%{_unitdir}/kuasar-vmm.service +install -p -m 640 -D %{_builddir}/kuasar-%{version}/vmm/service/kuasar-vmm %{buildroot}/etc/sysconfig/kuasar-vmm + +%clean +rm -rf %{buildroot} + +%files +%defattr(-,root,root,-) +%config(noreplace) %{_unitdir}/kuasar-vmm.service +%config(noreplace) /etc/sysconfig/kuasar-vmm +%{localbindir}/vmm-sandboxer +%{kuasarconfdir}/vmlinux.bin +%{kuasarconfdir}/kuasar.initrd +%config(noreplace) %{kuasarconfdir}/config_%{hypervisor}.toml + +%changelog +* Tue Jul 18 2023 flyflyflypeng - 0.0.1-1 +- init package diff --git a/patch/0001-replace-stratovirt-qapi.patch b/patch/0001-replace-stratovirt-qapi.patch new file mode 100644 index 0000000000000000000000000000000000000000..9ef5d0efbfba2bc7adf6f5303c2d0f0c648d9e6a --- /dev/null +++ b/patch/0001-replace-stratovirt-qapi.patch @@ -0,0 +1,1256 @@ +From e87d7523914d1d715bcebf93fa35fa2097b015dc Mon Sep 17 00:00:00 2001 +From: Vanient +Date: Fri, 28 Jul 2023 09:17:57 +0800 +Subject: [PATCH] replace stratovirt qapi + +Signed-off-by: Vanient +--- + vmm/sandbox/.cargo/config.toml | 3 + + .../stratovirt-qapi/.cargo-checksum.json | 1 + + vmm/sandbox/stratovirt-qapi/Cargo.toml | 90 ++++ + .../stratovirt-qapi/src/futures/codec.rs | 100 +++++ + .../stratovirt-qapi/src/futures/mod.rs | 413 ++++++++++++++++++ + .../stratovirt-qapi/src/futures/tokio.rs | 206 +++++++++ + .../stratovirt-qapi/src/futures/tower.rs | 25 ++ + vmm/sandbox/stratovirt-qapi/src/lib.rs | 340 ++++++++++++++ + 8 files changed, 1178 insertions(+) + create mode 100644 vmm/sandbox/stratovirt-qapi/.cargo-checksum.json + create mode 100644 vmm/sandbox/stratovirt-qapi/Cargo.toml + create mode 100644 vmm/sandbox/stratovirt-qapi/src/futures/codec.rs + create mode 100644 vmm/sandbox/stratovirt-qapi/src/futures/mod.rs + create mode 100644 vmm/sandbox/stratovirt-qapi/src/futures/tokio.rs + create mode 100644 vmm/sandbox/stratovirt-qapi/src/futures/tower.rs + create mode 100644 vmm/sandbox/stratovirt-qapi/src/lib.rs + +diff --git a/vmm/sandbox/.cargo/config.toml b/vmm/sandbox/.cargo/config.toml +index abd187d4..7da499cf 100644 +--- a/vmm/sandbox/.cargo/config.toml ++++ b/vmm/sandbox/.cargo/config.toml +@@ -11,3 +11,6 @@ replace-with = "vendored-sources" + + [source.vendored-sources] + directory = "vendor" ++ ++[patch.crates-io.qapi] ++path= "stratovirt-qapi" +diff --git a/vmm/sandbox/stratovirt-qapi/.cargo-checksum.json b/vmm/sandbox/stratovirt-qapi/.cargo-checksum.json +new file mode 100644 +index 00000000..6bd4ce0a +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/.cargo-checksum.json +@@ -0,0 +1 @@ ++{"files":{"Cargo.toml":"6b0a056cc6d5e7ef6a9dbc1631bb21f3fb259a76242e7a7a5412c7229c138bac","src/futures/codec.rs":"fff02e429a6d02aa9d8b971eb450350236f000dd55297eaec0fa6810d4030866","src/futures/mod.rs":"b9a130d055f8be7ac5067d8ffaa149fb4f485eb46c5c2aacc3977178aeff07a9","src/futures/tokio.rs":"722f370984a19286451bc4f2433fc11dd40a9c0ae018ba2e16a0224f4e630455","src/futures/tower.rs":"c4b3c810fa3210e1c0ef683ad8d9e861e1f6fdb393ac7207d02470132c51239d","src/lib.rs":"a3bfa927a3deefbcd5b70a2666f43f09d413f7b37ac7f8e71711a0684d648d54"},"package":"37a6fe62413c9eec316d34ed057e19e2a47bfa5a6094ccaf6de81ceeefee74a0"} +\ No newline at end of file +diff --git a/vmm/sandbox/stratovirt-qapi/Cargo.toml b/vmm/sandbox/stratovirt-qapi/Cargo.toml +new file mode 100644 +index 00000000..739be0d0 +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/Cargo.toml +@@ -0,0 +1,90 @@ ++# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO ++# ++# When uploading crates to the registry Cargo will automatically ++# "normalize" Cargo.toml files for maximal compatibility ++# with all versions of Cargo and also rewrite `path` dependencies ++# to registry (e.g., crates.io) dependencies. ++# ++# If you are reading this file be aware that the original Cargo.toml ++# will likely look very different (and much more reasonable). ++# See Cargo.toml.orig for the original contents. ++ ++[package] ++edition = "2018" ++name = "qapi" ++version = "0.8.0" ++authors = ["arcnmx"] ++description = "QEMU QMP and Guest Agent API" ++documentation = "https://docs.rs/qapi" ++readme = "../README.md" ++keywords = ["qemu", "qmp", "qga", "qapi"] ++license = "MIT" ++repository = "https://github.com/arcnmx/qapi-rs" ++[package.metadata.docs.rs] ++all-features = true ++[dependencies.bytes] ++version = "^1.0.0" ++optional = true ++ ++[dependencies.futures] ++version = "^0.3.5" ++optional = true ++ ++[dependencies.futures_codec] ++version = "^0.4.1" ++optional = true ++ ++[dependencies.log] ++version = "^0.4.6" ++ ++[dependencies.memchr] ++version = "^2.3.3" ++optional = true ++ ++[dependencies.qapi-qga] ++version = "^0.8.0" ++optional = true ++ ++[dependencies.qapi-qmp] ++version = "^0.8.0" ++optional = true ++ ++[dependencies.qapi-spec] ++version = "^0.3.0" ++ ++[dependencies.serde] ++version = "^1.0.27" ++ ++[dependencies.serde_json] ++version = "^1.0.9" ++ ++[dependencies.tokio] ++version = "^1.0.0" ++features = ["io-util"] ++optional = true ++default-features = false ++ ++[dependencies.tokio-util] ++version = "^0.6.0" ++features = ["codec"] ++optional = true ++ ++[dependencies.tower-service] ++version = "^0.3.0" ++optional = true ++ ++[features] ++async = ["futures"] ++async-io = ["async", "futures_codec", "bytes", "memchr"] ++async-tokio = ["async", "tokio", "tokio-util", "bytes", "memchr"] ++async-tokio-all = ["async-tokio-net", "async-tokio-spawn"] ++async-tokio-net = ["async-tokio", "tokio/net"] ++async-tokio-spawn = ["async-tokio", "tokio/rt"] ++async-tower = ["async", "tower-service"] ++qga = ["qapi-qga"] ++qmp = ["qapi-qmp"] ++[badges.maintenance] ++status = "passively-maintained" ++ ++[badges.travis-ci] ++repository = "arcnmx/qapi-rs" +diff --git a/vmm/sandbox/stratovirt-qapi/src/futures/codec.rs b/vmm/sandbox/stratovirt-qapi/src/futures/codec.rs +new file mode 100644 +index 00000000..033c865c +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/src/futures/codec.rs +@@ -0,0 +1,100 @@ ++use std::io; ++use std::marker::PhantomData; ++use bytes::{BytesMut, BufMut}; ++use serde::{de::DeserializeOwned, Serialize}; ++ ++pub struct JsonLinesCodec { ++ next_index: usize, ++ _decoder: PhantomData D>, ++} ++ ++impl JsonLinesCodec { ++ pub fn new() -> Self { ++ Self { ++ next_index: 0, ++ _decoder: PhantomData, ++ } ++ } ++} ++ ++impl JsonLinesCodec { ++ fn priv_decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { ++ match memchr::memchr(b'\n', &buf[self.next_index..]) { ++ Some(offset) => { ++ let index = offset + self.next_index; ++ self.next_index = 0; ++ let line = buf.split_to(index + 1); ++ serde_json::from_slice(&line) ++ .map_err(From::from) ++ .map(Some) ++ }, ++ None => { ++ self.next_index = buf.len(); ++ Ok(None) ++ }, ++ } ++ } ++ ++ fn priv_decode_eof(&mut self, buf: &mut BytesMut) -> Result, io::Error> { ++ if buf.is_empty() { ++ Ok(None) ++ } else { ++ serde_json::from_slice(buf) ++ .map_err(From::from) ++ .map(Some) ++ } ++ } ++} ++ ++#[cfg(feature = "futures_codec")] ++impl futures_codec::Decoder for JsonLinesCodec { ++ type Item = D; ++ type Error = io::Error; ++ ++ fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { ++ self.priv_decode(buf) ++ } ++ ++ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { ++ self.priv_decode_eof(buf) ++ } ++} ++ ++#[cfg(feature = "tokio-util")] ++impl tokio_util::codec::Decoder for JsonLinesCodec { ++ type Item = D; ++ type Error = io::Error; ++ ++ fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { ++ self.priv_decode(buf) ++ } ++ ++ fn decode_eof(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { ++ self.priv_decode_eof(buf) ++ } ++} ++ ++fn encode(item: S, bytes: &mut BytesMut) -> Result<(), io::Error> { ++ serde_json::to_writer(bytes.writer(), &item)?; ++ bytes.put_u8(b'\n'); ++ Ok(()) ++} ++ ++#[cfg(feature = "futures_codec")] ++impl futures_codec::Encoder for JsonLinesCodec { ++ type Item = S; ++ type Error = io::Error; ++ ++ fn encode(&mut self, item: S, bytes: &mut BytesMut) -> Result<(), Self::Error> { ++ encode(item, bytes) ++ } ++} ++ ++#[cfg(feature = "tokio-util")] ++impl tokio_util::codec::Encoder for JsonLinesCodec { ++ type Error = io::Error; ++ ++ fn encode(&mut self, item: S, bytes: &mut BytesMut) -> Result<(), Self::Error> { ++ encode(item, bytes) ++ } ++} +diff --git a/vmm/sandbox/stratovirt-qapi/src/futures/mod.rs b/vmm/sandbox/stratovirt-qapi/src/futures/mod.rs +new file mode 100644 +index 00000000..cb451f94 +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/src/futures/mod.rs +@@ -0,0 +1,413 @@ ++#[cfg(feature = "qapi-qmp")] ++use qapi_qmp::{QmpMessage, QmpMessageAny, QapiCapabilities, QMPCapability}; ++ ++use qapi_spec::Response; ++use crate::{Any, Execute, ExecuteResult, Command}; ++ ++use std::collections::BTreeMap; ++use std::convert::TryInto; ++use std::marker::Unpin; ++use std::sync::{Arc, Mutex as StdMutex, atomic::{AtomicUsize, AtomicBool, Ordering}}; ++use std::task::{Context, Poll}; ++use std::pin::Pin; ++use std::io; ++use futures::channel::oneshot; ++use futures::task::AtomicWaker; ++use futures::lock::Mutex; ++use futures::{Future, FutureExt, Sink, SinkExt, Stream}; ++use serde::Deserialize; ++use log::{trace, info, warn}; ++ ++#[cfg(any(feature = "futures_codec", feature = "tokio-util"))] ++mod codec; ++ ++#[cfg(feature = "tokio")] ++mod tokio; ++#[cfg(feature = "tokio")] ++pub use self::tokio::*; ++ ++#[cfg(feature = "tower-service")] ++mod tower; ++ ++pub struct QapiStream { ++ service: QapiService, ++ events: QapiEvents, ++} ++ ++impl QapiStream { ++ pub fn with_parts(service: QapiService, events: QapiEvents) -> Self { ++ Self { ++ service, ++ events, ++ } ++ } ++ ++ pub fn into_parts(self) -> (QapiService, QapiEvents) { ++ (self.service, self.events) ++ } ++ ++ #[cfg(feature = "async-tokio-spawn")] ++ pub fn spawn_tokio(self) -> (QapiService, ::tokio::task::JoinHandle<()>) where ++ QapiEvents: Future> + Send + 'static, ++ { ++ let handle = self.events.spawn_tokio(); ++ (self.service, handle) ++ } ++ ++ pub fn execute<'a, C: Command + 'a>(&'a mut self, command: C) -> impl Future> + 'a where ++ QapiEvents: Future> + Unpin, ++ W: Sink, Error=io::Error> + Unpin ++ { ++ let execute = self.service.execute(command).fuse(); ++ ++ async move { ++ futures::pin_mut!(execute); ++ ++ futures::select_biased! { ++ res = execute => res, ++ res = (&mut self.events).fuse() => { ++ res?; ++ Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF when executing command").into()) ++ }, ++ } ++ } ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++pub struct QmpStreamNegotiation { ++ pub stream: QapiStream, ++ pub capabilities: QapiCapabilities, ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl QmpStreamNegotiation where ++ QapiEvents: Future> + Unpin, ++ W: Sink, Error=io::Error> + Unpin, ++{ ++ pub async fn negotiate_caps(mut self, caps: C) -> io::Result> where ++ C: IntoIterator, ++ { ++ let _ = self.stream.execute(qapi_qmp::qmp_capabilities { ++ enable: None, ++ }).await?; ++ ++ Ok(self.stream) ++ } ++ ++ pub async fn negotiate(self) -> io::Result> { ++ self.negotiate_caps(std::iter::empty()).await ++ } ++} ++ ++type QapiCommandMap = BTreeMap>>; ++ ++pub struct QapiService { ++ shared: Arc, ++ write: Arc>, ++ id_counter: AtomicUsize, ++} ++ ++impl QapiService { ++ fn new(write: W, shared: Arc) -> Self { ++ QapiService { ++ shared, ++ write: Mutex::new(write).into(), ++ id_counter: AtomicUsize::new(0), ++ } ++ } ++ ++ fn next_oob_id(&self) -> u32 { ++ self.id_counter.fetch_add(1, Ordering::Relaxed) as _ ++ } ++ ++ fn command_id(&self) -> Option { ++ if self.shared.supports_oob { ++ Some(self.next_oob_id()) ++ } else { ++ None ++ } ++ } ++ ++ fn command_response(receiver: oneshot::Receiver>) -> impl Future> { ++ receiver.map(|res| match res { ++ Ok(Ok(res)) => C::Ok::deserialize(&res) ++ .map_err(io::Error::from).map_err(From::from), ++ Ok(Err(e)) => Err(e.into()), ++ Err(_cancelled) => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "QAPI stream disconnected").into()), ++ }) ++ } ++ ++ pub fn execute(&self, command: C) -> impl Future> where ++ W: Sink, Error=io::Error> + Unpin ++ { ++ let id = self.command_id(); ++ let sink = self.write.clone(); ++ let shared = self.shared.clone(); ++ let command = Execute::new(command, id); ++ ++ async move { ++ let mut sink = sink.lock().await; ++ let receiver = shared.command_insert(id.unwrap_or_default()); ++ ++ sink.send(command).await?; ++ if id.is_some() { ++ // retain write lock only if id/oob execution isn't supported ++ drop(sink) ++ } ++ ++ Self::command_response::(receiver).await ++ } ++ } ++ ++ /*pub async fn execute_oob(&self, command: C) -> io::Result> { ++ /* TODO: should we assert C::ALLOW_OOB here and/or at the type level? ++ * If oob isn't supported should we fall back to serial execution or error? ++ */ ++ self.execute_(command, true).await ++ }*/ ++ ++ #[cfg(feature = "qapi-qga")] ++ pub fn guest_sync(&self, sync_value: isize) -> impl Future> where ++ W: Sink, Error=io::Error> + Unpin ++ { ++ self.execute(qapi_qga::guest_sync { ++ id: sync_value, ++ }).map(move |res| res.and_then(|res| if res == sync_value { ++ Ok(()) ++ } else { ++ Err(io::Error::new(io::ErrorKind::InvalidData, "QGA sync failed").into()) ++ })) ++ } ++ ++ fn stop(&self) { ++ let mut commands = self.shared.commands.lock().unwrap(); ++ if self.shared.abandoned.load(Ordering::Relaxed) { ++ self.shared.stop(); ++ } ++ commands.abandoned = true; ++ } ++} ++ ++impl Drop for QapiService { ++ fn drop(&mut self) { ++ self.stop(); ++ } ++} ++ ++#[derive(Default)] ++struct QapiSharedCommands { ++ pending: QapiCommandMap, ++ abandoned: bool, ++} ++ ++struct QapiShared { ++ commands: StdMutex, ++ stop_waker: AtomicWaker, ++ stop: AtomicBool, ++ abandoned: AtomicBool, ++ supports_oob: bool, ++} ++ ++impl QapiShared { ++ fn new(supports_oob: bool) -> Self { ++ Self { ++ commands: Default::default(), ++ stop_waker: Default::default(), ++ stop: Default::default(), ++ abandoned: Default::default(), ++ supports_oob, ++ } ++ } ++ ++ fn stop(&self) { ++ self.stop.store(true, Ordering::Relaxed); ++ self.stop_waker.wake(); ++ } ++ ++ fn is_stopped(&self) -> bool { ++ self.stop.load(Ordering::Relaxed) ++ } ++ ++ fn poll_next Poll>>(&self, cx: &mut Context, poll: P) -> Poll> { ++ if self.is_stopped() { ++ return Poll::Ready(None) ++ } ++ ++ // attempt to complete the future ++ match poll(cx) { ++ Poll::Ready(res) => { ++ if res.is_none() { ++ self.stop.store(true, Ordering::Relaxed); ++ } ++ Poll::Ready(res) ++ }, ++ Poll::Pending => { ++ self.stop_waker.register(cx.waker()); ++ if self.is_stopped() { ++ Poll::Ready(None) ++ } else { ++ Poll::Pending ++ } ++ }, ++ } ++ } ++ ++ fn command_remove(&self, id: u32) -> Option>> { ++ let mut commands = self.commands.lock().unwrap(); ++ commands.pending.remove(&id) ++ } ++ ++ fn command_insert(&self, id: u32) -> oneshot::Receiver> { ++ let (sender, receiver) = oneshot::channel(); ++ let mut commands = self.commands.lock().unwrap(); ++ if !commands.abandoned { ++ // otherwise sender is dropped immediately ++ if let Some(_prev) = commands.pending.insert(id, sender) { ++ panic!("QAPI duplicate command id {:?}, this should not happen", id); ++ } ++ } ++ receiver ++ } ++} ++ ++#[must_use] ++pub struct QapiEvents { ++ stream: S, ++ shared: Arc, ++} ++ ++impl QapiEvents { ++ pub fn release(&self) -> Result<(), ()> { ++ let commands = self.shared.commands.lock().unwrap(); ++ if commands.abandoned { ++ Err(()) ++ } else { ++ self.shared.abandoned.store(true, Ordering::Relaxed); ++ Ok(()) ++ } ++ } ++ ++ pub async fn into_future(self) -> () where ++ Self: Future>, ++ { ++ if self.release().is_err() { ++ info!("QAPI service abandoned before spawning"); ++ return ++ } ++ ++ match self.await { ++ Ok(()) => (), ++ Err(e) => ++ warn!("QAPI stream closed with error {:?}", e), ++ } ++ } ++ ++ pub fn spawn(self, spawn: SP) -> Result<(), futures::task::SpawnError> where ++ Self: Future> + Send + 'static, ++ S: 'static ++ { ++ use futures::task::SpawnExt; ++ ++ spawn.spawn(self.into_future()) ++ } ++ ++ #[cfg(feature = "async-tokio-spawn")] ++ pub fn spawn_tokio(self) -> ::tokio::task::JoinHandle<()> where ++ Self: Future> + Send + 'static, ++ S: 'static ++ { ++ ::tokio::spawn(self.into_future()) ++ } ++} ++ ++impl Drop for QapiEvents { ++ fn drop(&mut self) { ++ let mut commands = self.shared.commands.lock().unwrap(); ++ commands.pending.clear(); ++ commands.abandoned = true; ++ } ++} ++ ++fn response_id(res: &Response, supports_oob: bool) -> io::Result { ++ match (res.id().and_then(|id| id.as_u64()), supports_oob) { ++ (Some(id), true) => ++ id.try_into().map_err(|e| ++ io::Error::new(io::ErrorKind::InvalidData, e) ++ ), ++ (None, false) => ++ Ok(Default::default()), ++ (None, true) => ++ Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response with numeric ID, got {:?}", res.id()))), ++ (Some(..), false) => ++ Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response without ID, got {:?}", res.id()))), ++ } ++} ++ ++fn handle_response(shared: &QapiShared, res: Response) -> io::Result<()> { ++ let id = response_id(&res, shared.supports_oob)?; ++ ++ if let Some(sender) = shared.command_remove(id) { ++ sender.send(res.result()).map_err(|_e| ++ io::Error::new(io::ErrorKind::InvalidData, format!("failed to send response for ID {:?}", id)) ++ ) ++ } else { ++ Err(io::Error::new(io::ErrorKind::InvalidData, format!("unknown QAPI response with ID {:?}", res.id()))) ++ } ++} ++ ++impl Future for QapiEvents where ++ S: Stream>, ++ M: TryInto>, ++{ ++ type Output = io::Result<()>; ++ ++ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ++ let this = unsafe { self.get_unchecked_mut() }; ++ let stream = unsafe { Pin::new_unchecked(&mut this.stream) }; ++ let shared = &this.shared; ++ ++ shared.poll_next(cx, |cx| Poll::Ready(Some(match futures::ready!(stream.poll_next(cx)) { ++ None => return Poll::Ready(None), ++ Some(Err(e)) => Err(e), ++ Some(Ok(res)) => match res.try_into() { ++ Ok(res) => match handle_response(shared, res) { ++ Err(e) => Err(e), ++ Ok(()) => { ++ cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio? ++ return Poll::Pending ++ }, ++ }, ++ Err(..) => { ++ trace!("Ignoring QAPI event"); ++ cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio? ++ return Poll::Pending ++ }, ++ }, ++ }))).map(|res| res.unwrap_or(Ok(()))) ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl>> Stream for QapiEvents { ++ type Item = io::Result; ++ ++ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ++ let this = unsafe { self.get_unchecked_mut() }; ++ let stream = unsafe { Pin::new_unchecked(&mut this.stream) }; ++ let shared = &this.shared; ++ ++ shared.poll_next(cx, |cx| Poll::Ready(match futures::ready!(stream.poll_next(cx)) { ++ None => None, // eof ++ Some(Err(e)) => Some(Err(e)), ++ Some(Ok(QmpMessage::Event(e))) => Some(Ok(e)), ++ Some(Ok(QmpMessage::Response(res))) => match handle_response(shared, res) { ++ Err(e) => Some(Err(e)), ++ Ok(()) => { ++ cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio? ++ return Poll::Pending ++ }, ++ }, ++ })) ++ } ++} +diff --git a/vmm/sandbox/stratovirt-qapi/src/futures/tokio.rs b/vmm/sandbox/stratovirt-qapi/src/futures/tokio.rs +new file mode 100644 +index 00000000..16b73478 +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/src/futures/tokio.rs +@@ -0,0 +1,206 @@ ++use std::io; ++use std::pin::Pin; ++use std::task::{Context, Poll}; ++use std::sync::Arc; ++use futures::{Sink, Stream}; ++use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf, split}; ++use tokio_util::codec::{Framed, FramedParts}; ++use qapi_spec::{Execute, Response, Any}; ++#[cfg(feature = "qapi-qmp")] ++use qapi_qmp::{QmpMessageAny, QmpCommand, QapiCapabilities, QMPCapability}; ++#[cfg(feature = "qapi-qmp")] ++use super::QmpStreamNegotiation; ++use super::{codec::JsonLinesCodec, QapiEvents, QapiService, QapiStream, QapiShared}; ++ ++pub struct QgaStreamTokio { ++ stream: Framed>> ++} ++ ++impl QgaStreamTokio { ++ fn new(stream: S) -> Self { ++ Self { ++ stream: Framed::from_parts(FramedParts::new::<()>(stream, JsonLinesCodec::new())), ++ } ++ } ++ ++ fn pair(self, write: W) -> QapiStream { ++ let shared = Arc::new(QapiShared::new(false)); ++ let events = QapiEvents { ++ stream: self, ++ shared: shared.clone(), ++ }; ++ let service = QapiService::new(write, shared); ++ QapiStream { ++ service, ++ events, ++ } ++ } ++ ++ pub fn open_split(read: S, write: W) -> QapiStream> { ++ let r = Self::new(read); ++ let w = QgaStreamTokio::new(write); ++ ++ r.pair(w) ++ } ++} ++ ++impl QgaStreamTokio> { ++ pub fn open(stream: R) -> QapiStream>> where ++ R: AsyncRead + AsyncWrite, ++ { ++ let (r, w) = split(stream); ++ let r = Self::new(r); ++ let w = QgaStreamTokio::new(w); ++ ++ r.pair(w) ++ } ++} ++ ++#[cfg(feature = "async-tokio-net")] ++impl QgaStreamTokio> { ++ pub async fn open_uds>(socket_addr: P) -> io::Result>>> { ++ let socket = tokio::net::UnixStream::connect(socket_addr).await?; ++ let (r, w) = split(socket); ++ Ok(Self::open_split(r, w)) ++ } ++} ++ ++impl QgaStreamTokio { ++ fn stream(self: Pin<&mut Self>) -> Pin<&mut Framed>>> { ++ unsafe { ++ self.map_unchecked_mut(|this| &mut this.stream) ++ } ++ } ++} ++ ++impl Stream for QgaStreamTokio { ++ type Item = io::Result>; ++ ++ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ self.stream().poll_next(cx) ++ } ++} ++ ++#[cfg(feature = "qapi-qga")] ++impl Sink> for QgaStreamTokio { ++ type Error = io::Error; ++ ++ fn start_send(self: Pin<&mut Self>, item: Execute) -> Result<(), Self::Error> { ++ self.stream().start_send(item) ++ } ++ ++ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_ready(self.stream(), cx) ++ } ++ ++ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_flush(self.stream(), cx) ++ } ++ ++ fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_close(self.stream(), cx) ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++pub struct QmpStreamTokio { ++ stream: Framed>, ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl QmpStreamTokio { ++ fn stream(self: Pin<&mut Self>) -> Pin<&mut Framed>> { ++ unsafe { ++ self.map_unchecked_mut(|this| &mut this.stream) ++ } ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl Stream for QmpStreamTokio { ++ type Item = io::Result; ++ ++ fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ self.stream().poll_next(cx) ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl Sink> for QmpStreamTokio { ++ type Error = io::Error; ++ ++ fn start_send(self: Pin<&mut Self>, item: Execute) -> Result<(), Self::Error> { ++ self.stream().start_send(item) ++ } ++ ++ fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_ready(self.stream(), cx) ++ } ++ ++ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_flush(self.stream(), cx) ++ } ++ ++ fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { ++ Sink::>::poll_close(self.stream(), cx) ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl QmpStreamTokio { ++ pub fn new(stream: S) -> Self { ++ Self { ++ stream: Framed::from_parts(FramedParts::new::<()>(stream, JsonLinesCodec::::new())), ++ } ++ } ++ ++ pub async fn open_split(read: S, write: W) -> io::Result>> where ++ S: AsyncRead + Unpin, ++ { ++ use futures::StreamExt; ++ ++ let mut lines = Framed::from_parts(FramedParts::new::<()>(read, JsonLinesCodec::::new())); ++ ++ let capabilities = lines.next().await.ok_or_else(|| ++ io::Error::new(io::ErrorKind::UnexpectedEof, "QMP greeting expected") ++ )??; ++ ++ let lines = lines.into_parts(); ++ let mut read = FramedParts::new::<()>(lines.io, JsonLinesCodec::new()); ++ read.read_buf = lines.read_buf; ++ let stream = Framed::from_parts(read); ++ ++ let supports_oob = capabilities.capabilities().any(|c| c == QMPCapability::oob); ++ let shared = Arc::new(QapiShared::new(supports_oob)); ++ let events = QapiEvents { ++ stream: Self { stream }, ++ shared: shared.clone(), ++ }; ++ let service = QapiService::new(QmpStreamTokio::new(write), shared); ++ ++ Ok(QmpStreamNegotiation { ++ stream: QapiStream { ++ service, ++ events, ++ }, ++ capabilities, ++ }) ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++impl QmpStreamTokio> { ++ pub async fn open(stream: RW) -> io::Result>>> where RW: Unpin { ++ let (r, w) = split(stream); ++ Self::open_split(r, w).await ++ } ++} ++ ++#[cfg(all(feature = "qapi-qmp", feature = "async-tokio-net"))] ++impl QmpStreamTokio> { ++ pub async fn open_uds>(socket_addr: P) -> io::Result>>> { ++ let socket = tokio::net::UnixStream::connect(socket_addr).await?; ++ let (r, w) = split(socket); ++ Self::open_split(r, w).await ++ } ++} +diff --git a/vmm/sandbox/stratovirt-qapi/src/futures/tower.rs b/vmm/sandbox/stratovirt-qapi/src/futures/tower.rs +new file mode 100644 +index 00000000..374ec8c8 +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/src/futures/tower.rs +@@ -0,0 +1,25 @@ ++use std::io; ++use std::pin::Pin; ++use std::task::{Context, Poll}; ++use futures::{Sink, Future, FutureExt}; ++use tower_service::Service; ++use crate::{Command, Execute, ExecuteError}; ++use super::QapiService; ++ ++// this really doesn't work well for lifetime reasons? ++ ++impl Service for QapiService where ++ W: Sink, Error=io::Error> + Unpin + Send, ++{ ++ type Response = C::Ok; ++ type Error = ExecuteError; ++ type Future = Pin> + 'static>>; ++ ++ fn poll_ready(&mut self, _: &mut Context) -> Poll> { ++ Poll::Ready(Ok(())) ++ } ++ ++ fn call(&mut self, req: C) -> Self::Future { ++ self.execute(req).boxed() ++ } ++} +diff --git a/vmm/sandbox/stratovirt-qapi/src/lib.rs b/vmm/sandbox/stratovirt-qapi/src/lib.rs +new file mode 100644 +index 00000000..d9d8505d +--- /dev/null ++++ b/vmm/sandbox/stratovirt-qapi/src/lib.rs +@@ -0,0 +1,340 @@ ++#![doc(html_root_url = "http://docs.rs/qapi/0.8.0")] ++ ++#[cfg(feature = "qapi-qmp")] ++pub use qapi_qmp as qmp; ++ ++#[cfg(feature = "qapi-qga")] ++pub use qapi_qga as qga; ++ ++pub use qapi_spec::{Any, Dictionary, Empty, Never, Execute, ExecuteOob, Command, CommandResult, Event, Enum, Error, ErrorClass, Timestamp}; ++ ++pub use self::stream::Stream; ++ ++#[cfg(feature = "qapi-qmp")] ++pub use self::qmp_impl::*; ++ ++#[cfg(feature = "qapi-qga")] ++pub use self::qga_impl::*; ++ ++use std::{error, fmt, io}; ++ ++#[cfg(feature = "async")] ++pub mod futures; ++ ++#[derive(Debug)] ++pub enum ExecuteError { ++ Qapi(Error), ++ Io(io::Error), ++} ++ ++pub type ExecuteResult = Result<::Ok, ExecuteError>; ++ ++impl fmt::Display for ExecuteError { ++ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { ++ match self { ++ ExecuteError::Qapi(e) => fmt::Display::fmt(e, f), ++ ExecuteError::Io(e) => fmt::Display::fmt(e, f), ++ } ++ } ++} ++ ++impl error::Error for ExecuteError { ++ fn source(&self) -> Option<&(dyn error::Error + 'static)> { ++ match self { ++ ExecuteError::Qapi(e) => Some(e), ++ ExecuteError::Io(e) => Some(e), ++ } ++ } ++} ++ ++impl From for ExecuteError { ++ fn from(e: io::Error) -> Self { ++ ExecuteError::Io(e) ++ } ++} ++ ++impl From for ExecuteError { ++ fn from(e: Error) -> Self { ++ ExecuteError::Qapi(e) ++ } ++} ++ ++impl From for io::Error { ++ fn from(e: ExecuteError) -> Self { ++ match e { ++ ExecuteError::Qapi(e) => e.into(), ++ ExecuteError::Io(e) => e, ++ } ++ } ++} ++ ++#[cfg(any(feature = "qapi-qmp", feature = "qapi-qga"))] ++mod qapi { ++ use serde_json; ++ use serde::{Serialize, Deserialize}; ++ use std::io::{self, BufRead, Write}; ++ use crate::{Command, Execute}; ++ use log::trace; ++ ++ pub struct Qapi { ++ pub stream: S, ++ pub buffer: Vec, ++ } ++ ++ impl Qapi { ++ pub fn new(s: S) -> Self { ++ Qapi { ++ stream: s, ++ buffer: Default::default(), ++ } ++ } ++ } ++ ++ impl Qapi { ++ pub fn decode_line<'de, D: Deserialize<'de>>(&'de mut self) -> io::Result> { ++ self.buffer.clear(); ++ let line = self.stream.read_until(b'\n', &mut self.buffer)?; ++ let line = &self.buffer[..line]; ++ trace!("<- {}", String::from_utf8_lossy(line)); ++ ++ if line.is_empty() { ++ Ok(None) ++ } else { ++ serde_json::from_slice(line).map(Some).map_err(From::from) ++ } ++ } ++ } ++ ++ impl Qapi { ++ pub fn encode_line(&mut self, command: &C) -> io::Result<()> { ++ { ++ let mut ser = serde_json::Serializer::new(&mut self.stream); ++ command.serialize(&mut ser)?; ++ } ++ ++ self.stream.write(&[b'\n'])?; ++ ++ self.stream.flush() ++ } ++ ++ pub fn write_command(&mut self, command: &C) -> io::Result<()> { ++ self.encode_line(&Execute::<&C>::from(command))?; ++ ++ trace!("-> execute {}: {}", C::NAME, serde_json::to_string_pretty(command).unwrap()); ++ ++ Ok(()) ++ } ++ } ++} ++ ++mod stream { ++ use std::io::{Read, Write, BufRead, Result}; ++ ++ pub struct Stream { ++ r: R, ++ w: W, ++ } ++ ++ impl Stream { ++ pub fn new(r: R, w: W) -> Self { ++ Stream { ++ r, ++ w, ++ } ++ } ++ ++ pub fn into_inner(self) -> (R, W) { ++ (self.r, self.w) ++ } ++ ++ pub fn get_ref_read(&self) -> &R { &self.r } ++ pub fn get_mut_read(&mut self) -> &mut R { &mut self.r } ++ pub fn get_ref_write(&self) -> &W { &self.w } ++ pub fn get_mut_write(&mut self) -> &mut W { &mut self.w } ++ } ++ ++ impl Read for Stream { ++ fn read(&mut self, buf: &mut [u8]) -> Result { ++ self.r.read(buf) ++ } ++ } ++ ++ impl BufRead for Stream { ++ fn fill_buf(&mut self) -> Result<&[u8]> { ++ self.r.fill_buf() ++ } ++ ++ fn consume(&mut self, amt: usize) { ++ self.r.consume(amt) ++ } ++ } ++ ++ impl Write for Stream { ++ fn write(&mut self, buf: &[u8]) -> Result { ++ self.w.write(buf) ++ } ++ ++ fn flush(&mut self) -> Result<()> { ++ self.w.flush() ++ } ++ } ++} ++ ++#[cfg(feature = "qapi-qmp")] ++mod qmp_impl { ++ use std::io::{self, BufRead, Read, Write, BufReader}; ++ use std::vec::Drain; ++ use qapi_qmp::{QMP, QapiCapabilities, QmpMessage, Event, qmp_capabilities, query_version}; ++ use crate::{qapi::Qapi, Stream, ExecuteResult, ExecuteError, Command}; ++ ++ pub struct Qmp { ++ inner: Qapi, ++ event_queue: Vec, ++ } ++ ++ impl Qmp, S>> { ++ pub fn from_stream(s: S) -> Self { ++ Self::new(Stream::new(BufReader::new(s.clone()), s)) ++ } ++ } ++ ++ impl Qmp { ++ pub fn new(stream: S) -> Self { ++ Qmp { ++ inner: Qapi::new(stream), ++ event_queue: Default::default(), ++ } ++ } ++ ++ pub fn into_inner(self) -> S { ++ self.inner.stream ++ } ++ ++ pub fn inner(&self) -> &S { ++ &self.inner.stream ++ } ++ ++ pub fn inner_mut(&mut self) -> &mut S { ++ &mut self.inner.stream ++ } ++ ++ pub fn events(&mut self) -> Drain { ++ self.event_queue.drain(..) ++ } ++ } ++ ++ impl Qmp { ++ pub fn read_capabilities(&mut self) -> io::Result { ++ self.inner.decode_line().map(|v: Option| ++ v.expect("unexpected eof").QMP ++ ) ++ } ++ ++ pub fn read_response(&mut self) -> ExecuteResult { ++ loop { ++ match self.inner.decode_line()? { ++ None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response").into()), ++ Some(QmpMessage::Response(res)) => return res.result().map_err(From::from), ++ Some(QmpMessage::Event(e)) => self.event_queue.push(e), ++ } ++ } ++ } ++ } ++ ++ impl Qmp { ++ pub fn write_command(&mut self, command: &C) -> io::Result<()> { ++ self.inner.write_command(command) ++ } ++ ++ pub fn execute(&mut self, command: &C) -> ExecuteResult { ++ self.write_command(command)?; ++ self.read_response::() ++ } ++ ++ pub fn handshake(&mut self) -> Result { ++ let caps = self.read_capabilities()?; ++ self.execute(&qmp_capabilities { enable: None }) ++ .map(|_| caps) ++ } ++ ++ /// Can be used to poll the socket for pending events ++ pub fn nop(&mut self) -> io::Result<()> { ++ self.execute(&query_version { }) ++ .map_err(From::from) ++ .map(drop) ++ } ++ } ++} ++ ++#[cfg(feature = "qapi-qga")] ++mod qga_impl { ++ use std::io::{self, BufRead, Read, Write, BufReader}; ++ use qapi_qga::guest_sync; ++ use qapi_spec::Response; ++ use crate::{qapi::Qapi, Stream, Command, ExecuteResult, ExecuteError}; ++ ++ pub struct Qga { ++ inner: Qapi, ++ } ++ ++ impl Qga, S>> { ++ pub fn from_stream(s: S) -> Self { ++ Self::new(Stream::new(BufReader::new(s.clone()), s)) ++ } ++ } ++ ++ impl Qga { ++ pub fn new(stream: S) -> Self { ++ Qga { ++ inner: Qapi::new(stream), ++ } ++ } ++ ++ pub fn into_inner(self) -> S { ++ self.inner.stream ++ } ++ ++ pub fn inner(&self) -> &S { ++ &self.inner.stream ++ } ++ ++ pub fn inner_mut(&mut self) -> &mut S { ++ &mut self.inner.stream ++ } ++ } ++ ++ impl Qga { ++ pub fn read_response(&mut self) -> ExecuteResult { ++ loop { ++ match self.inner.decode_line()?.map(|r: Response<_>| r.result()) { ++ None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response").into()), ++ Some(Ok(res)) => return Ok(res), ++ Some(Err(e)) => return Err(e.into()), ++ } ++ } ++ } ++ } ++ ++ impl Qga { ++ pub fn write_command(&mut self, command: &C) -> io::Result<()> { ++ self.inner.write_command(command) ++ } ++ ++ pub fn execute(&mut self, command: &C) -> ExecuteResult { ++ self.write_command(command)?; ++ self.read_response::() ++ } ++ ++ pub fn guest_sync(&mut self, id: isize) -> Result<(), ExecuteError> { ++ let sync = guest_sync { ++ id, ++ }; ++ ++ match self.execute(&sync) { ++ Ok(r) if r == sync.id => Ok(()), ++ Ok(..) => Err(io::Error::new(io::ErrorKind::InvalidData, "guest-sync handshake failed").into()), ++ Err(e) => Err(e.into()), ++ } ++ } ++ } ++} +-- +2.33.0 + diff --git a/series.conf b/series.conf new file mode 100644 index 0000000000000000000000000000000000000000..e726e0365c3a30c98a9daad836b429e49afcb084 --- /dev/null +++ b/series.conf @@ -0,0 +1 @@ +0001-replace-stratovirt-qapi.patch