Merge pull request #38 from Trivernis/develop

Develop
main
Julius Riegel 3 years ago committed by GitHub
commit c4baf40e65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

381
Cargo.lock generated

@ -2,6 +2,15 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "aead"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "aho-corasick" name = "aho-corasick"
version = "0.7.18" version = "0.7.18"
@ -11,6 +20,15 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.52" version = "0.1.52"
@ -91,27 +109,44 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "bromine" name = "bromine"
version = "0.19.0" version = "0.20.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bincode", "bincode",
"byteorder", "byteorder",
"bytes",
"chacha20poly1305",
"criterion", "criterion",
"crossbeam-utils", "crossbeam-utils",
"futures", "futures",
"futures-core", "futures-core",
"lazy_static", "lazy_static",
"num_enum", "num_enum",
"port_check",
"postcard", "postcard",
"rand",
"rand_core 0.6.3",
"rmp-serde", "rmp-serde",
"serde", "serde",
"serde_json", "serde_json",
"sha2",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber",
"trait-bound-typemap", "trait-bound-typemap",
"x25519-dalek",
] ]
[[package]] [[package]]
@ -159,6 +194,40 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01b72a433d0cf2aef113ba70f62634c56fddb0f244e6377185c56a7cadbd8f91"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
"zeroize",
]
[[package]]
name = "chacha20poly1305"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b84ed6d1d5f7aa9bdde921a5090e0ca4d934d250ea3b402a5fab3a994e28a2a"
dependencies = [
"aead",
"chacha20",
"cipher",
"poly1305",
"zeroize",
]
[[package]]
name = "cipher"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.34.0" version = "2.34.0"
@ -182,6 +251,15 @@ dependencies = [
"volatile-register", "volatile-register",
] ]
[[package]]
name = "cpufeatures"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "criterion" name = "criterion"
version = "0.3.5" version = "0.3.5"
@ -276,6 +354,16 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "crypto-common"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57952ca27b5e3606ff4dd79b0020231aaf9d6aa76dc05fd30137538c50bd3ce8"
dependencies = [
"generic-array",
"typenum",
]
[[package]] [[package]]
name = "csv" name = "csv"
version = "1.1.6" version = "1.1.6"
@ -298,6 +386,38 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "curve25519-dalek"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90f9d052967f590a76e62eb387bd0bbb1b000182c3cefe5364db6b7211651bc0"
dependencies = [
"byteorder",
"digest 0.9.0",
"rand_core 0.5.1",
"subtle",
"zeroize",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "digest"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
dependencies = [
"block-buffer",
"crypto-common",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.6.1" version = "1.6.1"
@ -403,6 +523,38 @@ dependencies = [
"slab", "slab",
] ]
[[package]]
name = "generic-array"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
dependencies = [
"cfg-if",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]] [[package]]
name = "half" name = "half"
version = "1.8.2" version = "1.8.2"
@ -525,7 +677,7 @@ dependencies = [
"log", "log",
"miow", "miow",
"ntapi", "ntapi",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"winapi", "winapi",
] ]
@ -608,12 +760,24 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "once_cell"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]] [[package]]
name = "oorandom" name = "oorandom"
version = "11.1.3" version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opaque-debug"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.8" version = "0.2.8"
@ -654,6 +818,23 @@ dependencies = [
"plotters-backend", "plotters-backend",
] ]
[[package]]
name = "poly1305"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "port_check"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6519412c9e0d4be579b9f0618364d19cb434b324fc6ddb1b27b1e682c7105ed"
[[package]] [[package]]
name = "postcard" name = "postcard"
version = "0.7.3" version = "0.7.3"
@ -671,6 +852,12 @@ version = "0.1.5-pre"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]]
name = "ppv-lite86"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]] [[package]]
name = "proc-macro-crate" name = "proc-macro-crate"
version = "1.1.3" version = "1.1.3"
@ -699,6 +886,45 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core 0.6.3",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom 0.1.16",
]
[[package]]
name = "rand_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7"
dependencies = [
"getrandom 0.2.5",
]
[[package]] [[package]]
name = "rayon" name = "rayon"
version = "1.5.1" version = "1.5.1"
@ -890,12 +1116,38 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "sha2"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.3",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.5" version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "smallvec"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.4" version = "0.4.4"
@ -921,6 +1173,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.86"
@ -932,6 +1190,18 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]] [[package]]
name = "textwrap" name = "textwrap"
version = "0.11.0" version = "0.11.0"
@ -961,6 +1231,15 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "thread_local"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
dependencies = [
"once_cell",
]
[[package]] [[package]]
name = "tinytemplate" name = "tinytemplate"
version = "1.2.1" version = "1.2.1"
@ -1038,6 +1317,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c" checksum = "aa31669fa42c09c34d94d8165dd2012e8ff3c66aca50f3bb226b68f216f2706c"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e0ab7bdc962035a87fba73f3acca9b8a8d0034c2e6f60b84aeaaddddc155dce"
dependencies = [
"ansi_term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -1049,6 +1354,12 @@ dependencies = [
"multi-trait-object", "multi-trait-object",
] ]
[[package]]
name = "typenum"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]] [[package]]
name = "unicode-width" name = "unicode-width"
version = "0.1.9" version = "0.1.9"
@ -1061,12 +1372,34 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "universal-hash"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "vcell" name = "vcell"
version = "0.1.3" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77439c1b53d2303b20d9459b1ade71a83c716e3f9c34f3228c00e6f185d6c002" checksum = "77439c1b53d2303b20d9459b1ade71a83c716e3f9c34f3228c00e6f185d6c002"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]] [[package]]
name = "void" name = "void"
version = "1.0.2" version = "1.0.2"
@ -1093,6 +1426,18 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "wasi"
version = "0.10.2+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.11.0+wasi-snapshot-preview1" version = "0.11.0+wasi-snapshot-preview1"
@ -1193,3 +1538,35 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "x25519-dalek"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077"
dependencies = [
"curve25519-dalek",
"rand_core 0.5.1",
"zeroize",
]
[[package]]
name = "zeroize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]

@ -1,6 +1,6 @@
[package] [package]
name = "bromine" name = "bromine"
version = "0.19.0" version = "0.20.0"
authors = ["trivernis <trivernis@protonmail.com>"] authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018" edition = "2018"
readme = "README.md" readme = "README.md"
@ -31,6 +31,12 @@ trait-bound-typemap = "0.3.3"
rmp-serde = { version = "1.0.0", optional = true } rmp-serde = { version = "1.0.0", optional = true }
bincode = { version = "1.3.3", optional = true } bincode = { version = "1.3.3", optional = true }
serde_json = { version = "1.0.79", optional = true } serde_json = { version = "1.0.79", optional = true }
bytes = "1.1.0"
chacha20poly1305 = "0.9.0"
x25519-dalek = "1.2.0"
rand = "0.8.5"
rand_core = "0.6.3"
sha2 = "0.10.2"
[dependencies.serde] [dependencies.serde]
optional = true optional = true
@ -50,6 +56,8 @@ features = ["alloc"]
rmp-serde = "1.0.0" rmp-serde = "1.0.0"
crossbeam-utils = "0.8.7" crossbeam-utils = "0.8.7"
futures = "0.3.21" futures = "0.3.21"
tracing-subscriber = "0.3.9"
port_check = "0.1.5"
[dev-dependencies.serde] [dev-dependencies.serde]
version = "1.0.136" version = "1.0.136"

