From a707b0c104d9fc165b298f3ef23044005535ccc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=B1=E7=85=9C?= <9930261+zhuyu901115@user.noreply.gitee.com> Date: Thu, 12 Jun 2025 09:16:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ewin=5Fthread=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rocksdb/rocksdb/port/win/win_thread.cc | 183 ++++++++++++++++++ storage/rocksdb/rocksdb/port/win/win_thread.h | 122 ++++++++++++ 2 files changed, 305 insertions(+) create mode 100644 storage/rocksdb/rocksdb/port/win/win_thread.cc create mode 100644 storage/rocksdb/rocksdb/port/win/win_thread.h diff --git a/storage/rocksdb/rocksdb/port/win/win_thread.cc b/storage/rocksdb/rocksdb/port/win/win_thread.cc new file mode 100644 index 000000000..2bc05e8da --- /dev/null +++ b/storage/rocksdb/rocksdb/port/win/win_thread.cc @@ -0,0 +1,183 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#if defined(OS_WIN) + +#include "port/win/win_thread.h" + +#include +#include // __beginthreadex +#include + +#include +#include +#include + +namespace ROCKSDB_NAMESPACE { +namespace port { + +struct WindowsThread::Data { + + std::function func_; + uintptr_t handle_; + + Data(std::function&& func) : + func_(std::move(func)), + handle_(0) { + } + + Data(const Data&) = delete; + Data& operator=(const Data&) = delete; + + static unsigned int __stdcall ThreadProc(void* arg); +}; + + +void WindowsThread::Init(std::function&& func) { + + data_ = std::make_shared(std::move(func)); + // We create another instance of std::shared_ptr to get an additional ref + // since we may detach and destroy this instance before the threadproc + // may start to run. We choose to allocate this additional ref on the heap + // so we do not need to synchronize and allow this thread to proceed + std::unique_ptr> th_data(new std::shared_ptr(data_)); + + data_->handle_ = _beginthreadex(NULL, + 0, // stack size + &Data::ThreadProc, + th_data.get(), + 0, // init flag + &th_id_); + + if (data_->handle_ == 0) { + throw std::system_error(std::make_error_code( + std::errc::resource_unavailable_try_again), + "Unable to create a thread"); + } + th_data.release(); +} + +WindowsThread::WindowsThread() : + data_(nullptr), + th_id_(0) +{} + + +WindowsThread::~WindowsThread() { + // Must be joined or detached + // before destruction. + // This is the same as std::thread + if (data_) { + if (joinable()) { + assert(false); + std::terminate(); + } + data_.reset(); + } +} + +WindowsThread::WindowsThread(WindowsThread&& o) noexcept : + WindowsThread() { + *this = std::move(o); +} + +WindowsThread& WindowsThread::operator=(WindowsThread&& o) noexcept { + + if (joinable()) { + assert(false); + std::terminate(); + } + + data_ = std::move(o.data_); + + // Per spec both instances will have the same id + th_id_ = o.th_id_; + + return *this; +} + +bool WindowsThread::joinable() const { + return (data_ && data_->handle_ != 0); +} + +WindowsThread::native_handle_type WindowsThread::native_handle() const { + return reinterpret_cast(data_->handle_); +} + +unsigned WindowsThread::hardware_concurrency() { + return std::thread::hardware_concurrency(); +} + +void WindowsThread::join() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer joinable"); + } + + if (GetThreadId(GetCurrentThread()) == th_id_) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::resource_deadlock_would_occur), + "Can not join itself"); + } + + auto ret = WaitForSingleObject(reinterpret_cast(data_->handle_), + INFINITE); + if (ret != WAIT_OBJECT_0) { + auto lastError = GetLastError(); + assert(false); + throw std::system_error(static_cast(lastError), + std::system_category(), + "WaitForSingleObjectFailed: thread join"); + } + + BOOL rc +#if defined(_MSC_VER) + = FALSE; +#else + __attribute__((__unused__)); +#endif + rc = CloseHandle(reinterpret_cast(data_->handle_)); + assert(rc != 0); + data_->handle_ = 0; +} + +bool WindowsThread::detach() { + + if (!joinable()) { + assert(false); + throw std::system_error( + std::make_error_code(std::errc::invalid_argument), + "Thread is no longer available"); + } + + BOOL ret = CloseHandle(reinterpret_cast(data_->handle_)); + data_->handle_ = 0; + + return (ret != 0); +} + +void WindowsThread::swap(WindowsThread& o) { + data_.swap(o.data_); + std::swap(th_id_, o.th_id_); +} + +unsigned int __stdcall WindowsThread::Data::ThreadProc(void* arg) { + auto ptr = reinterpret_cast*>(arg); + std::unique_ptr> data(ptr); + (*data)->func_(); + return 0; +} +} // namespace port +} // namespace ROCKSDB_NAMESPACE + +#endif diff --git a/storage/rocksdb/rocksdb/port/win/win_thread.h b/storage/rocksdb/rocksdb/port/win/win_thread.h new file mode 100644 index 000000000..472b29955 --- /dev/null +++ b/storage/rocksdb/rocksdb/port/win/win_thread.h @@ -0,0 +1,122 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include + +#include "rocksdb/rocksdb_namespace.h" + +namespace ROCKSDB_NAMESPACE { +namespace port { + +// This class is a replacement for std::thread +// 2 reasons we do not like std::thread: +// -- is that it dynamically allocates its internals that are automatically +// freed when the thread terminates and not on the destruction of the +// object. This makes it difficult to control the source of memory +// allocation +// - This implements Pimpl so we can easily replace the guts of the +// object in our private version if necessary. +class WindowsThread { + + struct Data; + + std::shared_ptr data_; + unsigned int th_id_; + + void Init(std::function&&); + +public: + + typedef void* native_handle_type; + + // Construct with no thread + WindowsThread(); + + // Template constructor + // + // This templated constructor accomplishes several things + // + // - Allows the class as whole to be not a template + // + // - take "universal" references to support both _lvalues and _rvalues + // + // - because this constructor is a catchall case in many respects it + // may prevent us from using both the default __ctor, the move __ctor. + // Also it may circumvent copy __ctor deletion. To work around this + // we make sure this one has at least one argument and eliminate + // it from the overload selection when WindowsThread is the first + // argument. + // + // - construct with Fx(Ax...) with a variable number of types/arguments. + // + // - Gathers together the callable object with its arguments and constructs + // a single callable entity + // + // - Makes use of std::function to convert it to a specification-template + // dependent type that both checks the signature conformance to ensure + // that all of the necessary arguments are provided and allows pimpl + // implementation. + template::type, + WindowsThread>::value>::type> + explicit WindowsThread(Fn&& fx, Args&&... ax) : + WindowsThread() { + + // Use binder to create a single callable entity + auto binder = std::bind(std::forward(fx), + std::forward(ax)...); + // Use std::function to take advantage of the type erasure + // so we can still hide implementation within pimpl + // This also makes sure that the binder signature is compliant + std::function target = binder; + + Init(std::move(target)); + } + + + ~WindowsThread(); + + WindowsThread(const WindowsThread&) = delete; + + WindowsThread& operator=(const WindowsThread&) = delete; + + WindowsThread(WindowsThread&&) noexcept; + + WindowsThread& operator=(WindowsThread&&) noexcept; + + bool joinable() const; + + unsigned int get_id() const { return th_id_; } + + native_handle_type native_handle() const; + + static unsigned hardware_concurrency(); + + void join(); + + bool detach(); + + void swap(WindowsThread&); +}; +} // namespace port +} // namespace ROCKSDB_NAMESPACE + +namespace std { +inline void swap(ROCKSDB_NAMESPACE::port::WindowsThread& th1, + ROCKSDB_NAMESPACE::port::WindowsThread& th2) { + th1.swap(th2); +} +} // namespace std + -- Gitee