From d3b3fdf0265081ec872c53265279a25e9ccc5aa0 Mon Sep 17 00:00:00 2001 From: Laika Date: Mon, 3 Feb 2025 23:33:53 +0100 Subject: [PATCH] Initial commit --- .github/workflows/test.yml | 30 +++ .gitignore | 1 + Cargo.lock | 72 +++++++ Cargo.toml | 21 ++ LICENSE | 21 ++ README.md | 102 +++++++++ SECURITY.md | 22 ++ src/lib.rs | 16 ++ src/shotgun.rs | 426 +++++++++++++++++++++++++++++++++++++ 9 files changed, 711 insertions(+) create mode 100644 .github/workflows/test.yml create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 SECURITY.md create mode 100644 src/lib.rs create mode 100644 src/shotgun.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..7fd8034 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,30 @@ +name: Cargo test & clippy +permissions: + contents: read + pull-requests: write + +on: + push: + branches: ["main"] + + pull_request: + branches: ["main"] + +env: + CARGO_TERM_COLOR: always + +jobs: + task: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v5 + + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose --all-features + + - name: Run clippy + run: cargo clippy --all-features diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..28ecd8c --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,72 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "laika" +version = "0.1.4" +dependencies = [ + "tokio", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "proc-macro2" +version = "1.0.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "syn" +version = "2.0.110" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tokio" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +dependencies = [ + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..cc37e7d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "laika" +version = "0.1.4" +edition = "2024" +authors = ["Laika Schmidt "] +description = "A namespace crate containing miscellaneous submodules (like an SPMC channel) of owner its-laika" +repository = "https://github.com/its-laika/crates" +readme = "README.md" +license = "MIT" +keywords = ["channel", "spmc", "oneshot"] +categories = ["concurrency"] +include = ["**/*.rs", "Cargo.toml"] + +[lib] +source = "src/lib.rs" + +[features] +shotgun = [] + +[dev-dependencies] +tokio = { version = "1.48", features = ["rt", "macros"] } diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..bd99959 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Laika Schmidt + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..ee2b9c4 --- /dev/null +++ b/README.md @@ -0,0 +1,102 @@ +# Laika's namespace crate + +**This module contains multiple submodules (included via feature flags) with +different functionalities. They're all grouped under the `laika` namespace, +providing some kind of scoped crates (avoiding naming conflicts).** + +## Submodules / Features + +## shotgun +_A dead simple one-shot single producer, multiple consumer (SPMC) channel_ + +### About +Shotgun is a simple one-shot single producer, multiple consumer (SPMC) channel. +It internally uses `std::sync::Mutex` and `std::sync::Arc` and does not contain +any unsafe code. + +### When to use + +Likely when you need to pass a signal to multiple threads or functions to stop +in order to shut down the application. + +### How to use + +#### Synchronous +```rust +fn main() { + use laika::shotgun::channel; + use std::thread; + use std::time; + + let (tx, rx) = channel(); + + let rx1 = rx.clone(); + let thread1 = thread::spawn(move || loop { + if rx1.try_recv().is_some() { + return 1; + } + + thread::sleep(time::Duration::from_secs(1)); + }); + + let rx2 = rx.clone(); + let thread2 = thread::spawn(move || loop { + if rx2.try_recv().is_some() { + return 2; + } + + thread::sleep(time::Duration::from_secs(1)); + }); + + thread::sleep(time::Duration::from_secs(2)); + + tx.send(()); // `tx` is dropped here. + + assert!(thread1.join().is_ok_and(|v| v == 1)); + assert!(thread2.join().is_ok_and(|v| v == 2)); +} +``` + +#### Asynchronous +```rust +#[tokio] +async fn main() { + use laika::shotgun::channel; + use std::thread; + use std::time; + + let (tx, rx) = channel(); + + let rx1 = rx.clone(); + let fun1 = async move { + rx1.await; + 1 + }; + + let rx2 = rx.clone(); + let fun2 = async move { + // Explicit call to recv(), does the same as calling`.await` directly. + rx2.recv().await; + 2 + }; + + thread::sleep(time::Duration::from_secs(2)); + + tx.send(()); + + let rx3 = rx.clone(); + let fun3 = async move { + rx3.await; + 3 + }; + + let result = join!(fun1, fun2); + + assert_eq!(result.0, 1); + assert_eq!(result.1, 2); + assert_eq!(fun3.await, 3); +} +``` + +# License +[MIT](LICENSE) \ No newline at end of file diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..38f82aa --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,22 @@ +# Security Policy + +## Supported Versions + +Support is limited to the latest version and also the current state in the main +branch. + +| Version | Supported | +| ------------- | ------------------ | +| _main_ | :white_check_mark: | +| 0.1.4 | :white_check_mark: | +| <= 0.1.3 | :x: | + +## Reporting a Vulnerability + +If you want to report a vulnerability, check if it's possible to create an issue +without endangering other users. If that's not the case, send a mail to the +address on my ([its-laika](https://github.com/its-laika)) profile. I check my +mails pretty frequently so expect a reply within a few days. From there on, I'll +do my best to fix the vulnerability asap. If you want to, you can also provide +patches directly or via pull requests. A new version is pushed as soon as the +vulnerability is fixed. diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..b1936da --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,16 @@ +//! # Laika's namespace crate +//! +//! This module contains multiple submodules (included via feature flags) with +//! different functionalities. They're all grouped under the `laika` namespace, +//! providing some kind of scoped crates (avoiding naming conflicts). +//! +//! ## Submodules / Features +//! +//! ### [`shotgun`] +//! +//! Shotgun is a simple one-shot single producer, multiple consumer (SPMC) +//! channel. It internally uses `std::sync::Mutex` and `std::sync::Arc` and does +//! not contain any unsafe code. +//! See module documentation for more information. +#[cfg(feature = "shotgun")] +pub mod shotgun; diff --git a/src/shotgun.rs b/src/shotgun.rs new file mode 100644 index 0000000..4daf99a --- /dev/null +++ b/src/shotgun.rs @@ -0,0 +1,426 @@ +#![forbid(unsafe_code)] +//! # A dead simple one-shot single producer, multiple consumer (SPMC) channel +//! +//! Shotgun is a simple oneshot single producer, multiple consumer (SPMC) +//! channel. Internally using [`std::sync::Mutex`] and [`std::sync::Arc`], not +//! containing any unsafe code. + +use std::{ + clone::Clone, + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, +}; + +/// Oneshot receiver of a [`channel`] +/// +/// Use [`Receiver::try_recv`] or [`Receiver::recv`] to (try to) receive a value +/// from the channel, if it has been sent. As this is a oneshot receiver, only +/// one value can be received. +/// +/// # Examples +/// +/// ## Synchronous +/// +/// ```rust +/// let (mut tx, rx) = laika::shotgun::channel(); +/// +/// // Initialy, oneshot receiver has no value +/// assert_eq!(rx.try_recv(), None); +/// +/// // Send a value +/// tx.send(12); +/// +/// // Now, oneshot receiver has the value +/// assert_eq!(rx.try_recv(), Some(12)); +/// ``` +/// +/// ## Asynchronous +/// +/// ```no_run +/// let (mut tx, rx) = laika::shotgun::channel(); +/// +/// // ... in any async runtime +/// +/// let fun1 = async move { +/// rx.recv().await; +/// return 1; +/// }; +/// +/// // Send a value +/// tx.send(12); +/// ``` +#[derive(Clone, Debug)] +pub struct Receiver +where + T: Clone, +{ + /// Inner receiver that holds the sent value and possible wakers + inner: Arc>>, +} + +/// Oneshot sender of a [`channel`] +/// +/// Use [`Sender::send`] to send a value to all receivers of the channel. +/// As this is a oneshot sender, only one value can be sent. +/// +/// # Examples +/// ## Send a value +/// +/// ```rust +/// let (mut tx, rx) = laika::shotgun::channel(); +/// +/// // Send a value +/// tx.send(12); +/// ``` +/// +/// ## Sender is dropped after sending +/// +/// ```compile_fail +/// let (mut tx, rx) = laika::shotgun::channel(); +/// +/// // Send a value +/// tx.send(12); +/// tx.send(13); // This won't compile +/// ``` +#[derive(Debug)] +pub struct Sender +where + T: Clone, +{ + inner: _Sender, +} + +impl Receiver +where + T: Clone, +{ + /// Try to receive a value from the channel, if it has been sent. + /// As this is a oneshot receiver, only one value can be received. + /// This function is **non-blocking** and just returns [`None`] if no value + /// has been sent. + /// + /// # Panics + /// + /// Panics if mutex is poisened due to another thread panicking while using + /// inner receiver too. + /// + /// # Examples + /// ```rust + /// let (mut tx, rx) = laika::shotgun::channel(); + /// + /// // Initialy, oneshot receiver has no value + /// assert_eq!(rx.try_recv(), None); + /// + /// // Send a value + /// tx.send(12); + /// + /// // Now, oneshot receiver has the value + /// assert_eq!(rx.try_recv(), Some(12)); + /// // Value is kept after being received + /// assert_eq!(rx.try_recv(), Some(12)); + /// ``` + pub fn try_recv(&self) -> Option + where + T: Clone, + { + self.inner + .as_ref() + .lock() + .expect("Mutex is poisoned") + .try_recv() + } + + /// Receive a value from the channel. + /// Waits until value has been sent and then returns it. + /// This function is blocking asynchronously. + /// + /// # Note + /// You can directly [`Future`]'s `.await` on the receiver too. + /// + /// # Examples + /// (*Note that this won't compile because no async runtime exists here.*) + /// ```compile_fail + /// let (mut tx, rx) = laika::shotgun::channel(); + /// + /// let fun1 = async move { + /// rx.recv().await; + /// return 1; + /// }; + /// + /// std::thread::sleep(std::time::Duration::from_secs(1)); + /// + /// // Send a value + /// tx.send(()); + /// + /// // Now, oneshot receiver has the value + /// assert_eq!(fun1.await, 1); + /// ``` + pub async fn recv(self) -> T { + self.await + } +} + +impl Sender +where + T: Clone, +{ + /// Send a value to all receivers of the channel. + /// As this is a oneshot sender, only one value can be sent. + /// + /// # Examples + /// ## Send a value + /// + /// ```rust + /// let (mut tx, rx) = laika::shotgun::channel(); + /// + /// // Send a value + /// tx.send(12); + /// ``` + pub fn send(self, value: T) { + self.inner.send(value); + } +} + +/// Inner receiver of a [`channel`] +#[derive(Clone, Debug)] +struct _Receiver +where + T: Clone, +{ + /// Value that was sent by [`_Sender`] + value: Option, + /// Wakers that will be woken up when value is sent by [`_Sender`] + wakers: Vec, +} + +/// Inner sender of a [`channel`] +#[derive(Clone, Debug)] +struct _Sender +where + T: Clone, +{ + /// [`_Receiver`] instance that will receive the value and is referecend by + /// all [`Receiver`]s. + receiver: Option>>>, +} + +impl _Receiver +where + T: Clone, +{ + /// Clones the value (if it has been given by [`_Sender`]) and returns clone + /// of it. + fn try_recv(&self) -> Option { + self.value.clone() + } + + /// Sets the value to be received by all [`Receiver`]s from [`_Sender`]. + fn set(&mut self, value: T) { + self.value = Some(value); + + for waker in self.wakers.clone() { + waker.wake(); + } + } +} + +/// Implement [`Future`] for [`Receiver`] to be able to use it in async +/// functions. +impl Future for Receiver +where + T: Clone, +{ + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut inner = self.inner.lock().expect("Mutex is poisoned"); + + if let Some(value) = &inner.value { + Poll::Ready(value.clone()) + } else { + if inner.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + inner.wakers.push(cx.waker().clone()); + } + Poll::Pending + } + } +} + +impl _Sender +where + T: Clone, +{ + /// Send a value to all [`Receiver`]s. + /// + /// # Panics + /// + /// Panics if mutex is poisened due to another thread panicking while using + /// referenced receiver too. + fn send(self, value: T) { + if let Some(recv) = self.receiver.as_ref() { + recv.lock().expect("Mutex is poisoned").set(value); + } + } +} + +/// Creates a one-shot, single producer multiple consumer channel that can be +/// used to send one value to multiple receivers. +/// +/// # Examples +/// +/// ```rust +/// let (mut tx, rx) = laika::shotgun::channel::(); +/// // do something with tx and rx +/// ``` +pub fn channel() -> (Sender, Receiver) +where + T: Clone, +{ + let mut sender = Sender { + inner: _Sender { receiver: None }, + }; + + let receiver_ref = Arc::new(Mutex::new(_Receiver { + value: None, + wakers: Vec::new(), + })); + + let receiver = Receiver { + inner: receiver_ref.clone(), + }; + + sender.inner.receiver = Some(receiver_ref); + + (sender, receiver) +} + +#[cfg(test)] +mod test { + use super::*; + use tokio::task::JoinSet; + + #[test] + fn test_basic() { + let (tx, rx) = channel(); + + assert_eq!(rx.try_recv(), None); + assert_eq!(rx.try_recv(), None); + + tx.send(()); + + assert_eq!(rx.try_recv(), Some(())); + assert_eq!(rx.try_recv(), Some(())); + } + + #[test] + fn test_work_without_receiver() { + let (tx, rx) = channel(); + assert_eq!(rx.try_recv(), None); + + drop(rx); + + tx.send(()); + } + + #[test] + fn test_work_without_sender() { + let (tx, rx) = channel::<()>(); + + assert_eq!(rx.try_recv(), None); + + drop(tx); + + assert_eq!(rx.try_recv(), None); + } + + #[test] + fn test_work_with_multiple_receivers() { + let (tx, rx) = channel(); + + let rx1 = rx.clone(); + let rx2 = rx.clone(); + + assert_eq!(rx.try_recv(), None); + assert_eq!(rx1.try_recv(), None); + assert_eq!(rx2.try_recv(), None); + + tx.send(1337); + + assert_eq!(rx.try_recv(), Some(1337)); + assert_eq!(rx1.try_recv(), Some(1337)); + assert_eq!(rx2.try_recv(), Some(1337)); + } + + #[test] + fn test_works_in_threads() { + use std::thread; + use std::time; + + let (tx, rx) = channel(); + + let rx1 = rx.clone(); + + let thread1 = thread::spawn(move || loop { + if rx1.try_recv().is_some() { + return 1; + } + + thread::sleep(time::Duration::from_secs(1)); + }); + + let rx2 = rx.clone(); + let thread2 = thread::spawn(move || loop { + if rx2.try_recv().is_some() { + return 2; + } + + thread::sleep(time::Duration::from_secs(1)); + }); + + thread::sleep(time::Duration::from_secs(2)); + + tx.send(()); + + assert!(thread1.join().is_ok_and(|v| v == 1)); + assert!(thread2.join().is_ok_and(|v| v == 2)); + } + + #[tokio::test] + async fn test_recv() { + use std::thread; + use std::time; + + let (tx, rx) = channel(); + + let mut join_set = JoinSet::new(); + let rx1 = rx.clone(); + join_set.spawn(async move { + rx1.await; + 1 + }); + + let rx2 = rx.clone(); + join_set.spawn(async move { + rx2.recv().await; // Explicit call to recv + 2 + }); + + thread::sleep(time::Duration::from_secs(2)); + + tx.send(()); + + let rx3 = rx.clone(); + let fun3 = async move { + rx3.await; + 3 + }; + + let result = join_set.join_all().await; + + assert_eq!(result[0], 1); + assert_eq!(result[1], 2); + assert_eq!(fun3.await, 3); + } +}