@ -1,3 +1,4 @@
use bytes::Bytes;
use criterion::{black_box, BenchmarkId, Throughput}; use criterion::{black_box, BenchmarkId, Throughput};
use criterion::{criterion_group, criterion_main}; use criterion::{criterion_group, criterion_main};
use criterion::{BatchSize, Criterion}; use criterion::{BatchSize, Criterion};
@ -9,17 +10,21 @@ use tokio::runtime::Runtime;
pub const EVENT_NAME: &str = "bench_event"; pub const EVENT_NAME: &str = "bench_event";
fn create_event_bytes_reader(data_size: usize) -> Cursor<Vec<u8>> { fn create_event_bytes_reader(data_size: usize) -> Cursor<Vec<u8>> {
let bytes = Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) let bytes = Event::initiator(
.into_bytes() None,
.unwrap(); EVENT_NAME.to_string(),
Cursor::new(bytes) Bytes::from(vec![0u8; data_size]),
)
.into_bytes()
.unwrap();
Cursor::new(bytes.to_vec())
} }
fn event_deserialization(c: &mut Criterion) { fn event_deserialization(c: &mut Criterion) {
let runtime = Runtime::new().unwrap(); let runtime = Runtime::new().unwrap();
let mut group = c.benchmark_group("event_deserialization"); let mut group = c.benchmark_group("event_deserialization");
for size in (0..10) for size in (0..16)
.step_by(2) .step_by(2)
.map(|i| 1024 * 2u32.pow(i as u32) as usize) .map(|i| 1024 * 2u32.pow(i as u32) as usize)
{ {

@ -1,4 +1,5 @@
use bromine::event::Event; use bromine::event::Event;
use bytes::Bytes;
use criterion::{ use criterion::{
black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
}; };
@ -6,13 +7,17 @@ use criterion::{
pub const EVENT_NAME: &str = "bench_event"; pub const EVENT_NAME: &str = "bench_event";
fn create_event(data_size: usize) -> Event { fn create_event(data_size: usize) -> Event {
Event::initiator(None, EVENT_NAME.to_string(), vec![0u8; data_size]) Event::initiator(
None,
EVENT_NAME.to_string(),
Bytes::from(vec![0u8; data_size]),
)
} }
fn event_serialization(c: &mut Criterion) { fn event_serialization(c: &mut Criterion) {
let mut group = c.benchmark_group("event_serialization"); let mut group = c.benchmark_group("event_serialization");
for size in (0..10) for size in (0..16)
.step_by(2) .step_by(2)
.map(|i| 1024 * 2u32.pow(i as u32) as usize) .map(|i| 1024 * 2u32.pow(i as u32) as usize)
{ {

@ -3,6 +3,7 @@ use crate::error::Result;
use crate::payload::{FromPayload, IntoPayload}; use crate::payload::{FromPayload, IntoPayload};
use crate::prelude::{IPCError, IPCResult}; use crate::prelude::{IPCError, IPCResult};
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use std::error::Error; use std::error::Error;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::io::Read; use std::io::Read;
@ -29,14 +30,13 @@ impl Display for ErrorEventData {
} }
impl IntoPayload for ErrorEventData { impl IntoPayload for ErrorEventData {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
let mut buf = Vec::new(); let mut buf = BytesMut::new();
buf.append(&mut self.code.to_be_bytes().to_vec()); buf.put_u16(self.code);
let message_len = self.message.len() as u32; buf.put_u32(self.message.len() as u32);
buf.append(&mut message_len.to_be_bytes().to_vec()); buf.put(Bytes::from(self.message));
buf.append(&mut self.message.into_bytes());
Ok(buf.freeze())
Ok(buf)
} }
} }

@ -2,6 +2,7 @@ use crate::error::{Error, Result};
use crate::events::generate_event_id; use crate::events::generate_event_id;
use crate::events::payload::FromPayload; use crate::events::payload::FromPayload;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use num_enum::{IntoPrimitive, TryFromPrimitive}; use num_enum::{IntoPrimitive, TryFromPrimitive};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt::Debug; use std::fmt::Debug;
@ -16,7 +17,7 @@ pub const FORMAT_VERSION: [u8; 3] = [0, 9, 0];
#[derive(Debug)] #[derive(Debug)]
pub struct Event { pub struct Event {
header: EventHeader, header: EventHeader,
data: Vec<u8>, data: Bytes,
} }
#[derive(Debug)] #[derive(Debug)]
@ -41,21 +42,21 @@ impl Event {
/// Creates a new event that acts as an initiator for further response events /// Creates a new event that acts as an initiator for further response events
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
#[inline] #[inline]
pub fn initiator(namespace: Option<String>, name: String, data: Vec<u8>) -> Self { pub fn initiator(namespace: Option<String>, name: String, data: Bytes) -> Self {
Self::new(namespace, name, data, None, EventType::Initiator) Self::new(namespace, name, data, None, EventType::Initiator)
} }
/// Creates a new event that is a response to a previous event /// Creates a new event that is a response to a previous event
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
#[inline] #[inline]
pub fn response(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self { pub fn response(namespace: Option<String>, name: String, data: Bytes, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Response) Self::new(namespace, name, data, Some(ref_id), EventType::Response)
} }
/// Creates a new error event as a response to a previous event /// Creates a new error event as a response to a previous event
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
#[inline] #[inline]
pub fn error(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self { pub fn error(namespace: Option<String>, name: String, data: Bytes, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Error) Self::new(namespace, name, data, Some(ref_id), EventType::Error)
} }
@ -63,7 +64,7 @@ impl Event {
/// and might contain a final response payload /// and might contain a final response payload
#[tracing::instrument(level = "trace", skip(data))] #[tracing::instrument(level = "trace", skip(data))]
#[inline] #[inline]
pub fn end(namespace: Option<String>, name: String, data: Vec<u8>, ref_id: u64) -> Self { pub fn end(namespace: Option<String>, name: String, data: Bytes, ref_id: u64) -> Self {
Self::new(namespace, name, data, Some(ref_id), EventType::Response) Self::new(namespace, name, data, Some(ref_id), EventType::Response)
} }
@ -72,7 +73,7 @@ impl Event {
pub(crate) fn new( pub(crate) fn new(
namespace: Option<String>, namespace: Option<String>,
name: String, name: String,
data: Vec<u8>, data: Bytes,
ref_id: Option<u64>, ref_id: Option<u64>,
event_type: EventType, event_type: EventType,
) -> Self { ) -> Self {
@ -145,57 +146,59 @@ impl Event {
// additional header fields can be added a the end because when reading they will just be ignored // additional header fields can be added a the end because when reading they will just be ignored
let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?; let header: EventHeader = EventHeader::from_read(&mut Cursor::new(header_bytes))?;
let mut data = vec![0u8; data_length as usize]; let mut buf = vec![0u8; data_length as usize];
reader.read_exact(&mut data).await?; reader.read_exact(&mut buf).await?;
let event = Event { header, data }; let event = Event {
header,
data: Bytes::from(buf),
};
Ok(event) Ok(event)
} }
/// Encodes the event into bytes /// Encodes the event into bytes
#[tracing::instrument(level = "trace", skip(self))] #[tracing::instrument(level = "trace", skip(self))]
pub fn into_bytes(mut self) -> Result<Vec<u8>> { pub fn into_bytes(self) -> Result<Bytes> {
let mut header_bytes = self.header.into_bytes(); let header_bytes = self.header.into_bytes();
let header_length = header_bytes.len() as u16; let header_length = header_bytes.len() as u16;
let data_length = self.data.len(); let data_length = self.data.len();
let total_length = header_length as u64 + data_length as u64; let total_length = header_length as u64 + data_length as u64;
tracing::trace!(total_length, header_length, data_length); tracing::trace!(total_length, header_length, data_length);
let mut buf = Vec::with_capacity(total_length as usize); let mut buf = BytesMut::with_capacity(total_length as usize);
buf.append(&mut total_length.to_be_bytes().to_vec()); buf.put_u64(total_length);
buf.append(&mut header_length.to_be_bytes().to_vec()); buf.put_u16(header_length);
buf.append(&mut header_bytes); buf.put(header_bytes);
buf.append(&mut self.data); buf.put(self.data);
Ok(buf) Ok(buf.freeze())
} }
} }
impl EventHeader { impl EventHeader {
/// Serializes the event header into bytes /// Serializes the event header into bytes
pub fn into_bytes(self) -> Vec<u8> { pub fn into_bytes(self) -> Bytes {
let mut buf = FORMAT_VERSION.to_vec(); let mut buf = BytesMut::with_capacity(256);
buf.append(&mut self.id.to_be_bytes().to_vec()); buf.put_slice(&FORMAT_VERSION);
buf.push(self.event_type.into()); buf.put_u64(self.id);
buf.put_u8(u8::from(self.event_type));
if let Some(ref_id) = self.ref_id { if let Some(ref_id) = self.ref_id {
buf.push(0xFF); buf.put_u8(0xFF);
buf.append(&mut ref_id.to_be_bytes().to_vec()); buf.put_u64(ref_id);
} else { } else {
buf.push(0x00); buf.put_u8(0x00);
} }
if let Some(namespace) = self.namespace { if let Some(namespace) = self.namespace {
let namespace_len = namespace.len() as u16; buf.put_u16(namespace.len() as u16);
buf.append(&mut namespace_len.to_be_bytes().to_vec()); buf.put(Bytes::from(namespace));
buf.append(&mut namespace.into_bytes());
} else { } else {
buf.append(&mut 0u16.to_be_bytes().to_vec()); buf.put_u16(0);
} }
let name_len = self.name.len() as u16; buf.put_u16(self.name.len() as u16);
buf.append(&mut name_len.to_be_bytes().to_vec()); buf.put(Bytes::from(self.name));
buf.append(&mut self.name.into_bytes());
buf buf.freeze()
} }
/// Parses an event header from an async reader /// Parses an event header from an async reader

@ -2,13 +2,14 @@ use crate::error::Result;
use crate::events::event::Event; use crate::events::event::Event;
use crate::ipc::context::Context; use crate::ipc::context::Context;
use crate::payload::{BytePayload, IntoPayload}; use crate::payload::{BytePayload, IntoPayload};
use bytes::Bytes;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
pub struct Response(Vec<u8>); pub struct Response(Bytes);
impl Response { impl Response {
/// Creates a new response with a given payload /// Creates a new response with a given payload
@ -20,11 +21,11 @@ impl Response {
/// Creates an empty response /// Creates an empty response
pub fn empty() -> Self { pub fn empty() -> Self {
Self(vec![]) Self(Bytes::new())
} }
pub(crate) fn into_byte_payload(self) -> BytePayload { pub(crate) fn into_byte_payload(self) -> BytePayload {
BytePayload::new(self.0) BytePayload::from(self.0)
} }
} }

@ -1,5 +1,6 @@
use crate::prelude::IPCResult; use crate::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use std::io::Read; use std::io::Read;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
@ -7,18 +8,13 @@ pub use super::payload_serializer::*;
/// Trait that serializes a type into bytes and can fail /// Trait that serializes a type into bytes and can fail
pub trait TryIntoBytes { pub trait TryIntoBytes {
fn try_into_bytes(self) -> IPCResult<Vec<u8>>; fn try_into_bytes(self) -> IPCResult<Bytes>;
}
/// Trait that serializes a type into bytes and never fails
pub trait IntoBytes {
fn into_bytes(self) -> Vec<u8>;
} }
/// Trait to convert event data into sending bytes /// Trait to convert event data into sending bytes
/// It is implemented for all types that implement Serialize /// It is implemented for all types that implement Serialize
pub trait IntoPayload { pub trait IntoPayload {
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>>; fn into_payload(self, ctx: &Context) -> IPCResult<Bytes>;
} }
/// Trait to get the event data from receiving bytes. /// Trait to get the event data from receiving bytes.
@ -31,25 +27,39 @@ pub trait FromPayload: Sized {
/// serializing them /// serializing them
#[derive(Clone)] #[derive(Clone)]
pub struct BytePayload { pub struct BytePayload {
bytes: Vec<u8>, bytes: Bytes,
} }
impl BytePayload { impl BytePayload {
#[inline] #[inline]
pub fn new(bytes: Vec<u8>) -> Self { pub fn new(bytes: Vec<u8>) -> Self {
Self { bytes } Self {
bytes: Bytes::from(bytes),
}
} }
/// Returns the bytes of the payload /// Returns the bytes as a `Vec<u8>` of the payload
#[inline] #[inline]
pub fn into_inner(self) -> Vec<u8> { pub fn into_inner(self) -> Vec<u8> {
self.bytes.to_vec()
}
/// Returns the bytes struct of the payload
#[inline]
pub fn into_bytes(self) -> Bytes {
self.bytes self.bytes
} }
} }
impl From<Bytes> for BytePayload {
fn from(bytes: Bytes) -> Self {
Self { bytes }
}
}
impl IntoPayload for BytePayload { impl IntoPayload for BytePayload {
#[inline] #[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
Ok(self.bytes) Ok(self.bytes)
} }
} }
@ -87,20 +97,18 @@ impl<P1, P2> TandemPayload<P1, P2> {
} }
impl<P1: IntoPayload, P2: IntoPayload> IntoPayload for TandemPayload<P1, P2> { impl<P1: IntoPayload, P2: IntoPayload> IntoPayload for TandemPayload<P1, P2> {
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, ctx: &Context) -> IPCResult<Bytes> {
let mut p1_bytes = self.load1.into_payload(&ctx)?; let p1_bytes = self.load1.into_payload(&ctx)?;
let mut p2_bytes = self.load2.into_payload(&ctx)?; let p2_bytes = self.load2.into_payload(&ctx)?;
let mut p1_length_bytes = (p1_bytes.len() as u64).to_be_bytes().to_vec(); let mut bytes = BytesMut::with_capacity(p1_bytes.len() + p2_bytes.len() + 16);
let mut p2_length_bytes = (p2_bytes.len() as u64).to_be_bytes().to_vec();
let mut bytes = Vec::new(); bytes.put_u64(p1_bytes.len() as u64);
bytes.append(&mut p1_length_bytes); bytes.put(p1_bytes);
bytes.append(&mut p1_bytes); bytes.put_u64(p2_bytes.len() as u64);
bytes.append(&mut p2_length_bytes); bytes.put(p2_bytes);
bytes.append(&mut p2_bytes);
Ok(bytes) Ok(bytes.freeze())
} }
} }
@ -123,8 +131,8 @@ impl<P1: FromPayload, P2: FromPayload> FromPayload for TandemPayload<P1, P2> {
#[cfg(not(feature = "serialize"))] #[cfg(not(feature = "serialize"))]
impl IntoPayload for () { impl IntoPayload for () {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
Ok(vec![]) Ok(Bytes::new())
} }
} }
@ -135,6 +143,7 @@ mod serde_payload {
use crate::payload::{FromPayload, TryIntoBytes}; use crate::payload::{FromPayload, TryIntoBytes};
use crate::prelude::{IPCResult, IntoPayload}; use crate::prelude::{IPCResult, IntoPayload};
use byteorder::ReadBytesExt; use byteorder::ReadBytesExt;
use bytes::{BufMut, Bytes, BytesMut};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
@ -168,20 +177,20 @@ mod serde_payload {
} }
impl<T: Serialize> TryIntoBytes for SerdePayload<T> { impl<T: Serialize> TryIntoBytes for SerdePayload<T> {
fn try_into_bytes(self) -> IPCResult<Vec<u8>> { fn try_into_bytes(self) -> IPCResult<Bytes> {
let mut buf = Vec::new(); let mut buf = BytesMut::new();
let mut data_bytes = self.serializer.serialize(self.data)?; let data_bytes = self.serializer.serialize(self.data)?;
let format_id = self.serializer as u8; let format_id = self.serializer as u8;
buf.push(format_id); buf.put_u8(format_id);
buf.append(&mut data_bytes); buf.put(data_bytes);
Ok(buf) Ok(buf.freeze())
} }
} }
impl<T: Serialize> IntoPayload for SerdePayload<T> { impl<T: Serialize> IntoPayload for SerdePayload<T> {
#[inline] #[inline]
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
self.try_into_bytes() self.try_into_bytes()
} }
} }
@ -198,7 +207,7 @@ mod serde_payload {
impl<T: Serialize> IntoPayload for T { impl<T: Serialize> IntoPayload for T {
#[inline] #[inline]
fn into_payload(self, ctx: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, ctx: &Context) -> IPCResult<Bytes> {
ctx.create_serde_payload(self).into_payload(&ctx) ctx.create_serde_payload(self).into_payload(&ctx)
} }
} }

@ -1,3 +1,4 @@
use bytes::Bytes;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
@ -49,7 +50,7 @@ pub enum SerializationError {
UnknownFormat(usize), UnknownFormat(usize),
} }
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum DynamicSerializer { pub enum DynamicSerializer {
Messagepack, Messagepack,
Bincode, Bincode,
@ -109,7 +110,7 @@ impl DynamicSerializer {
} }
} }
pub fn serialize<T: Serialize>(&self, data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(&self, data: T) -> SerializationResult<Bytes> {
match self { match self {
#[cfg(feature = "serialize_rmp")] #[cfg(feature = "serialize_rmp")]
DynamicSerializer::Messagepack => serialize_rmp::serialize(data), DynamicSerializer::Messagepack => serialize_rmp::serialize(data),

@ -1,13 +1,14 @@
use crate::payload::SerializationResult; use crate::payload::SerializationResult;
use bytes::Bytes;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline] #[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Bytes> {
let bytes = bincode::serialize(&data)?; let bytes = bincode::serialize(&data)?;
Ok(bytes) Ok(Bytes::from(bytes))
} }
#[inline] #[inline]

@ -1,13 +1,14 @@
use crate::payload::SerializationResult; use crate::payload::SerializationResult;
use bytes::Bytes;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline] #[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Bytes> {
let bytes = serde_json::to_vec(&data)?; let bytes = serde_json::to_vec(&data)?;
Ok(bytes) Ok(Bytes::from(bytes))
} }
#[inline] #[inline]

