Merge branch 'main' of github.com:Trivernis/bromine

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/24/head
trivernis 3 years ago
commit f6a0bd7d7c
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

@ -0,0 +1,45 @@
name: Run the benchmarks of the crate
on:
push:
branches: [ main, develop ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Cache build data
uses: actions/cache@v2
with:
path: |
target
~/.cargo/
key: ${{ runner.os }}-cargo
restore-keys: |
${{ runner.os }}-cargo
- name: Extract branch name
shell: bash
run: echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})"
id: extract_branch
- name: Run benchmark
run: cargo bench -- --save-baseline ${{steps.extract_branch.outputs.branch}}
- name: upload artifact
uses: actions/upload-artifact@v2
with:
name: benchmark-results
path: target/criterion/
- name: deploy to github pages
uses: JamesIves/github-pages-deploy-action@4.1.5
with:
branch: gh-pages
folder: target/criterion/

@ -10,10 +10,13 @@ env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
build:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2

626
Cargo.lock generated

@ -2,25 +2,75 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "bitflags"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bromine"
version = "0.5.0"
version = "0.11.0"
dependencies = [
"async-trait",
"byteorder",
"criterion",
"futures",
"lazy_static",
"log",
"rmp-serde",
"serde",
"thiserror",
"tokio",
"tracing",
"typemap_rev",
]
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "bumpalo"
version = "3.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c"
[[package]]
name = "byteorder"
version = "1.4.3"
@ -33,12 +83,237 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cast"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
dependencies = [
"rustc_version",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [
"bitflags",
"textwrap",
"unicode-width",
]
[[package]]
name = "criterion"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1604dafd25fba2fe2d5895a9da139f8dc9b319a5fe5354ca137cbbce4e178d10"
dependencies = [
"atty",
"cast",
"clap",
"criterion-plot",
"csv",
"futures",
"itertools",
"lazy_static",
"num-traits",
"oorandom",
"plotters",
"rayon",
"regex",
"serde",
"serde_cbor",
"serde_derive",
"serde_json",
"tinytemplate",
"tokio",
"walkdir",
]
[[package]]
name = "criterion-plot"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
dependencies = [
"cast",
"itertools",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"lazy_static",
]
[[package]]
name = "csv"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
dependencies = [
"bstr",
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
dependencies = [
"memchr",
]
[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "futures"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445"
[[package]]
name = "futures-executor"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11"
[[package]]
name = "futures-macro"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af"
[[package]]
name = "futures-task"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12"
[[package]]
name = "futures-util"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "half"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -48,6 +323,30 @@ dependencies = [
"libc",
]
[[package]]
name = "itertools"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "js-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -56,9 +355,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.103"
version = "0.2.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119"
[[package]]
name = "log"
@ -75,11 +374,20 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "mio"
version = "0.7.13"
version = "0.7.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16"
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
dependencies = [
"libc",
"log",
@ -125,17 +433,57 @@ dependencies = [
"libc",
]
[[package]]
name = "oorandom"
version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "pin-project-lite"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "plotters"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a3fd9ec30b9749ce28cd91f255d569591cdf937fe280c312143e3c4bad6f2a"
dependencies = [
"num-traits",
"plotters-backend",
"plotters-svg",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "plotters-backend"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d88417318da0eaf0fdcdb51a0ee6c3bed624333bff8f946733049380be67ac1c"
[[package]]
name = "plotters-svg"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "521fa9638fa597e1dc53e9412a4f9cefb01187ee1f7413076f9e6749e2885ba9"
dependencies = [
"plotters-backend",
]
[[package]]
name = "proc-macro2"
version = "1.0.29"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9f5105d4fdaab20335ca9565e106a5d9b82b6219b5ba735731124ac6711d23d"
checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43"
dependencies = [
"unicode-xid",
]
@ -149,6 +497,52 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rayon"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [
"autocfg",
"crossbeam-deque",
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]
[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "rmp"
version = "0.8.10"
@ -170,6 +564,42 @@ dependencies = [
"serde",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "semver"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]]
name = "serde"
version = "1.0.130"
@ -179,6 +609,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.130"
@ -190,17 +630,43 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0ffa0837f2dfa6fb90868c2b5468cad482e175f7dad97e7421951e663f2b527"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "slab"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "syn"
version = "1.0.80"
version = "1.0.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d010a1623fbd906d51d650a9916aaefc05ffa0e4053ff7fe601167f3e715d194"
checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.30"
@ -221,11 +687,21 @@ dependencies = [
"syn",
]
[[package]]
name = "tinytemplate"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "tokio"
version = "1.12.0"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc"
checksum = "70e992e41e0d2fb9f755b37446f20900f64446ef54874f40a60c78f021ac6144"
dependencies = [
"autocfg",
"bytes",
@ -240,27 +716,140 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.4.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "154794c8f499c2619acd19e839294703e9e32e7630ef5f46ea80d4ef0fbee5eb"
checksum = "c9efc1aba077437943f7515666aa2b882dfabfbfdf89c819ea75a8d6e9eaba5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing"
version = "0.1.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "375a639232caf30edfc78e8d89b2d4c375515393e7af7e16f01cd96917fb2105"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]
[[package]]
name = "typemap_rev"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5b74f0a24b5454580a79abb6994393b09adf0ab8070f15827cb666255de155"
[[package]]
name = "unicode-width"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
[[package]]
name = "unicode-xid"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "walkdir"
version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [
"same-file",
"winapi",
"winapi-util",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "web-sys"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"
@ -277,6 +866,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"

@ -1,6 +1,6 @@
[package]
name = "bromine"
version = "0.5.0"
version = "0.11.0"
authors = ["trivernis <trivernis@protonmail.com>"]
edition = "2018"
readme = "README.md"
@ -8,14 +8,26 @@ license = "Apache-2.0"
repository = "https://github.com/Trivernis/bromine/"
description = "A flexible ipc protocol (previously rmp-ipc)"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
bench=false
[[bench]]
name = "serialization_benchmark"
harness = false
[[bench]]
name = "deserialization_benchmark"
harness = false
[dependencies]
thiserror = "1.0.30"
rmp-serde = "0.15.4"
log = "0.4.14"
tracing = "0.1.29"
lazy_static = "1.4.0"
typemap_rev = "0.1.5"
byteorder = "1.4.3"
async-trait = "0.1.51"
futures = "0.3.17"
[dependencies.serde]
version = "1.0.130"
@ -25,6 +37,9 @@ features = ["serde_derive"]
version = "1.12.0"
features = ["net", "io-std", "io-util", "sync", "time"]
[dev-dependencies.criterion]
version = "0.3.5"
features = ["async_tokio", "html_reports"]
[dev-dependencies.tokio]
version = "1.12.0"

@ -12,10 +12,11 @@ Asynchronous event driven interprocess communication supporting tcp and unix dom
**Client:**
```rust
use bromine::{callback, Event, context::Context, IPCBuilder, error::Result};
use bromine::prelude::*;
use tokio::net::TcpListener;
/// Callback ping function
async fn handle_ping(ctx: &Context, event: Event) -> Result<()> {
async fn handle_ping<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> Result<()> {
println!("Received ping event.");
ctx.emitter.emit_response(event.id(), "pong", ()).await?;
Ok(())
@ -24,7 +25,7 @@ async fn handle_ping(ctx: &Context, event: Event) -> Result<()> {
#[tokio::main]
async fn main() {
// create the client
let ctx = IPCBuilder::new()
let ctx = IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register callback
.on("ping", callback!(handle_ping))
@ -38,12 +39,13 @@ async fn main() {
**Server:**
```rust
use bromine::{IPCBuilder, callback};
use bromine::prelude::*;
use tokio::net::TcpListener;
// create the server
#[tokio::main]
async fn main() {
IPCBuilder::new()
IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register callback
.on("ping", callback!(ctx, event, async move {
@ -59,12 +61,13 @@ async fn main() {
**Client:**
```rust
use bromine::IPCBuilder;
use bromine::prelude::*;
use tokio::net::TcpListener;
// create the client
#[tokio::main]
async fn main() {
let ctx = IPCBuilder::new()
let ctx = IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-client")
@ -85,29 +88,32 @@ async fn main() {
**Server:**
```rust
use bromine::{IPCBuilder, EventHandler, namespace, command, Event, context::Context};
use bromine::prelude::*;
use tokio::net::TcpListener;
// create the server
pub struct MyNamespace;
impl MyNamespace {
async fn ping(_ctx: &Context, _event: Event) -> Result<()> {
println!("My namespace received a ping");
Ok(())
}
async fn ping<S: AsyncProtocolStream>(_ctx: &Context<S>, _event: Event) -> Result<()> {
println!("My namespace received a ping");
Ok(())
}
}
impl NamespaceProvider for MyNamespace {
fn name() -> String { String::from("my_namespace") }
fn register(handler: &mut EventHandler) {
handler.on("ping", callback!(Self::ping))
}
fn name() -> &'static str {"my_namespace"}
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"ping" => Self::ping
);
}
}
#[tokio::main]
async fn main() {
IPCBuilder::new()
IPCBuilder::<TcpListener>::new()
.address("127.0.0.1:2020")
// register namespace
.namespace("mainspace-server")
@ -122,6 +128,10 @@ async fn main() {
}
```
## Benchmarks
Benchmarks are generated on each commit. They can be reviewed [here](https://trivernis.github.io/rmp-ipc/report/).
## License
Apache-2.0

@ -0,0 +1,41 @@
use criterion::{black_box, BenchmarkId, Throughput};
use criterion::{criterion_group, criterion_main};
use criterion::{BatchSize, Criterion};
use std::io::Cursor;
use rmp_ipc::event::Event;
use tokio::runtime::Runtime;
pub const EVENT_NAME: &str = "bench_event";
fn create_event_bytes_reader(data_size: usize) -> Cursor<Vec<u8>> {
let bytes = Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None)
.into_bytes()
.unwrap();
Cursor::new(bytes)
}
fn event_deserialization(c: &mut Criterion) {
let runtime = Runtime::new().unwrap();
let mut group = c.benchmark_group("event_deserialization");
for size in (0..10)
.step_by(2)
.map(|i| 1024 * 2u32.pow(i as u32) as usize)
{
group.throughput(Throughput::Bytes(size as u64));
group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| {
b.to_async(&runtime).iter_batched(
|| black_box(create_event_bytes_reader(size)),
|mut reader| async move {
Event::from_async_read(&mut reader).await.unwrap();
},
BatchSize::LargeInput,
)
});
}
group.finish()
}
criterion_group!(benches, event_deserialization);
criterion_main!(benches);

@ -0,0 +1,32 @@
use criterion::{
black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
};
use rmp_ipc::event::Event;
pub const EVENT_NAME: &str = "bench_event";
fn create_event(data_size: usize) -> Event {
Event::new(EVENT_NAME.to_string(), vec![0u8; data_size], None)
}
fn event_serialization(c: &mut Criterion) {
let mut group = c.benchmark_group("event_serialization");
for size in (0..10)
.step_by(2)
.map(|i| 1024 * 2u32.pow(i as u32) as usize)
{
group.throughput(Throughput::Bytes(size as u64));
group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| {
b.iter_batched(
|| black_box(create_event(size)),
|event| event.into_bytes().unwrap(),
BatchSize::LargeInput,
)
});
}
group.finish();
}
criterion_group!(benches, event_serialization);
criterion_main!(benches);

@ -1,3 +1,4 @@
use crate::error_event::ErrorEventData;
use thiserror::Error;
use tokio::sync::oneshot;
@ -25,6 +26,12 @@ pub enum Error {
#[error("Send Error")]
SendError,
#[error("Error response: {0}")]
ErrorEvent(#[from] ErrorEventData),
#[error("Timed out")]
Timeout,
}
impl From<String> for Error {

@ -1,4 +1,6 @@
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::fmt::{Display, Formatter};
pub static ERROR_EVENT_NAME: &str = "error";
@ -11,3 +13,11 @@ pub struct ErrorEventData {
pub code: u16,
pub message: String,
}
impl Error for ErrorEventData {}
impl Display for ErrorEventData {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "IPC Code {}: '{}'", self.code, self.message)
}
}

@ -1,52 +1,72 @@
use crate::error::Result;
use crate::events::generate_event_id;
use serde::de::DeserializeOwned;
use crate::events::payload::EventReceivePayload;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncReadExt};
/// A container representing an event and underlying binary data.
/// The data can be decoded into an object representation or read
/// as raw binary data.
#[derive(Serialize, Deserialize)]
#[derive(Debug)]
pub struct Event {
header: EventHeader,
data: Vec<u8>,
}
#[derive(Debug, Serialize, Deserialize)]
struct EventHeader {
id: u64,
ref_id: Option<u64>,
namespace: Option<String>,
name: String,
data: Vec<u8>,
}
impl Event {
/// Creates a new event with a namespace
#[tracing::instrument(level = "trace", skip(data))]
pub fn with_namespace(
namespace: String,
name: String,
data: Vec<u8>,
ref_id: Option<u64>,
) -> Self {
Self {
let header = EventHeader {
id: generate_event_id(),
ref_id,
namespace: Some(namespace),
name,
data,
}
};
Self { header, data }
}
/// Creates a new event
#[tracing::instrument(level = "trace", skip(data))]
pub fn new(name: String, data: Vec<u8>, ref_id: Option<u64>) -> Self {
Self {
let header = EventHeader {
id: generate_event_id(),
ref_id,
namespace: None,
name,
data,
}
};
Self { header, data }
}
/// The identifier of the message
pub fn id(&self) -> u64 {
self.header.id
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
pub fn reference_id(&self) -> Option<u64> {
self.header.ref_id.clone()
}
/// Decodes the data to the given type
pub fn data<T: DeserializeOwned>(&self) -> Result<T> {
let data = rmp_serde::from_read(&self.data[..])?;
#[tracing::instrument(level = "trace", skip(self))]
pub fn data<T: EventReceivePayload>(&self) -> Result<T> {
let data = T::from_payload_bytes(&self.data[..])?;
Ok(data)
}
@ -58,46 +78,49 @@ impl Event {
/// Returns a reference to the namespace
pub fn namespace(&self) -> &Option<String> {
&self.namespace
&self.header.namespace
}
/// Returns the name of the event
pub fn name(&self) -> &str {
&self.name
&self.header.name
}
/// Reads an event message
#[tracing::instrument(level = "trace", skip(reader))]
pub async fn from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self> {
let length = reader.read_u32().await?;
log::trace!("Parsing event of length {}", length);
let mut data = vec![0u8; length as usize];
let total_length = reader.read_u64().await?;
let header_length = reader.read_u16().await?;
let data_length = total_length - header_length as u64;
tracing::trace!(total_length, header_length, data_length);
let header: EventHeader = {
let mut header_bytes = vec![0u8; header_length as usize];
reader.read_exact(&mut header_bytes).await?;
rmp_serde::from_read(&header_bytes[..])?
};
let mut data = vec![0u8; data_length as usize];
reader.read_exact(&mut data).await?;
let event = rmp_serde::from_read(&data[..])?;
let event = Event { header, data };
Ok(event)
}
/// Encodes the event into bytes
pub fn to_bytes(&self) -> Result<Vec<u8>> {
let mut event_bytes = rmp_serde::to_vec(&self)?;
let mut length_bytes = (event_bytes.len() as u32).to_be_bytes().to_vec();
length_bytes.reverse();
for byte in length_bytes {
event_bytes.insert(0, byte);
}
#[tracing::instrument(level = "trace", skip(self))]
pub fn into_bytes(mut self) -> Result<Vec<u8>> {
let mut header_bytes = rmp_serde::to_vec(&self.header)?;
let header_length = header_bytes.len() as u16;
let data_length = self.data.len();
let total_length = header_length as u64 + data_length as u64;
tracing::trace!(total_length, header_length, data_length);
Ok(event_bytes)
}
let mut buf = Vec::with_capacity(total_length as usize);
buf.append(&mut total_length.to_be_bytes().to_vec());
buf.append(&mut header_length.to_be_bytes().to_vec());
buf.append(&mut header_bytes);
buf.append(&mut self.data);
/// The identifier of the message
pub fn id(&self) -> u64 {
self.id
}
/// The ID of the message referenced by this message.
/// It represents the message that is replied to and can be None.
pub fn reference_id(&self) -> Option<u64> {
self.ref_id.clone()
Ok(buf)
}
}

@ -1,24 +1,54 @@
use crate::error::Result;
use crate::events::event::Event;
use crate::ipc::context::Context;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
type EventCallback = Arc<
dyn for<'a> Fn(&'a Context, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
type EventCallback<P> = Arc<
dyn for<'a> Fn(&'a Context<P>, Event) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
+ Sync,
>;
/// Handler for events
#[derive(Clone)]
pub struct EventHandler {
callbacks: HashMap<String, EventCallback>,
pub struct EventHandler<P: AsyncProtocolStream> {
callbacks: HashMap<String, EventCallback<P>>,
}
impl EventHandler {
impl<S> Clone for EventHandler<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
callbacks: self.callbacks.clone(),
}
}
}
impl<P> Debug for EventHandler<P>
where
P: AsyncProtocolStream,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let callback_names: String = self
.callbacks
.keys()
.cloned()
.collect::<Vec<String>>()
.join(", ");
format!("EventHandler {{callbacks: [{}]}}", callback_names).fmt(f)
}
}
impl<P> EventHandler<P>
where
P: AsyncProtocolStream,
{
/// Creates a new event handler
pub fn new() -> Self {
Self {
@ -27,10 +57,11 @@ impl EventHandler {
}
/// Adds a new event callback
#[tracing::instrument(skip(self, callback))]
pub fn on<F: 'static>(&mut self, name: &str, callback: F)
where
F: for<'a> Fn(
&'a Context,
&'a Context<P>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -40,7 +71,8 @@ impl EventHandler {
}
/// Handles a received event
pub async fn handle_event(&self, ctx: &Context, event: Event) -> Result<()> {
#[tracing::instrument(level = "debug", skip(self, ctx, event))]
pub async fn handle_event(&self, ctx: &Context<P>, event: Event) -> Result<()> {
if let Some(cb) = self.callbacks.get(event.name()) {
cb.as_ref()(ctx, event).await?;
}

@ -5,6 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
pub mod error_event;
pub mod event;
pub mod event_handler;
pub mod payload;
/// Generates a new event id
pub(crate) fn generate_event_id() -> u64 {

@ -0,0 +1,134 @@
use crate::prelude::IPCResult;
use byteorder::{BigEndian, ReadBytesExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io::Read;
/// Trait to convert event data into sending bytes
/// It is implemented for all types that implement Serialize
pub trait EventSendPayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>>;
}
impl<T> EventSendPayload for T
where
T: Serialize,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let bytes = rmp_serde::to_vec(&self)?;
Ok(bytes)
}
}
/// Trait to get the event data from receiving bytes.
/// It is implemented for all types that are DeserializeOwned
pub trait EventReceivePayload: Sized {
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self>;
}
impl<T> EventReceivePayload for T
where
T: DeserializeOwned,
{
fn from_payload_bytes<R: Read>(reader: R) -> IPCResult<Self> {
let type_data = rmp_serde::from_read(reader)?;
Ok(type_data)
}
}
/// A payload wrapper type for sending bytes directly without
/// serializing them
#[derive(Clone)]
pub struct BytePayload {
bytes: Vec<u8>,
}
impl BytePayload {
pub fn new(bytes: Vec<u8>) -> Self {
Self { bytes }
}
/// Returns the bytes of the payload
pub fn into_inner(self) -> Vec<u8> {
self.bytes
}
}
impl EventSendPayload for BytePayload {
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
Ok(self.bytes)
}
}
impl EventReceivePayload for BytePayload {
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf)?;
Ok(Self::new(buf))
}
}
/// A payload wrapper that allows storing two different payloads
/// independent from each other. For example one payload can be
/// a payload serialized by serde while the other is a raw byte
/// payload
pub struct TandemPayload<P1, P2> {
load1: P1,
load2: P2,
}
impl<P1, P2> TandemPayload<P1, P2> {
pub fn new(load1: P1, load2: P2) -> Self {
Self { load1, load2 }
}
/// Returns both payload stored in the tandem payload
pub fn into_inner(self) -> (P1, P2) {
(self.load1, self.load2)
}
}
impl<P1, P2> EventSendPayload for TandemPayload<P1, P2>
where
P1: EventSendPayload,
P2: EventSendPayload,
{
fn to_payload_bytes(self) -> IPCResult<Vec<u8>> {
let mut p1_bytes = self.load1.to_payload_bytes()?;
let mut p2_bytes = self.load2.to_payload_bytes()?;
let mut p1_length_bytes = (p1_bytes.len() as u64).to_be_bytes().to_vec();
let mut p2_length_bytes = (p2_bytes.len() as u64).to_be_bytes().to_vec();
let mut bytes = Vec::new();
bytes.append(&mut p1_length_bytes);
bytes.append(&mut p1_bytes);
bytes.append(&mut p2_length_bytes);
bytes.append(&mut p2_bytes);
Ok(bytes)
}
}
impl<P1, P2> EventReceivePayload for TandemPayload<P1, P2>
where
P1: EventReceivePayload,
P2: EventReceivePayload,
{
fn from_payload_bytes<R: Read>(mut reader: R) -> IPCResult<Self> {
let p1_length = reader.read_u64::<BigEndian>()?;
let mut load1_bytes = vec![0u8; p1_length as usize];
reader.read_exact(&mut load1_bytes)?;
let p2_length = reader.read_u64::<BigEndian>()?;
let mut load2_bytes = vec![0u8; p2_length as usize];
reader.read_exact(&mut load2_bytes)?;
Ok(Self {
load1: P1::from_payload_bytes(load1_bytes.as_slice())?,
load2: P2::from_payload_bytes(load2_bytes.as_slice())?,
})
}
}

@ -3,19 +3,25 @@ use crate::events::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::client::IPCClient;
use crate::ipc::context::Context;
use crate::ipc::context::{Context, PooledContext, ReplyListeners};
use crate::ipc::server::IPCServer;
use crate::namespaces::builder::NamespaceBuilder;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncStreamProtocolListener;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use typemap_rev::{TypeMap, TypeMapKey};
/// A builder for the IPC server or client.
/// ```no_run
///use typemap_rev::TypeMapKey;
/// use std::net::ToSocketAddrs;
/// use typemap_rev::TypeMapKey;
/// use bromine::IPCBuilder;
/// use tokio::net::TcpListener;
///
/// struct CustomKey;
///
@ -24,8 +30,8 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// }
///
///# async fn a() {
/// IPCBuilder::new()
/// .address("127.0.0.1:2020")
/// IPCBuilder::<TcpListener>::new()
/// .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
/// // register callback
/// .on("ping", |_ctx, _event| Box::pin(async move {
/// println!("Received ping event.");
@ -44,24 +50,25 @@ use typemap_rev::{TypeMap, TypeMapKey};
/// .build_server().await.unwrap();
///# }
/// ```
pub struct IPCBuilder {
handler: EventHandler,
address: Option<String>,
namespaces: HashMap<String, Namespace>,
pub struct IPCBuilder<L: AsyncStreamProtocolListener> {
handler: EventHandler<L::Stream>,
address: Option<L::AddressType>,
namespaces: HashMap<String, Namespace<L::Stream>>,
data: TypeMap,
timeout: Duration,
}
impl IPCBuilder {
impl<L> IPCBuilder<L>
where
L: AsyncStreamProtocolListener,
{
pub fn new() -> Self {
let mut handler = EventHandler::new();
handler.on(ERROR_EVENT_NAME, |_, event| {
Box::pin(async move {
let error_data = event.data::<ErrorEventData>()?;
log::warn!(
"Received Error Response from Server: {} - {}",
error_data.code,
error_data.message
);
tracing::warn!(error_data.code);
tracing::warn!("error_data.message = '{}'", error_data.message);
Ok(())
})
@ -71,6 +78,7 @@ impl IPCBuilder {
address: None,
namespaces: HashMap::new(),
data: TypeMap::new(),
timeout: Duration::from_secs(60),
}
}
@ -85,7 +93,7 @@ impl IPCBuilder {
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
&'a Context<L::Stream>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -97,53 +105,99 @@ impl IPCBuilder {
}
/// Adds the address to connect to
pub fn address<S: ToString>(mut self, address: S) -> Self {
self.address = Some(address.to_string());
pub fn address(mut self, address: L::AddressType) -> Self {
self.address = Some(address);
self
}
/// Adds a namespace
pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder {
pub fn namespace<S: ToString>(self, name: S) -> NamespaceBuilder<L> {
NamespaceBuilder::new(self, name.to_string())
}
/// Adds a namespace to the ipc server
pub fn add_namespace(mut self, namespace: Namespace) -> Self {
pub fn add_namespace(mut self, namespace: Namespace<L::Stream>) -> Self {
self.namespaces
.insert(namespace.name().to_owned(), namespace);
self
}
/// Sets the timeout when listening for a response
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
/// Builds an ipc server
#[tracing::instrument(skip(self))]
pub async fn build_server(self) -> Result<()> {
self.validate()?;
let server = IPCServer {
let server = IPCServer::<L> {
namespaces: self.namespaces,
handler: self.handler,
data: self.data,
timeout: self.timeout,
};
server.start(&self.address.unwrap()).await?;
server.start(self.address.unwrap()).await?;
Ok(())
}
/// Builds an ipc client
pub async fn build_client(self) -> Result<Context> {
#[tracing::instrument(skip(self))]
pub async fn build_client(self) -> Result<Context<L::Stream>> {
self.validate()?;
let data = Arc::new(RwLock::new(self.data));
let reply_listeners = ReplyListeners::default();
let client = IPCClient {
namespaces: self.namespaces,
handler: self.handler,
data: self.data,
data,
reply_listeners,
timeout: self.timeout,
};
let ctx = client.connect(&self.address.unwrap()).await?;
let ctx = client.connect(self.address.unwrap()).await?;
Ok(ctx)
}
/// Builds a pooled IPC client
/// This causes the builder to actually create `pool_size` clients and
/// return a [crate::context::PooledContext] that allows one to [crate::context::PooledContext::acquire] a single context
/// to emit events.
#[tracing::instrument(skip(self))]
pub async fn build_pooled_client(self, pool_size: usize) -> Result<PooledContext<L::Stream>> {
if pool_size == 0 {
Error::BuildError("Pool size must be greater than 0".to_string());
}
self.validate()?;
let data = Arc::new(RwLock::new(self.data));
let mut contexts = Vec::new();
let address = self.address.unwrap();
let reply_listeners = ReplyListeners::default();
for _ in 0..pool_size {
let client = IPCClient {
namespaces: self.namespaces.clone(),
handler: self.handler.clone(),
data: Arc::clone(&data),
reply_listeners: Arc::clone(&reply_listeners),
timeout: self.timeout.clone(),
};
let ctx = client.connect(address.clone()).await?;
contexts.push(ctx);
}
Ok(PooledContext::new(contexts))
}
/// Validates that all required fields have been provided
#[tracing::instrument(skip(self))]
fn validate(&self) -> Result<()> {
if self.address.is_none() {
Err(Error::BuildError("Missing Address".to_string()))

@ -1,12 +1,13 @@
use super::handle_connection;
use crate::error::Result;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpStream;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
@ -14,28 +15,36 @@ use typemap_rev::TypeMap;
/// The IPC Client to connect to an IPC Server.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create the client.
/// Usually one does not need to use the IPCClient object directly.
pub struct IPCClient {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
pub(crate) data: TypeMap,
#[derive(Clone)]
pub struct IPCClient<S: AsyncProtocolStream> {
pub(crate) handler: EventHandler<S>,
pub(crate) namespaces: HashMap<String, Namespace<S>>,
pub(crate) data: Arc<RwLock<TypeMap>>,
pub(crate) reply_listeners: ReplyListeners,
pub(crate) timeout: Duration,
}
impl IPCClient {
impl<S> IPCClient<S>
where
S: 'static + AsyncProtocolStream,
{
/// Connects to a given address and returns an emitter for events to that address.
/// Invoked by [IPCBuilder::build_client](crate::builder::IPCBuilder::build_client)
pub async fn connect(self, address: &str) -> Result<Context> {
let stream = TcpStream::connect(address).await?;
let (read_half, write_half) = stream.into_split();
#[tracing::instrument(skip(self))]
pub async fn connect(self, address: S::AddressType) -> Result<Context<S>> {
let stream = S::protocol_connect(address).await?;
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new(write_half);
let (tx, rx) = oneshot::channel();
let ctx = Context::new(
StreamEmitter::clone(&emitter),
Arc::new(RwLock::new(self.data)),
self.data,
Some(tx),
self.reply_listeners,
self.timeout,
);
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
log::debug!("IPC client connected to {}", address);
let handle = tokio::spawn({
let ctx = Context::clone(&ctx);

@ -1,65 +1,110 @@
use crate::error::{Error, Result};
use crate::event::Event;
use crate::ipc::stream_emitter::StreamEmitter;
use crate::protocol::AsyncProtocolStream;
use futures::future;
use futures::future::Either;
use std::collections::HashMap;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::time::Duration;
use typemap_rev::TypeMap;
pub(crate) type ReplyListeners = Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>;
/// An object provided to each callback function.
/// Currently it only holds the event emitter to emit response events in event callbacks.
/// ```rust
/// use bromine::prelude::*;
///
/// async fn my_callback(ctx: &Context, _event: Event) -> IPCResult<()> {
/// async fn my_callback<S: AsyncProtocolStream>(ctx: &Context<S>, _event: Event) -> IPCResult<()> {
/// // use the emitter on the context object to emit events
/// // inside callbacks
/// ctx.emitter.emit("ping", ()).await?;
/// Ok(())
/// }
/// ```
#[derive(Clone)]
pub struct Context {
pub struct Context<S: AsyncProtocolStream> {
/// The event emitter
pub emitter: StreamEmitter,
pub emitter: StreamEmitter<S>,
/// Field to store additional context data
pub data: Arc<RwLock<TypeMap>>,
stop_sender: Arc<Mutex<Option<Sender<()>>>>,
reply_listeners: Arc<Mutex<HashMap<u64, oneshot::Sender<Event>>>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
}
impl<S> Clone for Context<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
emitter: self.emitter.clone(),
data: Arc::clone(&self.data),
stop_sender: Arc::clone(&self.stop_sender),
reply_listeners: Arc::clone(&self.reply_listeners),
reply_timeout: self.reply_timeout.clone(),
}
}
}
impl Context {
impl<P> Context<P>
where
P: AsyncProtocolStream,
{
pub(crate) fn new(
emitter: StreamEmitter,
emitter: StreamEmitter<P>,
data: Arc<RwLock<TypeMap>>,
stop_sender: Option<Sender<()>>,
reply_listeners: ReplyListeners,
reply_timeout: Duration,
) -> Self {
Self {
emitter,
reply_listeners: Arc::new(Mutex::new(HashMap::new())),
reply_listeners,
data,
stop_sender: Arc::new(Mutex::new(stop_sender)),
reply_timeout,
}
}
/// Waits for a reply to the given message ID
#[tracing::instrument(level = "debug", skip(self))]
pub async fn await_reply(&self, message_id: u64) -> Result<Event> {
let (rx, tx) = oneshot::channel();
{
let mut listeners = self.reply_listeners.lock().await;
listeners.insert(message_id, rx);
}
let event = tx.await?;
let result = future::select(
Box::pin(tx),
Box::pin(tokio::time::sleep(self.reply_timeout)),
)
.await;
let event = match result {
Either::Left((tx_result, _)) => Ok(tx_result?),
Either::Right(_) => {
let mut listeners = self.reply_listeners.lock().await;
listeners.remove(&message_id);
Err(Error::Timeout)
}
}?;
Ok(event)
}
/// Stops the listener and closes the connection
#[tracing::instrument(level = "debug", skip(self))]
pub async fn stop(self) -> Result<()> {
let mut sender = self.stop_sender.lock().await;
if let Some(sender) = mem::take(&mut *sender) {
@ -75,3 +120,122 @@ impl Context {
listeners.remove(&ref_id)
}
}
pub struct PooledContext<S: AsyncProtocolStream> {
contexts: Vec<PoolGuard<Context<S>>>,
}
impl<S> Clone for PooledContext<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
contexts: self.contexts.clone(),
}
}
}
pub struct PoolGuard<T>
where
T: Clone,
{
inner: T,
count: Arc<AtomicUsize>,
}
impl<T> Deref for PoolGuard<T>
where
T: Clone,
{
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for PoolGuard<T>
where
T: Clone,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T> Clone for PoolGuard<T>
where
T: Clone,
{
fn clone(&self) -> Self {
self.acquire();
Self {
inner: self.inner.clone(),
count: Arc::clone(&self.count),
}
}
}
impl<T> Drop for PoolGuard<T>
where
T: Clone,
{
fn drop(&mut self) {
self.release();
}
}
impl<T> PoolGuard<T>
where
T: Clone,
{
pub(crate) fn new(inner: T) -> Self {
Self {
inner,
count: Arc::new(AtomicUsize::new(0)),
}
}
/// Acquires the context by adding 1 to the count
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn acquire(&self) {
let count = self.count.fetch_add(1, Ordering::Relaxed);
tracing::trace!(count);
}
/// Releases the connection by subtracting from the stored count
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn release(&self) {
let count = self.count.fetch_sub(1, Ordering::Relaxed);
tracing::trace!(count);
}
pub(crate) fn count(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
}
impl<P> PooledContext<P>
where
P: AsyncProtocolStream,
{
/// Creates a new pooled context from a list of contexts
pub(crate) fn new(contexts: Vec<Context<P>>) -> Self {
Self {
contexts: contexts.into_iter().map(PoolGuard::new).collect(),
}
}
/// Acquires a context from the pool
/// It always chooses the one that is used the least
#[tracing::instrument(level = "trace", skip_all)]
pub fn acquire(&self) -> PoolGuard<Context<P>> {
self.contexts
.iter()
.min_by_key(|c| c.count())
.unwrap()
.clone()
}
}

@ -2,9 +2,9 @@ use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event_handler::EventHandler;
use crate::namespaces::namespace::Namespace;
use crate::prelude::*;
use crate::protocol::AsyncProtocolStream;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::tcp::OwnedReadHalf;
pub mod builder;
pub mod client;
@ -13,18 +13,23 @@ pub mod server;
pub mod stream_emitter;
/// Handles listening to a connection and triggering the corresponding event functions
async fn handle_connection(
namespaces: Arc<HashMap<String, Namespace>>,
handler: Arc<EventHandler>,
mut read_half: OwnedReadHalf,
ctx: Context,
async fn handle_connection<S: 'static + AsyncProtocolStream>(
namespaces: Arc<HashMap<String, Namespace<S>>>,
handler: Arc<EventHandler<S>>,
mut read_half: S::OwnedSplitReadHalf,
ctx: Context<S>,
) {
while let Ok(event) = Event::from_async_read(&mut read_half).await {
log::debug!("Received {:?}:{} event", event.namespace(), event.name());
tracing::trace!(
"event.name = {:?}, event.namespace = {:?}, event.reference_id = {:?}",
event.name(),
event.namespace(),
event.reference_id()
);
// check if the event is a reply
if let Some(ref_id) = event.reference_id() {
tracing::trace!("Event has reference id. Passing to reply listener");
// get the listener for replies
log::trace!("Event is response to {}", ref_id);
if let Some(sender) = ctx.get_reply_sender(ref_id).await {
// try sending the event to the listener for replies
if let Err(event) = sender.send(event) {
@ -32,22 +37,26 @@ async fn handle_connection(
}
continue;
}
log::trace!("No response listener found for event. Passing to regular listener.");
tracing::trace!("No response listener found for event. Passing to regular listener.");
}
if let Some(namespace) = event.namespace().clone().and_then(|n| namespaces.get(&n)) {
log::trace!("Passing event to namespace listener");
tracing::trace!("Passing event to namespace listener");
let handler = Arc::clone(&namespace.handler);
handle_event(Context::clone(&ctx), handler, event);
} else {
log::trace!("Passing event to global listener");
tracing::trace!("Passing event to global listener");
handle_event(Context::clone(&ctx), Arc::clone(&handler), event);
}
}
log::debug!("Connection closed.");
tracing::debug!("Connection closed.");
}
/// Handles a single event in a different tokio context
fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
fn handle_event<S: 'static + AsyncProtocolStream>(
ctx: Context<S>,
handler: Arc<EventHandler<S>>,
event: Event,
) {
tokio::spawn(async move {
let id = event.id();
if let Err(e) = handler.handle_event(&ctx, event).await {
@ -64,9 +73,9 @@ fn handle_event(ctx: Context, handler: Arc<EventHandler>, event: Event) {
)
.await
{
log::error!("Error occurred when sending error response: {:?}", e);
tracing::error!("Error occurred when sending error response: {:?}", e);
}
log::error!("Failed to handle event: {:?}", e);
tracing::error!("Failed to handle event: {:?}", e);
}
});
}

@ -1,43 +1,58 @@
use super::handle_connection;
use crate::error::Result;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::ipc::context::{Context, ReplyListeners};
use crate::ipc::stream_emitter::StreamEmitter;
use crate::namespaces::namespace::Namespace;
use crate::protocol::{AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::net::TcpListener;
use std::time::Duration;
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
/// The IPC Server listening for connections.
/// Use the [IPCBuilder](crate::builder::IPCBuilder) to create a server.
/// Usually one does not need to use the IPCServer object directly.
pub struct IPCServer {
pub(crate) handler: EventHandler,
pub(crate) namespaces: HashMap<String, Namespace>,
pub struct IPCServer<L: AsyncStreamProtocolListener> {
pub(crate) handler: EventHandler<L::Stream>,
pub(crate) namespaces: HashMap<String, Namespace<L::Stream>>,
pub(crate) data: TypeMap,
pub(crate) timeout: Duration,
}
impl IPCServer {
impl<L> IPCServer<L>
where
L: AsyncStreamProtocolListener,
{
/// Starts the IPC Server.
/// Invoked by [IPCBuilder::build_server](crate::builder::IPCBuilder::build_server)
pub async fn start(self, address: &str) -> Result<()> {
let listener = TcpListener::bind(address).await?;
#[tracing::instrument(skip(self))]
pub async fn start(self, address: L::AddressType) -> Result<()> {
let listener = L::protocol_bind(address.clone()).await?;
let handler = Arc::new(self.handler);
let namespaces = Arc::new(self.namespaces);
let data = Arc::new(RwLock::new(self.data));
log::debug!("IPC server listening on {}", address);
tracing::info!("address = {:?}", address);
while let Ok((stream, _)) = listener.accept().await {
while let Ok((stream, remote_address)) = listener.protocol_accept().await {
tracing::debug!("remote_address = {:?}", remote_address);
let handler = Arc::clone(&handler);
let namespaces = Arc::clone(&namespaces);
let data = Arc::clone(&data);
let timeout = self.timeout.clone();
tokio::spawn(async {
let (read_half, write_half) = stream.into_split();
tokio::spawn(async move {
let (read_half, write_half) = stream.protocol_into_split();
let emitter = StreamEmitter::new(write_half);
let ctx = Context::new(StreamEmitter::clone(&emitter), data, None);
let reply_listeners = ReplyListeners::default();
let ctx = Context::new(
StreamEmitter::clone(&emitter),
data,
None,
reply_listeners,
timeout.into(),
);
handle_connection(namespaces, handler, read_half, ctx).await;
});

@ -1,36 +1,50 @@
use crate::error::Result;
use crate::error_event::{ErrorEventData, ERROR_EVENT_NAME};
use crate::events::event::Event;
use crate::events::payload::EventSendPayload;
use crate::ipc::context::Context;
use serde::Serialize;
use crate::protocol::AsyncProtocolStream;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::sync::Mutex;
/// An abstraction over the raw tokio tcp stream
/// to emit events and share a connection across multiple
/// contexts.
#[derive(Clone)]
pub struct StreamEmitter {
stream: Arc<Mutex<OwnedWriteHalf>>,
pub struct StreamEmitter<S: AsyncProtocolStream> {
stream: Arc<Mutex<S::OwnedSplitWriteHalf>>,
}
impl StreamEmitter {
pub fn new(stream: OwnedWriteHalf) -> Self {
impl<S> Clone for StreamEmitter<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
stream: Arc::clone(&self.stream),
}
}
}
impl<P> StreamEmitter<P>
where
P: AsyncProtocolStream,
{
pub fn new(stream: P::OwnedSplitWriteHalf) -> Self {
Self {
stream: Arc::new(Mutex::new(stream)),
}
}
pub async fn _emit<T: Serialize>(
#[tracing::instrument(level = "trace", skip(self, data))]
pub async fn _emit<T: EventSendPayload>(
&self,
namespace: Option<&str>,
event: &str,
data: T,
res_id: Option<u64>,
) -> Result<EmitMetadata> {
let data_bytes = rmp_serde::to_vec(&data)?;
log::debug!("Emitting event {:?}:{}", namespace, event);
let data_bytes = data.to_payload_bytes()?;
let event = if let Some(namespace) = namespace {
Event::with_namespace(namespace.to_string(), event.to_string(), data_bytes, res_id)
@ -38,18 +52,20 @@ impl StreamEmitter {
Event::new(event.to_string(), data_bytes, res_id)
};
let event_bytes = event.to_bytes()?;
let event_id = event.id();
let event_bytes = event.into_bytes()?;
{
log::trace!("Writing {} bytes", event_bytes.len());
let mut stream = self.stream.lock().await;
(*stream).write_all(&event_bytes[..]).await?;
tracing::trace!(bytes_len = event_bytes.len());
}
Ok(EmitMetadata::new(event.id()))
Ok(EmitMetadata::new(event_id))
}
/// Emits an event
pub async fn emit<S: AsRef<str>, T: Serialize>(
pub async fn emit<S: AsRef<str>, T: EventSendPayload>(
&self,
event: S,
data: T,
@ -58,7 +74,7 @@ impl StreamEmitter {
}
/// Emits an event to a specific namespace
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: Serialize>(
pub async fn emit_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
&self,
namespace: S1,
event: S2,
@ -69,7 +85,7 @@ impl StreamEmitter {
}
/// Emits a response to an event
pub async fn emit_response<S: AsRef<str>, T: Serialize>(
pub async fn emit_response<S: AsRef<str>, T: EventSendPayload>(
&self,
event_id: u64,
event: S,
@ -79,7 +95,7 @@ impl StreamEmitter {
}
/// Emits a response to an event to a namespace
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: Serialize>(
pub async fn emit_response_to<S1: AsRef<str>, S2: AsRef<str>, T: EventSendPayload>(
&self,
event_id: u64,
namespace: S1,
@ -113,8 +129,13 @@ impl EmitMetadata {
}
/// Waits for a reply to the given message.
pub async fn await_reply(&self, ctx: &Context) -> Result<Event> {
#[tracing::instrument(skip(self, ctx), fields(self.message_id))]
pub async fn await_reply<P: AsyncProtocolStream>(&self, ctx: &Context<P>) -> Result<Event> {
let reply = ctx.await_reply(self.message_id).await?;
Ok(reply)
if reply.name() == ERROR_EVENT_NAME {
Err(reply.data::<ErrorEventData>()?.into())
} else {
Ok(reply)
}
}
}

@ -3,9 +3,10 @@
//! Client Example:
//! ```no_run
//! use bromine::prelude::*;
//! use tokio::net::TcpListener;
//!
//! /// Callback ping function
//! async fn handle_ping(ctx: &Context, event: Event) -> IPCResult<()> {
//! async fn handle_ping<S: AsyncProtocolStream>(ctx: &Context<S>, event: Event) -> IPCResult<()> {
//! println!("Received ping event.");
//! ctx.emitter.emit_response(event.id(), "pong", ()).await?;
//!
@ -15,25 +16,29 @@
//! pub struct MyNamespace;
//!
//! impl MyNamespace {
//! async fn ping(_ctx: &Context, _event: Event) -> IPCResult<()> {
//! async fn ping<S: AsyncProtocolStream>(_ctx: &Context<S>, _event: Event) -> IPCResult<()> {
//! println!("My namespace received a ping");
//! Ok(())
//! }
//! }
//!
//! impl NamespaceProvider for MyNamespace {
//! fn name() -> String {String::from("my_namespace")}
//! fn name() -> &'static str {"my_namespace"}
//!
//! fn register(handler: &mut EventHandler) {
//! handler.on("ping", callback!(Self::ping))
//! fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
//! events!(handler,
//! "ping" => Self::ping,
//! "ping2" => Self::ping
//! );
//! }
//!}
//!
//! #[tokio::main]
//! async fn main() {
//! // create the client
//! let ctx = IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! use std::net::ToSocketAddrs;
//! let ctx = IPCBuilder::<TcpListener>::new()
//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
//! // register callback
//! .on("ping", callback!(handle_ping))
//! .namespace("mainspace-client")
@ -55,9 +60,11 @@
//!
//! Server Example:
//! ```no_run
//! use std::net::ToSocketAddrs;
//! use typemap_rev::TypeMapKey;
//! use bromine::IPCBuilder;
//! use bromine::callback;
//! use tokio::net::TcpListener;
//!
//! struct MyKey;
//!
@ -67,8 +74,8 @@
//!
//! // create the server
//!# async fn a() {
//! IPCBuilder::new()
//! .address("127.0.0.1:2020")
//! IPCBuilder::<TcpListener>::new()
//! .address("127.0.0.1:2020".to_socket_addrs().unwrap().next().unwrap())
//! // register callback
//! .on("ping", callback!(ctx, event, async move {
//! println!("Received ping event.");
@ -102,11 +109,14 @@ mod events;
pub mod ipc;
mod macros;
mod namespaces;
pub mod protocol;
pub use events::error_event;
pub use events::event;
pub use events::event_handler;
pub use events::payload;
pub use ipc::builder::IPCBuilder;
pub use ipc::context;
pub use macros::*;
pub use namespaces::builder::NamespaceBuilder;
pub use namespaces::namespace;
@ -118,10 +128,13 @@ pub mod prelude {
pub use crate::event::Event;
pub use crate::event_handler::EventHandler;
pub use crate::ipc::context::Context;
pub use crate::ipc::context::{PoolGuard, PooledContext};
pub use crate::ipc::*;
pub use crate::macros::*;
pub use crate::namespace::Namespace;
pub use crate::namespaces::builder::NamespaceBuilder;
pub use crate::namespaces::provider_trait::*;
pub use crate::payload::*;
pub use crate::protocol::*;
pub use crate::*;
}

@ -17,3 +17,17 @@ macro_rules! namespace {
Namespace::from_provider::<$nsp>()
};
}
#[macro_export]
macro_rules! events{
($handler:expr, $($name:expr => $cb:ident), *) => {
$(
$handler.on($name, callback!($cb));
)*
};
($handler:expr, $($name:expr => $cb:path), *) => {
$(
$handler.on($name, callback!($cb));
)*
}
}

@ -3,18 +3,22 @@ use crate::event::Event;
use crate::events::event_handler::EventHandler;
use crate::ipc::context::Context;
use crate::namespaces::namespace::Namespace;
use crate::protocol::AsyncStreamProtocolListener;
use crate::IPCBuilder;
use std::future::Future;
use std::pin::Pin;
pub struct NamespaceBuilder {
pub struct NamespaceBuilder<L: AsyncStreamProtocolListener> {
name: String,
handler: EventHandler,
ipc_builder: IPCBuilder,
handler: EventHandler<L::Stream>,
ipc_builder: IPCBuilder<L>,
}
impl NamespaceBuilder {
pub(crate) fn new(ipc_builder: IPCBuilder, name: String) -> Self {
impl<L> NamespaceBuilder<L>
where
L: AsyncStreamProtocolListener,
{
pub(crate) fn new(ipc_builder: IPCBuilder<L>, name: String) -> Self {
Self {
name,
handler: EventHandler::new(),
@ -26,7 +30,7 @@ impl NamespaceBuilder {
pub fn on<F: 'static>(mut self, event: &str, callback: F) -> Self
where
F: for<'a> Fn(
&'a Context,
&'a Context<L::Stream>,
Event,
) -> Pin<Box<(dyn Future<Output = Result<()>> + Send + 'a)>>
+ Send
@ -38,7 +42,8 @@ impl NamespaceBuilder {
}
/// Builds the namespace
pub fn build(self) -> IPCBuilder {
#[tracing::instrument(skip(self))]
pub fn build(self) -> IPCBuilder<L> {
let namespace = Namespace::new(self.name, self.handler);
self.ipc_builder.add_namespace(namespace)
}

@ -1,15 +1,31 @@
use crate::events::event_handler::EventHandler;
use crate::protocol::AsyncProtocolStream;
use std::sync::Arc;
#[derive(Clone)]
pub struct Namespace {
#[derive(Debug)]
pub struct Namespace<S: AsyncProtocolStream> {
name: String,
pub(crate) handler: Arc<EventHandler>,
pub(crate) handler: Arc<EventHandler<S>>,
}
impl Namespace {
impl<S> Clone for Namespace<S>
where
S: AsyncProtocolStream,
{
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
handler: Arc::clone(&self.handler),
}
}
}
impl<S> Namespace<S>
where
S: AsyncProtocolStream,
{
/// Creates a new namespace with an event handler to register event callbacks on
pub fn new<S: ToString>(name: S, handler: EventHandler) -> Self {
pub fn new<S2: ToString>(name: S2, handler: EventHandler<S>) -> Self {
Self {
name: name.to_string(),
handler: Arc::new(handler),

@ -1,12 +1,16 @@
use crate::events::event_handler::EventHandler;
use crate::namespace::Namespace;
use crate::protocol::AsyncProtocolStream;
pub trait NamespaceProvider {
fn name() -> String;
fn register(handler: &mut EventHandler);
fn name() -> &'static str;
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>);
}
impl Namespace {
impl<S> Namespace<S>
where
S: AsyncProtocolStream,
{
pub fn from_provider<N: NamespaceProvider>() -> Self {
let name = N::name();
let mut handler = EventHandler::new();

@ -0,0 +1,36 @@
pub mod tcp;
#[cfg(unix)]
pub mod unix_socket;
use crate::prelude::IPCResult;
use async_trait::async_trait;
use std::fmt::Debug;
use tokio::io::{AsyncRead, AsyncWrite};
#[async_trait]
pub trait AsyncStreamProtocolListener: Sized + Send + Sync {
type AddressType: Clone + Debug + Send + Sync;
type RemoteAddressType: Debug + Send + Sync;
type Stream: 'static + AsyncProtocolStream<AddressType = Self::AddressType> + Send + Sync;
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self>;
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)>;
}
pub trait AsyncProtocolStreamSplit {
type OwnedSplitReadHalf: AsyncRead + Send + Sync + Unpin;
type OwnedSplitWriteHalf: AsyncWrite + Send + Sync + Unpin;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf);
}
#[async_trait]
pub trait AsyncProtocolStream:
AsyncRead + AsyncWrite + Send + Sync + AsyncProtocolStreamSplit + Sized
{
type AddressType: Clone + Debug + Send + Sync;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self>;
}

@ -0,0 +1,45 @@
use crate::prelude::IPCResult;
use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use async_trait::async_trait;
use std::net::SocketAddr;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
#[async_trait]
impl AsyncStreamProtocolListener for TcpListener {
type AddressType = SocketAddr;
type RemoteAddressType = SocketAddr;
type Stream = TcpStream;
async fn protocol_bind(address: Self::AddressType) -> IPCResult<Self> {
let listener = TcpListener::bind(address).await?;
Ok(listener)
}
async fn protocol_accept(&self) -> IPCResult<(Self::Stream, Self::RemoteAddressType)> {
let connection = self.accept().await?;
Ok(connection)
}
}
impl AsyncProtocolStreamSplit for TcpStream {
type OwnedSplitReadHalf = OwnedReadHalf;
type OwnedSplitWriteHalf = OwnedWriteHalf;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
self.into_split()
}
}
#[async_trait]
impl AsyncProtocolStream for TcpStream {
type AddressType = SocketAddr;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> {
let stream = TcpStream::connect(address).await?;
Ok(stream)
}
}

@ -0,0 +1,51 @@
use crate::error::Result;
use crate::prelude::IPCResult;
use crate::protocol::{AsyncProtocolStream, AsyncProtocolStreamSplit, AsyncStreamProtocolListener};
use async_trait::async_trait;
use std::path::PathBuf;
use tokio::io::Interest;
use tokio::net::unix::OwnedWriteHalf;
use tokio::net::unix::{OwnedReadHalf, SocketAddr};
use tokio::net::{UnixListener, UnixStream};
#[async_trait]
impl AsyncStreamProtocolListener for UnixListener {
type AddressType = PathBuf;
type RemoteAddressType = SocketAddr;
type Stream = UnixStream;
async fn protocol_bind(address: Self::AddressType) -> Result<Self> {
let listener = UnixListener::bind(address)?;
Ok(listener)
}
async fn protocol_accept(&self) -> Result<(Self::Stream, Self::RemoteAddressType)> {
let connection = self.accept().await?;
Ok(connection)
}
}
impl AsyncProtocolStreamSplit for UnixStream {
type OwnedSplitReadHalf = OwnedReadHalf;
type OwnedSplitWriteHalf = OwnedWriteHalf;
fn protocol_into_split(self) -> (Self::OwnedSplitReadHalf, Self::OwnedSplitWriteHalf) {
self.into_split()
}
}
#[async_trait]
impl AsyncProtocolStream for UnixStream {
type AddressType = PathBuf;
async fn protocol_connect(address: Self::AddressType) -> IPCResult<Self> {
let stream = UnixStream::connect(address).await?;
stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
Ok(stream)
}
}

@ -1,12 +1,17 @@
use self::super::utils::PingEventData;
use super::utils::PingEventData;
use crate::prelude::*;
use crate::protocol::AsyncProtocolStream;
use crate::tests::utils::start_test_server;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::net::TcpListener;
use typemap_rev::TypeMapKey;
async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> {
async fn handle_ping_event<P: AsyncProtocolStream>(ctx: &Context<P>, e: Event) -> IPCResult<()> {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut ping_data = e.data::<PingEventData>()?;
ping_data.time = SystemTime::now();
ping_data.ttl -= 1;
@ -18,19 +23,36 @@ async fn handle_ping_event(ctx: &Context, e: Event) -> IPCResult<()> {
Ok(())
}
fn get_builder_with_ping(address: &str) -> IPCBuilder {
fn get_builder_with_ping<L: AsyncStreamProtocolListener>(address: L::AddressType) -> IPCBuilder<L> {
IPCBuilder::new()
.on("ping", |ctx, e| Box::pin(handle_ping_event(ctx, e)))
.timeout(Duration::from_secs(10))
.address(address)
}
#[tokio::test]
async fn it_receives_events() {
let builder = get_builder_with_ping("127.0.0.1:8281");
async fn it_receives_tcp_events() {
let socket_address = "127.0.0.1:8281".to_socket_addrs().unwrap().next().unwrap();
it_receives_events::<TcpListener>(socket_address).await;
}
#[cfg(unix)]
#[tokio::test]
async fn it_receives_unix_socket_events() {
let socket_path = PathBuf::from("/tmp/test_socket");
if socket_path.exists() {
std::fs::remove_file(&socket_path).unwrap();
}
it_receives_events::<tokio::net::UnixListener>(socket_path).await;
}
async fn it_receives_events<L: 'static + AsyncStreamProtocolListener>(address: L::AddressType) {
let builder = get_builder_with_ping::<L>(address.clone());
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping("127.0.0.1:8281");
let builder = get_builder_with_ping::<L>(address);
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
@ -39,8 +61,9 @@ async fn it_receives_events() {
while !server_running.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
let ctx = builder.build_client().await.unwrap();
let reply = ctx
let pool = builder.build_pooled_client(8).await.unwrap();
let reply = pool
.acquire()
.emitter
.emit(
"ping",
@ -51,46 +74,49 @@ async fn it_receives_events() {
)
.await
.unwrap()
.await_reply(&ctx)
.await_reply(&pool.acquire())
.await
.unwrap();
assert_eq!(reply.name(), "pong");
}
fn get_builder_with_ping_mainspace(address: &str) -> IPCBuilder {
fn get_builder_with_ping_namespace(address: &str) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.namespace("mainspace")
.on("ping", callback!(handle_ping_event))
.build()
.address(address)
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
pub struct TestNamespace;
impl TestNamespace {
async fn ping(_c: &Context, _e: Event) -> IPCResult<()> {
async fn ping<P: AsyncProtocolStream>(_c: &Context<P>, _e: Event) -> IPCResult<()> {
println!("Ping received");
Ok(())
}
}
impl NamespaceProvider for TestNamespace {
fn name() -> String {
String::from("Test")
fn name() -> &'static str {
"Test"
}
fn register(handler: &mut EventHandler) {
handler.on("ping", callback!(Self::ping))
fn register<S: AsyncProtocolStream>(handler: &mut EventHandler<S>) {
events!(handler,
"ping" => Self::ping,
"ping2" => Self::ping
);
}
}
#[tokio::test]
async fn it_receives_namespaced_events() {
let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
let server_running = Arc::new(AtomicBool::new(false));
tokio::spawn({
let server_running = Arc::clone(&server_running);
let builder = get_builder_with_ping_mainspace("127.0.0.1:8282");
let builder = get_builder_with_ping_namespace("127.0.0.1:8282");
async move {
server_running.store(true, Ordering::SeqCst);
builder.build_server().await.unwrap();
@ -128,7 +154,10 @@ impl TypeMapKey for ErrorOccurredKey {
type Value = Arc<AtomicBool>;
}
fn get_builder_with_error_handling(error_occurred: Arc<AtomicBool>, address: &str) -> IPCBuilder {
fn get_builder_with_error_handling(
error_occurred: Arc<AtomicBool>,
address: &str,
) -> IPCBuilder<TcpListener> {
IPCBuilder::new()
.insert::<ErrorOccurredKey>(error_occurred)
.on("ping", move |_, _| {
@ -148,7 +177,7 @@ fn get_builder_with_error_handling(error_occurred: Arc<AtomicBool>, address: &st
Ok(())
}),
)
.address(address)
.address(address.to_socket_addrs().unwrap().next().unwrap())
}
#[tokio::test]
@ -181,8 +210,8 @@ async fn it_handles_errors() {
async fn test_error_responses() {
static ADDRESS: &str = "127.0.0.1:8284";
start_test_server(ADDRESS).await.unwrap();
let ctx = IPCBuilder::new()
.address(ADDRESS)
let ctx = IPCBuilder::<TcpListener>::new()
.address(ADDRESS.to_socket_addrs().unwrap().next().unwrap())
.build_client()
.await
.unwrap();
@ -202,7 +231,6 @@ async fn test_error_responses() {
.await
.unwrap()
.await_reply(&ctx)
.await
.unwrap();
assert_eq!(reply.name(), "error");
.await;
assert!(reply.is_err());
}

@ -1,3 +1,3 @@
mod event_tests;
mod ipc_tests;
mod utils;
mod event_tests;

@ -1,10 +1,12 @@
use crate::error::Error;
use crate::IPCBuilder;
use serde::{Deserialize, Serialize};
use std::net::ToSocketAddrs;
use std::time::SystemTime;
use tokio::net::TcpListener;
use tokio::sync::oneshot;
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct PingEventData {
pub time: SystemTime,
pub ttl: u8,
@ -15,8 +17,8 @@ pub fn start_test_server(address: &'static str) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
tokio::task::spawn(async move {
tx.send(true).unwrap();
IPCBuilder::new()
.address(address)
IPCBuilder::<TcpListener>::new()
.address(address.to_socket_addrs().unwrap().next().unwrap())
.on("ping", |ctx, event| {
Box::pin(async move {
ctx.emitter.emit_response(event.id(), "pong", ()).await?;

Loading…
Cancel
Save