@ -1,13 +1,14 @@
use crate::payload::SerializationResult; use crate::payload::SerializationResult;
use bytes::Bytes;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline] #[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Bytes> {
let bytes = postcard::to_allocvec(&data)?.to_vec(); let bytes = postcard::to_allocvec(&data)?.to_vec();
Ok(bytes) Ok(Bytes::from(bytes))
} }
#[inline] #[inline]

@ -1,13 +1,14 @@
use crate::payload::SerializationResult; use crate::payload::SerializationResult;
use bytes::Bytes;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::Serialize; use serde::Serialize;
use std::io::Read; use std::io::Read;
#[inline] #[inline]
pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Vec<u8>> { pub fn serialize<T: Serialize>(data: T) -> SerializationResult<Bytes> {
let bytes = rmp_serde::to_vec(&data)?; let bytes = rmp_serde::to_vec(&data)?;
Ok(bytes) Ok(Bytes::from(bytes))
} }
#[inline] #[inline]

@ -11,6 +11,7 @@ use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace; use crate::namespaces::namespace::Namespace;
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
use crate::payload::DynamicSerializer; use crate::payload::DynamicSerializer;
use crate::prelude::AsyncProtocolStream;
use crate::protocol::AsyncStreamProtocolListener; use crate::protocol::AsyncStreamProtocolListener;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
@ -64,6 +65,8 @@ pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
timeout: Duration, timeout: Duration,
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
default_serializer: DynamicSerializer, default_serializer: DynamicSerializer,
listener_options: L::ListenerOptions,
stream_options: <L::Stream as AsyncProtocolStream>::StreamOptions,
} }
impl<L> IPCBuilder<L> impl<L> IPCBuilder<L>
@ -89,6 +92,8 @@ where
timeout: Duration::from_secs(60), timeout: Duration::from_secs(60),
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
default_serializer: DynamicSerializer::first_available(), default_serializer: DynamicSerializer::first_available(),
listener_options: Default::default(),
stream_options: Default::default(),
} }
} }
@ -163,6 +168,23 @@ where
self self
} }
/// Sets the options for the given protocol listener
pub fn server_options(mut self, options: L::ListenerOptions) -> Self {
self.listener_options = options;
self
}
/// Sets the options for the given protocol stream
pub fn client_options(
mut self,
options: <L::Stream as AsyncProtocolStream>::StreamOptions,
) -> Self {
self.stream_options = options;
self
}
/// Builds an ipc server /// Builds an ipc server
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> { pub async fn build_server(self) -> Result<()> {
@ -176,7 +198,9 @@ where
#[cfg(feature = "serialize")] #[cfg(feature = "serialize")]
default_serializer: self.default_serializer, default_serializer: self.default_serializer,
}; };
server.start::<L>(self.address.unwrap()).await?; server
.start::<L>(self.address.unwrap(), self.listener_options)
.await?;
Ok(()) Ok(())
} }
@ -198,7 +222,9 @@ where
default_serializer: self.default_serializer, default_serializer: self.default_serializer,
}; };
let ctx = client.connect::<L::Stream>(self.address.unwrap()).await?; let ctx = client
.connect::<L::Stream>(self.address.unwrap(), self.stream_options.clone())
.await?;
Ok(ctx) Ok(ctx)
} }
@ -230,7 +256,9 @@ where
default_serializer: self.default_serializer.clone(), default_serializer: self.default_serializer.clone(),
}; };
let ctx = client.connect::<L::Stream>(address.clone()).await?; let ctx = client
.connect::<L::Stream>(address.clone(), self.stream_options.clone())
.await?;
contexts.push(ctx); contexts.push(ctx);
} }

@ -33,12 +33,13 @@ pub struct IPCClient {
impl IPCClient { impl IPCClient {
/// Connects to a given address and returns an emitter for events to that address. /// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client) /// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self, options))]
pub async fn connect<S: AsyncProtocolStream + 'static>( pub async fn connect<S: AsyncProtocolStream + 'static>(
self, self,
address: S::AddressType, address: S::AddressType,
options: S::StreamOptions,
) -> Result<Context> { ) -> Result<Context> {
let stream = S::protocol_connect(address).await?; let stream = S::protocol_connect(address, options).await?;
let (read_half, write_half) = stream.protocol_into_split(); let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new::<S>(write_half); let emitter = StreamEmitter::new::<S>(write_half);

@ -31,12 +31,13 @@ pub struct IPCServer {
impl IPCServer { impl IPCServer {
/// Starts the IPC Server. /// Starts the IPC Server.
/// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server) /// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server)
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self, options))]
pub async fn start<L: AsyncStreamProtocolListener>( pub async fn start<L: AsyncStreamProtocolListener>(
self, self,
address: L::AddressType, address: L::AddressType,
options: L::ListenerOptions,
) -> Result<()> { ) -> Result<()> {
let listener = L::protocol_bind(address.clone()).await?; let listener = L::protocol_bind(address.clone(), options).await?;
let handler = Arc::new(self.handler); let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces); let namespaces = Arc::new(self.namespaces);
let data = Arc::new(RwLock::new(self.data)); let data = Arc::new(RwLock::new(self.data));

@ -91,6 +91,7 @@ impl<P: IntoPayload + Send + Sync + 'static> Future for EmitMetadata<P> {
let event_bytes = event.into_bytes()?; let event_bytes = event.into_bytes()?;
let mut stream = stream.lock().await; let mut stream = stream.lock().await;
stream.deref_mut().write_all(&event_bytes[..]).await?; stream.deref_mut().write_all(&event_bytes[..]).await?;
stream.deref_mut().flush().await?;
tracing::trace!(bytes_len = event_bytes.len()); tracing::trace!(bytes_len = event_bytes.len());
Ok(event_id) Ok(event_id)

@ -37,7 +37,7 @@ impl<P: IntoPayload> EventMetadata<P> {
let payload = self.payload.take().ok_or(Error::InvalidState)?; let payload = self.payload.take().ok_or(Error::InvalidState)?;
let res_id = self.res_id.take().ok_or(Error::InvalidState)?; let res_id = self.res_id.take().ok_or(Error::InvalidState)?;
let event_type = self.event_type.take().ok_or(Error::InvalidState)?; let event_type = self.event_type.take().ok_or(Error::InvalidState)?;
let payload_bytes = payload.into_payload(&ctx)?; let payload_bytes = payload.into_payload(&ctx)?.into();
let event = Event::new( let event = Event::new(
namespace, namespace,

@ -101,6 +101,7 @@
//! # } //! # }
//! ``` //! ```
extern crate core;
#[cfg(all( #[cfg(all(
feature = "serialize", feature = "serialize",
not(any( not(any(
@ -119,6 +120,12 @@ mod macros;
mod namespaces; mod namespaces;
pub mod protocol; pub mod protocol;
/// Reexported for usage in payload implementations
pub use bytes;
/// Reexported for sharing data in context
pub use trait_bound_typemap;
pub use events::error_event; pub use events::error_event;
pub use events::event; pub use events::event;
pub use events::event_handler; pub use events::event_handler;
@ -146,4 +153,5 @@ pub mod prelude {
pub use crate::payload::*; pub use crate::payload::*;
pub use crate::protocol::*; pub use crate::protocol::*;
pub use crate::*; pub use crate::*;
pub use trait_bound_typemap::TypeMap;
} }

@ -0,0 +1,167 @@
use crate::prelude::encrypted::EncryptedStream;
use crate::prelude::IPCResult;
use crate::protocol::AsyncProtocolStream;
use bytes::Bytes;
use chacha20poly1305::aead::{Aead, NewAead};
use chacha20poly1305::{ChaCha20Poly1305, Key, Nonce};
use rand::thread_rng;
use rand_core::RngCore;
use sha2::{Digest, Sha256};
use std::io;
use std::io::ErrorKind;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use x25519_dalek::{PublicKey, StaticSecret};
/// A structure used for encryption.
/// It holds the cipher initialized with the given key
/// and two counters for both encryption and decryption
/// count which are used to keep track of the nonce.
#[derive(Clone)]
pub(crate) struct CipherBox {
cipher: ChaCha20Poly1305,
en_count: Arc<AtomicU64>,
de_count: Arc<AtomicU64>,
}
impl CipherBox {
pub fn new(key: Bytes) -> Self {
let key = Key::from_slice(&key[..]);
let cipher = ChaCha20Poly1305::new(key);
Self {
cipher,
en_count: Arc::new(AtomicU64::new(0)),
de_count: Arc::new(AtomicU64::new(0)),
}
}
/// Encrypts the given message
#[tracing::instrument(level = "trace", skip_all)]
pub fn encrypt(&self, data: Bytes) -> io::Result<Bytes> {
self.cipher
.encrypt(&self.en_nonce(), &data[..])
.map(Bytes::from)
.map_err(|_| io::Error::from(ErrorKind::InvalidData))
}
/// Decrypts the given message
#[tracing::instrument(level = "trace", skip_all)]
pub fn decrypt(&self, data: Bytes) -> io::Result<Bytes> {
self.cipher
.decrypt(&self.de_nonce(), &data[..])
.map(Bytes::from)
.map_err(|_| io::Error::from(ErrorKind::InvalidData))
}
/// Updates the stored key.
/// This must be done simultaneously on server and client side
/// to keep track of the nonce
#[tracing::instrument(level = "trace", skip_all)]
pub fn update_key(&mut self, key: Bytes) {
let key = Key::from_slice(&key[..]);
self.cipher = ChaCha20Poly1305::new(key);
self.reset_counters();
}
/// Resets the nonce counters.
/// This must be done simultaneously on server and client side.
#[tracing::instrument(level = "trace", skip_all)]
pub fn reset_counters(&mut self) {
self.de_count.store(0, Ordering::SeqCst);
self.en_count.store(0, Ordering::SeqCst);
}
fn en_nonce(&self) -> Nonce {
let count = self.en_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("encrypted count {}", count);
nonce_from_number(count)
}
fn de_nonce(&self) -> Nonce {
let count = self.de_count.fetch_add(1, Ordering::SeqCst);
tracing::trace!("decrypted count {}", count);
nonce_from_number(count)
}
}
/// Generates a nonce from a given number
/// The given number is repeated to fit the nonce bytes
fn nonce_from_number(number: u64) -> Nonce {
let number_bytes: [u8; 8] = number.to_be_bytes();
let num_vec = number_bytes.repeat(2);
let mut nonce_bytes = [0u8; 12];
nonce_bytes.copy_from_slice(&num_vec[..12]);
nonce_bytes.into()
}
impl<T: AsyncProtocolStream> EncryptedStream<T> {
/// Does a server-client key exchange.
/// 1. The server receives the public key of the client
/// 2. The server sends its own public key
/// 3. The server creates an intermediary encrypted connection
/// 4. The server generates a new secret
/// 5. The server sends the secret to the client
/// 6. The connection is upgraded with the new shared key
pub async fn from_server_key_exchange(mut inner: T, secret: StaticSecret) -> IPCResult<Self> {
let other_pub = receive_public_key(&mut inner).await?;
send_public_key(&mut inner, &secret).await?;
let shared_secret = secret.diffie_hellman(&other_pub);
let mut stream = Self::new(inner, shared_secret);
let permanent_secret = generate_secret();
stream.write_all(&permanent_secret).await?;
stream.flush().await?;
stream.update_key(permanent_secret.into());
Ok(stream)
}
/// Does a client-server key exchange.
/// 1. The client sends its public key to the server
/// 2. The client receives the servers public key
/// 3. The client creates an intermediary encrypted connection
/// 4. The client receives the new key from the server
/// 5. The connection is upgraded with the new shared key
pub async fn from_client_key_exchange(mut inner: T, secret: StaticSecret) -> IPCResult<Self> {
send_public_key(&mut inner, &secret).await?;
let other_pub = receive_public_key(&mut inner).await?;
let shared_secret = secret.diffie_hellman(&other_pub);
let mut stream = Self::new(inner, shared_secret);
let mut key_buf = vec![0u8; 32];
stream.read_exact(&mut key_buf).await?;
stream.update_key(key_buf.into());
Ok(stream)
}
}
#[tracing::instrument(level = "debug", skip_all)]
async fn receive_public_key<T: AsyncProtocolStream>(stream: &mut T) -> IPCResult<PublicKey> {
let mut pk_buf = [0u8; 32];
stream.read_exact(&mut pk_buf).await?;
Ok(PublicKey::from(pk_buf))
}
#[tracing::instrument(level = "debug", skip_all)]
async fn send_public_key<T: AsyncProtocolStream>(
stream: &mut T,
secret: &StaticSecret,
) -> IPCResult<()> {
let own_pk = PublicKey::from(secret);
stream.write_all(own_pk.as_bytes()).await?;
stream.flush().await?;
Ok(())
}
#[tracing::instrument(level = "trace", skip_all)]
fn generate_secret() -> Vec<u8> {
let mut rng = thread_rng();
let mut buf = vec![0u8; 32];
rng.fill_bytes(&mut buf);
Sha256::digest(&buf).to_vec()
}

@ -0,0 +1,240 @@
use crate::prelude::encrypted::{
EncryptedPackage, EncryptedReadStream, EncryptedStream, EncryptedWriteStream,
};
use crate::prelude::AsyncProtocolStream;
use crate::protocol::encrypted::crypt_handling::CipherBox;
use bytes::{Buf, BufMut, Bytes};
use std::cmp::min;
use std::io;
use std::io::Error;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
const WRITE_BUF_SIZE: usize = 1024;
impl<T: AsyncProtocolStream> Unpin for EncryptedStream<T> {}
impl<T: AsyncProtocolStream> AsyncWrite for EncryptedStream<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
Pin::new(&mut self.write_half).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.write_half).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
Pin::new(&mut self.write_half).poll_shutdown(cx)
}
}
impl<T: AsyncProtocolStream> AsyncRead for EncryptedStream<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.read_half).poll_read(cx, buf)
}
}
impl<T: 'static + AsyncRead + Unpin + Send + Sync> Unpin for EncryptedReadStream<T> {}
impl<T: 'static + AsyncRead + Send + Sync + Unpin> AsyncRead for EncryptedReadStream<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.fut.is_none() {
if self.remaining.len() > 0 {
let max_copy = min(buf.remaining(), self.remaining.len());
let bytes = self.remaining.copy_to_bytes(max_copy);
buf.put_slice(&bytes);
tracing::trace!("{} bytes read from buffer", bytes.len());
}
if buf.remaining() > 0 {
tracing::trace!("{} bytes remaining to read", buf.remaining());
let reader = self.inner.take().unwrap();
let cipher = self.cipher.take().unwrap();
self.fut = Some(Box::pin(async move { read_bytes(reader, cipher).await }));
} else {
return Poll::Ready(Ok(()));
}
}
match self.fut.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready((result, reader, cipher)) => {
self.inner = Some(reader);
self.cipher = Some(cipher);
match result {
Ok(bytes) => {
self.fut = None;
self.remaining.put(bytes);
let max_copy = min(self.remaining.len(), buf.remaining());
let bytes = self.remaining.copy_to_bytes(max_copy);
buf.put_slice(&bytes);
tracing::trace!("{} bytes read from buffer", bytes.len());
if buf.remaining() == 0 {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
Err(e) => Poll::Ready(Err(e)),
}
}
Poll::Pending => Poll::Pending,
}
}
}
impl<T: 'static + AsyncWrite + Unpin + Send + Sync> Unpin for EncryptedWriteStream<T> {}
impl<T: 'static + AsyncWrite + Unpin + Send + Sync> AsyncWrite for EncryptedWriteStream<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
let written_length = buf.len();
if self.fut_write.is_none() {
self.buffer.put(Bytes::from(buf.to_vec()));
if self.buffer.len() >= WRITE_BUF_SIZE {
tracing::trace!("buffer has reached sending size: {}", self.buffer.len());
let buffer_len = self.buffer.len();
let max_copy = min(u32::MAX as usize, buffer_len);
let plaintext = self.buffer.copy_to_bytes(max_copy);
let writer = self.inner.take().unwrap();
let cipher = self.cipher.take().unwrap();
self.fut_write = Some(Box::pin(write_bytes(plaintext, writer, cipher)))
} else {
return Poll::Ready(Ok(written_length));
}
}
match self.fut_write.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready((result, writer, cipher)) => {
self.inner = Some(writer);
self.cipher = Some(cipher);
self.fut_write = None;
Poll::Ready(result.map(|_| written_length))
}
Poll::Pending => Poll::Pending,
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
let buffer_len = self.buffer.len();
if self.fut_flush.is_none() {
let max_copy = min(u32::MAX as usize, buffer_len);
let plaintext = self.buffer.copy_to_bytes(max_copy);
let writer = self.inner.take().unwrap();
let cipher = self.cipher.take().unwrap();
self.fut_flush = Some(Box::pin(async move {
let (mut writer, cipher) = if plaintext.len() > 0 {
let (result, writer, cipher) = write_bytes(plaintext, writer, cipher).await;
if result.is_err() {
return (result, writer, cipher);
}
(writer, cipher)
} else {
(writer, cipher)
};
if let Err(e) = writer.flush().await {
(Err(e), writer, cipher)
} else {
(Ok(()), writer, cipher)
}
}))
}
match self.fut_flush.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready((result, writer, cipher)) => {
self.inner = Some(writer);
self.cipher = Some(cipher);
self.fut_flush = None;
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
if self.fut_shutdown.is_none() {
match self.as_mut().poll_flush(cx) {
Poll::Ready(result) => match result {
Ok(_) => {
let mut writer = self.inner.take().unwrap();
self.fut_shutdown = Some(Box::pin(async move { writer.shutdown().await }));
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
},
Poll::Pending => Poll::Pending,
}
} else {
match self.fut_shutdown.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(result) => {
self.fut_shutdown = None;
Poll::Ready(result)
}
Poll::Pending => Poll::Pending,
}
}
}
}
#[tracing::instrument(level = "trace", skip_all)]
async fn write_bytes<T: AsyncWrite + Unpin>(
bytes: Bytes,
mut writer: T,
cipher: CipherBox,
) -> (io::Result<()>, T, CipherBox) {
tracing::trace!("plaintext size: {}", bytes.len());
let encrypted_bytes = match cipher.encrypt(bytes) {
Ok(b) => b,
Err(e) => {
return (Err(e), writer, cipher);
}
};
let package_bytes = EncryptedPackage::new(encrypted_bytes).into_bytes();
tracing::trace!("encrypted size: {}", package_bytes.len());
if let Err(e) = writer.write_all(&package_bytes[..]).await {
return (Err(e), writer, cipher);
}
tracing::trace!("everything sent");
(Ok(()), writer, cipher)
}
#[tracing::instrument(level = "trace", skip_all)]
async fn read_bytes<T: AsyncRead + Unpin>(
mut reader: T,
cipher: CipherBox,
) -> (io::Result<Bytes>, T, CipherBox) {
let package = match EncryptedPackage::from_async_read(&mut reader).await {
Ok(p) => p,
Err(e) => {
return (Err(e), reader, cipher);
}
};
tracing::trace!("received {} bytes", package.bytes.len());
match cipher.decrypt(package.into_inner()) {
Ok(bytes) => (Ok(bytes), reader, cipher),
Err(e) => (Err(e), reader, cipher),
}
}

@ -0,0 +1,152 @@
mod crypt_handling;
mod io_impl;
mod protocol_impl;
use bytes::{BufMut, Bytes, BytesMut};
pub use io_impl::*;
pub use protocol_impl::*;
use rand_core::RngCore;
use std::future::Future;
use std::io;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite};
use x25519_dalek::{SharedSecret, StaticSecret};
use crate::prelude::encrypted::crypt_handling::CipherBox;
use crate::prelude::{AsyncProtocolStream, AsyncStreamProtocolListener};
pub type OptionalFuture<T> = Option<Pin<Box<dyn Future<Output = T> + Send + Sync>>>;
#[derive(Clone)]
pub struct EncryptionOptions<T: Clone + Default> {
pub inner_options: T,
pub secret: StaticSecret,
}
impl<T: Clone + Default> Default for EncryptionOptions<T> {
fn default() -> Self {
let mut rng = rand::thread_rng();
let mut secret = [0u8; 32];
rng.fill_bytes(&mut secret);
Self {
secret: StaticSecret::from(secret),
inner_options: T::default(),
}
}
}
pub struct EncryptedListener<T: AsyncStreamProtocolListener> {
inner: T,
secret: StaticSecret,
}
impl<T: AsyncStreamProtocolListener> EncryptedListener<T> {
pub fn new(inner: T, secret: StaticSecret) -> Self {
Self { inner, secret }
}
}
pub struct EncryptedStream<T: AsyncProtocolStream> {
read_half: EncryptedReadStream<T::OwnedSplitReadHalf>,
write_half: EncryptedWriteStream<T::OwnedSplitWriteHalf>,
}
impl<T: AsyncProtocolStream> EncryptedStream<T> {
pub fn new(inner: T, secret: SharedSecret) -> Self {
let cipher_box = CipherBox::new(Bytes::from(secret.to_bytes().to_vec()));
let (read, write) = inner.protocol_into_split();
let read_half = EncryptedReadStream::new(read, cipher_box.clone());
let write_half = EncryptedWriteStream::new(write, cipher_box);
Self {
read_half,
write_half,
}
}
pub fn update_key(&mut self, key: Bytes) {
self.write_half
.cipher
.as_mut()
.unwrap()
.update_key(key.clone());
self.read_half
.cipher
.as_mut()
.unwrap()
.update_key(key.clone());
}
}
pub struct EncryptedReadStream<T: AsyncRead> {
inner: Option<T>,
fut: OptionalFuture<(io::Result<Bytes>, T, CipherBox)>,
remaining: BytesMut,
cipher: Option<CipherBox>,
}
impl<T: 'static + AsyncRead + Unpin + Send + Sync> EncryptedReadStream<T> {
pub(crate) fn new(inner: T, cipher: CipherBox) -> Self {
Self {
inner: Some(inner),
fut: None,
remaining: BytesMut::new(),
cipher: Some(cipher),
}
}
}
pub struct EncryptedWriteStream<T: 'static + AsyncWrite + Unpin + Send + Sync> {
inner: Option<T>,
cipher: Option<CipherBox>,
buffer: BytesMut,
fut_write: OptionalFuture<(io::Result<()>, T, CipherBox)>,
fut_flush: OptionalFuture<(io::Result<()>, T, CipherBox)>,
fut_shutdown: OptionalFuture<io::Result<()>>,
}
impl<T: 'static + AsyncWrite + Unpin + Send + Sync> EncryptedWriteStream<T> {
pub(crate) fn new(inner: T, cipher: CipherBox) -> Self {
Self {
inner: Some(inner),
cipher: Some(cipher),
buffer: BytesMut::with_capacity(1024),
fut_write: None,
fut_flush: None,
fut_shutdown: None,
}
}
}
pub(crate) struct EncryptedPackage {
bytes: Bytes,
}
impl EncryptedPackage {
pub fn new(bytes: Bytes) -> Self {
Self { bytes }
}
pub fn into_bytes(self) -> Bytes {
let mut buf = BytesMut::with_capacity(4 + self.bytes.len());
buf.put_u32(self.bytes.len() as u32);
buf.put(self.bytes);
buf.freeze()
}
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> io::Result<Self> {
let length = reader.read_u32().await?;
let mut bytes_buf = vec![0u8; length as usize];
reader.read_exact(&mut bytes_buf).await?;
Ok(Self {
bytes: Bytes::from(bytes_buf),
})
}
pub fn into_inner(self) -> Bytes {
self.bytes
}
}

@ -0,0 +1,55 @@
use crate::error::Result;
use crate::prelude::encrypted::{EncryptedReadStream, EncryptedWriteStream};
use crate::prelude::{AsyncProtocolStreamSplit, IPCResult};
use crate::protocol::encrypted::{EncryptedListener, EncryptedStream, EncryptionOptions};
use crate::protocol::{AsyncProtocolStream, AsyncStreamProtocolListener};
use async_trait::async_trait;
#[async_trait]
impl<T: AsyncStreamProtocolListener> AsyncStreamProtocolListener for EncryptedListener<T> {
type AddressType = T::AddressType;
type RemoteAddressType = T::RemoteAddressType;
type Stream = EncryptedStream<T::Stream>;
type ListenerOptions = EncryptionOptions<T::ListenerOptions>;
async fn protocol_bind(
address: Self::AddressType,
options: Self::ListenerOptions,
) -> IPCResult<Self> {
let inner = T::protocol_bind(address, options.inner_options).await?;
Ok(EncryptedListener::new(inner, options.secret))
}
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)> {
let (inner_stream, remote_addr) = self.inner.protocol_accept().await?;
let stream =
Self::Stream::from_server_key_exchange(inner_stream, self.secret.clone()).await?;
Ok((stream, remote_addr))
}
}
#[async_trait]
impl<T: AsyncProtocolStream> AsyncProtocolStream for EncryptedStream<T> {
type AddressType = T::AddressType;
type StreamOptions = EncryptionOptions<T::StreamOptions>;
async fn protocol_connect(
address: Self::AddressType,
options: Self::StreamOptions,
) -> Result<Self> {
let inner = T::protocol_connect(address, options.inner_options).await?;
EncryptedStream::from_client_key_exchange(inner, options.secret).await
}
}
#[async_trait]
impl<T: AsyncProtocolStream> AsyncProtocolStreamSplit for EncryptedStream<T> {
type OwnedSplitReadHalf = EncryptedReadStream<T::OwnedSplitReadHalf>;
type OwnedSplitWriteHalf = EncryptedWriteStream<T::OwnedSplitWriteHalf>;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
(self.read_half, self.write_half)
}
}

@ -1,5 +1,6 @@
pub mod tcp; pub mod tcp;
pub mod encrypted;
#[cfg(unix)] #[cfg(unix)]
pub mod unix_socket; pub mod unix_socket;
@ -12,25 +13,33 @@ use tokio::io::{AsyncRead, AsyncWrite};
pub trait AsyncStreamProtocolListener: Sized + Send + Sync { pub trait AsyncStreamProtocolListener: Sized + Send + Sync {
type AddressType: Clone + Debug + Send + Sync; type AddressType: Clone + Debug + Send + Sync;
type RemoteAddressType: Debug + Send + Sync; type RemoteAddressType: Debug + Send + Sync;
type Stream: 'static + AsyncProtocolStream<AddressType = Self::AddressType> + Send + Sync; type Stream: 'static + AsyncProtocolStream<AddressType = Self::AddressType>;
type ListenerOptions: Clone + Default + Send + Sync;
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self>; async fn protocol_bind(
address: Self::AddressType,
options: Self::ListenerOptions,
) -> IPCResult<Self>;
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>; async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>;
} }
pub trait AsyncProtocolStreamSplit { pub trait AsyncProtocolStreamSplit {
type OwnedSplitReadHalf: AsyncRead + Send + Sync + Unpin; type OwnedSplitReadHalf: 'static + AsyncRead + Send + Sync + Unpin;
type OwnedSplitWriteHalf: AsyncWrite + Send + Sync + Unpin; type OwnedSplitWriteHalf: 'static + AsyncWrite + Send + Sync + Unpin;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf); fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf);
} }
#[async_trait] #[async_trait]
pub trait AsyncProtocolStream: pub trait AsyncProtocolStream:
AsyncRead + AsyncWrite + Send + Sync + AsyncProtocolStreamSplit + Sized AsyncRead + AsyncWrite + Send + Sync + AsyncProtocolStreamSplit + Sized + Unpin
{ {
type AddressType: Clone + Debug + Send + Sync; type AddressType: Clone + Debug + Send + Sync;
type StreamOptions: Clone + Default + Send + Sync;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self>; async fn protocol_connect(
address: Self::AddressType,
options: Self::StreamOptions,
) -> IPCResult<Self>;
} }

@ -10,8 +10,12 @@ impl AsyncStreamProtocolListener for TcpListener {
type AddressType = SocketAddr; type AddressType = SocketAddr;
type RemoteAddressType = SocketAddr; type RemoteAddressType = SocketAddr;
type Stream = TcpStream; type Stream = TcpStream;
type ListenerOptions = ();
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self> { async fn protocol_bind(
address: Self::AddressType,
_: Self::ListenerOptions,
) -> IPCResult<Self> {
let listener = TcpListener::bind(address).await?; let listener = TcpListener::bind(address).await?;
Ok(listener) Ok(listener)
@ -36,8 +40,12 @@ impl AsyncProtocolStreamSplit for TcpStream {
#[async_trait] #[async_trait]
impl AsyncProtocolStream for TcpStream { impl AsyncProtocolStream for TcpStream {
type AddressType = SocketAddr; type AddressType = SocketAddr;
type StreamOptions = ();
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> { async fn protocol_connect(
address: Self::AddressType,
_: Self::StreamOptions,
) -> IPCResult<Self> {
let stream = TcpStream::connect(address).await?; let stream = TcpStream::connect(address).await?;
Ok(stream) Ok(stream)

@ -13,8 +13,9 @@ impl AsyncStreamProtocolListener for UnixListener {
type AddressType = PathBuf; type AddressType = PathBuf;
type RemoteAddressType = SocketAddr; type RemoteAddressType = SocketAddr;
type Stream = UnixStream; type Stream = UnixStream;
type ListenerOptions = ();
async fn protocol_bind(address: Self::AddressType) -> Result<Self> { async fn protocol_bind(address: Self::AddressType, _: Self::ListenerOptions) -> Result<Self> {
let listener = UnixListener::bind(address)?; let listener = UnixListener::bind(address)?;
Ok(listener) Ok(listener)
@ -39,8 +40,12 @@ impl AsyncProtocolStreamSplit for UnixStream {
#[async_trait] #[async_trait]
impl AsyncProtocolStream for UnixStream { impl AsyncProtocolStream for UnixStream {
type AddressType = PathBuf; type AddressType = PathBuf;
type StreamOptions = ();
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> { async fn protocol_connect(
address: Self::AddressType,
_: Self::StreamOptions,
) -> IPCResult<Self> {
let stream = UnixStream::connect(address).await?; let stream = UnixStream::connect(address).await?;
stream stream
.ready(Interest::READABLE | Interest::WRITABLE) .ready(Interest::READABLE | Interest::WRITABLE)

@ -0,0 +1,111 @@
use crate::utils::call_counter::increment_counter_for_event;
use crate::utils::protocol::TestProtocolListener;
use crate::utils::{get_free_port, start_server_and_client};
use bromine::prelude::encrypted::EncryptedListener;
use bromine::prelude::*;
use bromine::IPCBuilder;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use futures::StreamExt;
use rand_core::RngCore;
use std::io::Read;
use std::time::Duration;
mod utils;
#[tokio::test]
async fn it_sends_and_receives_smaller_packages() {
send_and_receive_bytes(140).await.unwrap();
}
#[tokio::test]
async fn it_sends_and_receives_larger_packages() {
send_and_receive_bytes(1024 * 32).await.unwrap();
}
#[tokio::test]
async fn it_sends_and_receives_strings() {
let ctx = get_client_with_server().await;
let response = ctx
.emit("string", StringPayload(String::from("Hello World")))
.await_reply()
.await
.unwrap();
let response_string = response.payload::<StringPayload>().unwrap().0;
assert_eq!(&response_string, "Hello World")
}
async fn send_and_receive_bytes(byte_size: usize) -> IPCResult<()> {
let ctx = get_client_with_server().await;
let mut rng = rand::thread_rng();
let mut buffer = vec![0u8; byte_size];
rng.fill_bytes(&mut buffer);
let mut stream = ctx
.emit("bytes", BytePayload::new(buffer.clone()))
.stream_replies()
.await?;
let mut count = 0;
while let Some(response) = stream.next().await {
let bytes = response.unwrap().payload::<BytePayload>()?;
assert_eq!(bytes.into_inner(), buffer);
count += 1;
}
assert_eq!(count, 100);
Ok(())
}
async fn get_client_with_server() -> Context {
let port = get_free_port();
start_server_and_client(move || get_builder(port)).await
}
fn get_builder(port: u8) -> IPCBuilder<EncryptedListener<TestProtocolListener>> {
IPCBuilder::new()
.address(port)
.on("bytes", callback!(handle_bytes))
.on("string", callback!(handle_string))
.timeout(Duration::from_secs(10))
}
async fn handle_bytes(ctx: &Context, event: Event) -> IPCResult<Response> {
increment_counter_for_event(ctx, &event).await;
let bytes = event.payload::<BytePayload>()?.into_inner();
for _ in 0u8..99 {
ctx.emit("bytes", BytePayload::new(bytes.clone())).await?;
}
ctx.response(BytePayload::new(bytes))
}
async fn handle_string(ctx: &Context, event: Event) -> IPCResult<Response> {
ctx.response(event.payload::<StringPayload>()?)
}
pub struct StringPayload(String);
impl IntoPayload for StringPayload {
fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
let mut buf = BytesMut::with_capacity(self.0.len() + 4);
buf.put_u32(self.0.len() as u32);
buf.put(Bytes::from(self.0));
Ok(buf.freeze())
}
}
impl FromPayload for StringPayload {
fn from_payload<R: Read>(mut reader: R) -> IPCResult<Self> {
let len = reader.read_u32::<BigEndian>()?;
let mut buf = vec![0u8; len as usize];
reader.read_exact(&mut buf)?;
let string = String::from_utf8(buf).map_err(|_| IPCError::from("not a string"))?;
Ok(StringPayload(string))
}
}

@ -3,6 +3,7 @@ use crate::utils::protocol::TestProtocolListener;
use crate::utils::{get_free_port, start_server_and_client}; use crate::utils::{get_free_port, start_server_and_client};
use bromine::prelude::*; use bromine::prelude::*;
use byteorder::ReadBytesExt; use byteorder::ReadBytesExt;
use bytes::Bytes;
use futures::StreamExt; use futures::StreamExt;
use std::io::Read; use std::io::Read;
use std::time::Duration; use std::time::Duration;
@ -66,16 +67,16 @@ async fn handle_stream_event(ctx: &Context, event: Event) -> IPCResult<Response>
pub struct EmptyPayload; pub struct EmptyPayload;
impl IntoPayload for EmptyPayload { impl IntoPayload for EmptyPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
Ok(vec![]) Ok(Bytes::new())
} }
} }
pub struct NumberPayload(u8); pub struct NumberPayload(u8);
impl IntoPayload for NumberPayload { impl IntoPayload for NumberPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
Ok(vec![self.0]) Ok(Bytes::from(vec![self.0]))
} }
} }

@ -91,6 +91,7 @@ mod payload_impl {
use bromine::payload::{FromPayload, IntoPayload}; use bromine::payload::{FromPayload, IntoPayload};
use bromine::prelude::IPCResult; use bromine::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt}; use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, Bytes, BytesMut};
use std::io::Read; use std::io::Read;
pub struct SimplePayload { pub struct SimplePayload {
@ -99,17 +100,13 @@ mod payload_impl {
} }
impl IntoPayload for SimplePayload { impl IntoPayload for SimplePayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
let mut buf = Vec::new(); let mut buf = BytesMut::new();
let string_length = self.string.len() as u16; buf.put_u16(self.string.len() as u16);
let string_length_bytes = string_length.to_be_bytes(); buf.put(Bytes::from(self.string));
buf.append(&mut string_length_bytes.to_vec()); buf.put_u32(self.number);
let mut string_bytes = self.string.into_bytes();
buf.append(&mut string_bytes); Ok(buf.freeze())
let num_bytes = self.number.to_be_bytes();
buf.append(&mut num_bytes.to_vec());
Ok(buf)
} }
} }

@ -2,6 +2,7 @@ mod utils;
use crate::utils::start_server_and_client; use crate::utils::start_server_and_client;
use bromine::prelude::*; use bromine::prelude::*;
use bytes::Bytes;
use std::time::Duration; use std::time::Duration;
use utils::call_counter::*; use utils::call_counter::*;
use utils::get_free_port; use utils::get_free_port;
@ -132,7 +133,7 @@ async fn handle_error_event(ctx: &Context, event: Event) -> IPCResult<Response>
pub struct EmptyPayload; pub struct EmptyPayload;
impl IntoPayload for EmptyPayload { impl IntoPayload for EmptyPayload {
fn into_payload(self, _: &Context) -> IPCResult<Vec<u8>> { fn into_payload(self, _: &Context) -> IPCResult<Bytes> {
Ok(vec![]) Ok(Bytes::new())
} }
} }

@ -1,3 +1,4 @@
#![allow(unused)]
use bromine::context::Context; use bromine::context::Context;
use bromine::event::Event; use bromine::event::Event;
use std::collections::HashMap; use std::collections::HashMap;

@ -1,20 +1,32 @@
#![allow(unused)]
use bromine::context::Context; use bromine::context::Context;
use bromine::protocol::AsyncStreamProtocolListener; use bromine::protocol::AsyncStreamProtocolListener;
use bromine::IPCBuilder; use bromine::IPCBuilder;
use call_counter::*; use call_counter::*;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::oneshot::channel; use tokio::sync::oneshot::channel;
pub mod call_counter; pub mod call_counter;
pub mod protocol; pub mod protocol;
pub fn setup() {
lazy_static! {
static ref SETUP_DONE: Arc<AtomicBool> = Default::default();
}
if !SETUP_DONE.swap(true, Ordering::SeqCst) {
tracing_subscriber::fmt::init();
}
}
pub fn get_free_port() -> u8 { pub fn get_free_port() -> u8 {
lazy_static! { lazy_static! {
static ref PORT_COUNTER: Arc<AtomicU8> = Arc::new(AtomicU8::new(0)); static ref PORT_COUNTER: Arc<AtomicU8> = Arc::new(AtomicU8::new(0));
} }
PORT_COUNTER.fetch_add(1, Ordering::Relaxed) let count = PORT_COUNTER.fetch_add(1, Ordering::Relaxed);
count
} }
pub async fn start_server_and_client< pub async fn start_server_and_client<
@ -23,6 +35,7 @@ pub async fn start_server_and_client<
>( >(
builder_fn: F, builder_fn: F,
) -> Context { ) -> Context {
setup();
let counters = CallCounter::default(); let counters = CallCounter::default();
let (sender, receiver) = channel::<()>(); let (sender, receiver) = channel::<()>();
let client_builder = builder_fn().insert::<CallCounterKey>(counters.clone()); let client_builder = builder_fn().insert::<CallCounterKey>(counters.clone());

@ -62,8 +62,9 @@ impl AsyncStreamProtocolListener for TestProtocolListener {
type AddressType = u8; type AddressType = u8;
type RemoteAddressType = u8; type RemoteAddressType = u8;
type Stream = TestProtocolStream; type Stream = TestProtocolStream;
type ListenerOptions = ();
async fn protocol_bind(address: Self::AddressType) -> Result<Self> { async fn protocol_bind(address: Self::AddressType, _: Self::ListenerOptions) -> Result<Self> {
let (sender, receiver) = channel(1); let (sender, receiver) = channel(1);
add_port(address, sender).await; add_port(address, sender).await;
@ -104,7 +105,7 @@ pub struct TestProtocolStream {
impl TestProtocolStream { impl TestProtocolStream {
/// Read from the receiver and remaining buffer /// Read from the receiver and remaining buffer
async fn read_from_receiver( async fn read_from_receiver(
buf: &mut ReadBuf<'static>, buf: &mut ReadBuf<'_>,
receiver: Arc<Mutex<Receiver<Vec<u8>>>>, receiver: Arc<Mutex<Receiver<Vec<u8>>>>,
remaining_buf: Arc<Mutex<Vec<u8>>>, remaining_buf: Arc<Mutex<Vec<u8>>>,
) { ) {
@ -133,7 +134,7 @@ impl TestProtocolStream {
/// Read from the remaining buffer returning a boolean if the /// Read from the remaining buffer returning a boolean if the
/// read buffer has been filled /// read buffer has been filled
async fn read_from_remaining_buffer( async fn read_from_remaining_buffer(
buf: &mut ReadBuf<'static>, buf: &mut ReadBuf<'_>,
remaining_buf: &mut Vec<u8>, remaining_buf: &mut Vec<u8>,
) -> bool { ) -> bool {
if remaining_buf.len() < buf.capacity() { if remaining_buf.len() < buf.capacity() {
@ -170,8 +171,9 @@ impl AsyncProtocolStreamSplit for TestProtocolStream {
#[async_trait] #[async_trait]
impl AsyncProtocolStream for TestProtocolStream { impl AsyncProtocolStream for TestProtocolStream {
type AddressType = u8; type AddressType = u8;
type StreamOptions = ();
async fn protocol_connect(address: Self::AddressType) -> Result<Self> { async fn protocol_connect(address: Self::AddressType, _: Self::StreamOptions) -> Result<Self> {
get_port(address) get_port(address)
.await .await
.ok_or_else(|| IPCError::from("Failed to connect")) .ok_or_else(|| IPCError::from("Failed to connect"))
@ -180,70 +182,66 @@ impl AsyncProtocolStream for TestProtocolStream {
impl AsyncRead for TestProtocolStream { impl AsyncRead for TestProtocolStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
unsafe { if self.future.is_none() {
// we need a mutable reference to access the inner future // we need to change the lifetime to be able to use the read buffer in the read future
let stream = self.get_unchecked_mut(); let buf: &mut ReadBuf<'static> = unsafe {
// SAFETY: idk tbh
mem::transmute(buf)
};
let receiver = Arc::clone(&self.receiver);
let remaining_buf = Arc::clone(&self.remaining_buf);
if stream.future.is_none() { let future = TestProtocolStream::read_from_receiver(buf, receiver, remaining_buf);
// we need to change the lifetime to be able to use the read buffer in the read future self.future = Some(Box::pin(future));
let buf: &mut ReadBuf<'static> = mem::transmute(buf); }
let receiver = Arc::clone(&stream.receiver); if let Some(future) = &mut self.future {
let remaining_buf = Arc::clone(&stream.remaining_buf); match future.as_mut().poll(cx) {
Poll::Ready(_) => {
let future = TestProtocolStream::read_from_receiver(buf, receiver, remaining_buf); self.future = None;
stream.future = Some(Box::pin(future)); Poll::Ready(Ok(()))
}
if let Some(future) = &mut stream.future {
match future.as_mut().poll(cx) {
Poll::Ready(_) => {
stream.future = None;
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
} }
} else { Poll::Pending => Poll::Pending,
Poll::Pending
} }
} else {
Poll::Pending
} }
} }
} }
impl Unpin for TestProtocolStream {}
impl AsyncWrite for TestProtocolStream { impl AsyncWrite for TestProtocolStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<std::prelude::rust_2015::Result<usize, Error>> { ) -> Poll<std::prelude::rust_2015::Result<usize, Error>> {
let write_len = buf.len(); let write_len = buf.len();
unsafe {
// we need a mutable reference to access the inner future
let stream = self.get_unchecked_mut();
if stream.future.is_none() { if self.future.is_none() {
// we take ownership here so that we don't need to change lifetimes here // we take ownership here so that we don't need to change lifetimes here
let buf = buf.to_vec(); let buf = buf.to_vec();
let sender = stream.sender.clone(); let sender = self.sender.clone();
let future = async move { let future = async move {
sender.send(buf).await.unwrap(); sender.send(buf).await.unwrap();
}; };
stream.future = Some(Box::pin(future)); self.future = Some(Box::pin(future));
} }
if let Some(future) = &mut stream.future { if let Some(future) = &mut self.future {
match future.as_mut().poll(cx) { match future.as_mut().poll(cx) {
Poll::Ready(_) => { Poll::Ready(_) => {
stream.future = None; self.future = None;
Poll::Ready(Ok(write_len)) Poll::Ready(Ok(write_len))
}
Poll::Pending => Poll::Pending,
} }
} else { Poll::Pending => Poll::Pending,
Poll::Pending
} }
} else {
Poll::Pending
} }
} }

Loading…
Cancel
Save