Add RPC Client implementation

- Add Message class to serialize and deserialize rpc messages
- Add Method enum for all rpc methods represented as 32 bit unsigned integers
- Add responses file to map rpc responses to types
- Add implementation of userserver api with PromiseSocket

Signed-off-by: trivernis <trivernis@protonmail.com>
pull/3/head
trivernis 4 years ago
parent 3164404218
commit d475851bb3
Signed by: Trivernis
GPG Key ID: DFFFCC2C7A02DB45

2
.gitignore vendored

@ -1,2 +1,4 @@
node_modules
dist
.env
.idea

60
package-lock.json generated

@ -306,6 +306,14 @@
"@types/express": "*"
}
},
"@types/crc": {
"version": "3.4.0",
"resolved": "https://registry.npmjs.org/@types/crc/-/crc-3.4.0.tgz",
"integrity": "sha1-I2a+tDmc1zSzPkLHrICVduYX1Io=",
"requires": {
"@types/node": "*"
}
},
"@types/express": {
"version": "4.17.8",
"resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.8.tgz",
@ -2071,6 +2079,14 @@
"vary": "^1"
}
},
"crc": {
"version": "3.8.0",
"resolved": "https://registry.npmjs.org/crc/-/crc-3.8.0.tgz",
"integrity": "sha512-iX3mfgcTMIq3ZKLIsVFAbv7+Mc10kxabAGQb8HvjA1o3T1PIYprbakQ65d3I+2HGHt6nSKkM9PYjgoJO2KcFBQ==",
"requires": {
"buffer": "^5.1.0"
}
},
"cross-spawn": {
"version": "7.0.3",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz",
@ -4958,6 +4974,11 @@
"integrity": "sha512-8q7VEgMJW4J8tcfVPy8g09NcQwZdbwFEqhe/WZkoIzjn/3TGDwtOCYtXGxA3O8tPzpczCCDgv+P2P5y00ZJOOg==",
"dev": true
},
"messagepack": {
"version": "1.1.12",
"resolved": "https://registry.npmjs.org/messagepack/-/messagepack-1.1.12.tgz",
"integrity": "sha512-pNB6K4q4VMLRXdvlGZkTtQhmKFntvLisnOQnL0VhKpZooL8B8Wsv5TXuidIJil0bCH6V172p3+Onfyow0usPYQ=="
},
"methods": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/methods/-/methods-1.1.2.tgz",
@ -5893,6 +5914,45 @@
"integrity": "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==",
"dev": true
},
"promise-duplex": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/promise-duplex/-/promise-duplex-6.0.0.tgz",
"integrity": "sha512-ZL7rquzjTFzInDBeWYcsT+qddolNvzigahk6MI6qLSbQvlyRRCJkU3JztgaVunzvkH28smRa2Qu/cY9RXtSkgA==",
"requires": {
"core-js": "^3.6.5",
"promise-readable": "^6.0.0",
"promise-writable": "^6.0.0"
}
},
"promise-readable": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/promise-readable/-/promise-readable-6.0.0.tgz",
"integrity": "sha512-5NxtmUswijvX5cAM0zPSy6yiCXH/eKBpiiBq6JfAUrmngMquMbzcBhF2qA+ocs4rYYKdvAfv3cOvZxADLtL1CA==",
"requires": {
"core-js": "^3.6.5"
}
},
"promise-socket": {
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/promise-socket/-/promise-socket-7.0.0.tgz",
"integrity": "sha512-Oic9BrxmcHOPEnzKp2Js+ehFyvsbd0WxsE5khweCTHuRvdzbXjHUZmSDT6F9TW8SIkAJ0lCzoHjMYnb0WQJPiw==",
"requires": {
"promise-duplex": "^6.0.0",
"tslib": "^2.0.1"
},
"dependencies": {
"tslib": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.0.1.tgz",
"integrity": "sha512-SgIkNheinmEBgx1IUNirK0TUD4X9yjjBRTqqjggWCU3pUEqIk3/Uwl3yRixYKT6WjQuGiwDv4NomL3wqRCj+CQ=="
}
}
},
"promise-writable": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/promise-writable/-/promise-writable-6.0.0.tgz",
"integrity": "sha512-b81zre/itgJFS7dwWzIdKNVVqvLiUxYRS/wolUB0H1YY/tAaS146XGKa4Q/5wCbsnXLyn0MCeV6f8HHe4iUHLg=="
},
"proxy-addr": {
"version": "2.0.6",
"resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.6.tgz",

@ -30,9 +30,14 @@
"typescript": "^4.0.2"
},
"dependencies": {
"@types/crc": "^3.4.0",
"apollo-server": "^2.17.0",
"crc": "^3.8.0",
"dotenv": "^8.2.0",
"graphql": "^15.3.0",
"messagepack": "^1.1.12",
"pg": "^8.3.3",
"promise-socket": "^7.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.26"
}

@ -0,0 +1,43 @@
import { encode, decode } from 'messagepack'
import { Method } from './method'
import { crc32 } from 'crc'
export class RPCMessage<T> {
private readonly method: Method
readonly data: T
constructor (method: Method, data: any) {
this.method = method
this.data = data
}
static fromBuffer<T> (raw: Buffer): RPCMessage<T> {
const length = raw.readUInt32BE()
if (raw.length !== length) {
throw new Error('Invalid Buffer length')
}
const crcNum = raw.readUInt32BE(length - 4)
if (crc32(raw.slice(0, length - 4)) !== crcNum) {
throw new Error('Validation check failed')
}
const method = raw.readUInt32BE(4)
const msgData = decode(raw.slice(8, length))
return new RPCMessage(method, msgData)
}
public toBuffer (): Buffer {
const msgData = encode(this.data)
const length = msgData.length + 12
const buffer = Buffer.alloc(length - 4)
buffer.writeUInt32BE(length)
buffer.writeUInt32BE(this.method, 4)
buffer.fill(msgData, 8)
const crcNum = crc32(buffer)
const resultBuffer = Buffer.alloc(length, buffer)
resultBuffer.writeUInt32BE(crcNum, length - 4)
return resultBuffer
}
}

@ -0,0 +1,11 @@
/* eslint no-unused-vars: 0 */
export enum Method {
Null = 0x00,
Error = 0x0F_0F_0F_0F,
Info = 0x49_4e_46_4f,
ValidateToken = 0x56_41_4c_49,
GetRoles = 0x52_4f_4c_45,
GetRolePermissions = 0x50_45_52_4d,
CreateRole = 0x43_52_4f_4c,
CreatePermissionn = 0x43_50_45_52
}

@ -0,0 +1,5 @@
// Response in the form of [valid, ttl]
export type ValidateTokenResponse = [boolean, number]
// Info in the form of [name, binary, description, request_structure][]
export type GetInfoResponse = [string, string, string][]

@ -1,13 +1,64 @@
import { DataSource } from 'apollo-datasource'
import { Socket } from 'net'
import { PromiseSocket } from 'promise-socket'
import { RPCMessage } from './message'
import { Method } from './method'
import { GetInfoResponse, ValidateTokenResponse } from './responses'
/**
* fetches datafrom user server, especially validates user tokens
*/
export class UserServerAPI extends DataSource {
port: number
host: string
constructor (address: string) {
super()
const parts = address.split(':')
this.host = parts[0]
if (parts.length === 2) {
this.port = Number(parts[1])
} else {
this.port = 5000
}
}
async getInfo (): Promise<GetInfoResponse> {
const response = await this.send<GetInfoResponse>(new RPCMessage(Method.Info, null))
return response.data
}
/**
* validates user token
*/
async validateToken (token:string) {
return true
async validateToken (token:string): Promise<boolean> {
const response = await this.send<ValidateTokenResponse>(new RPCMessage(Method.ValidateToken, { token }))
if (response) {
return response.data[0]
} else {
return false
}
}
/**
* Connects to the socket and returns the client
*/
async getSocket (): Promise<PromiseSocket<Socket>> {
const socket = new Socket()
const promiseSocket = new PromiseSocket(socket)
await promiseSocket.connect(this.port, this.host)
return promiseSocket
}
async send<T> (message: RPCMessage<any>): Promise<RPCMessage<T>> {
const socket = await this.getSocket()
await socket.writeAll(message.toBuffer())
const response = await socket.readAll() as Buffer
if (response?.length > 0) {
return RPCMessage.fromBuffer<T>(response)
}
}
}

@ -4,6 +4,9 @@ import { CargoBikeAPI } from './datasources/db/cargobikeAPI'
import typeDefs from './schema/type-defs'
import 'reflect-metadata'
import { createConnection } from 'typeorm'
import { UserServerAPI } from './datasources/userserver/userserviceAPI'
require('dotenv').config()
createConnection().then(async () => {
console.log('connected to db')
@ -13,7 +16,8 @@ const server = new ApolloServer({
resolvers: [bikeresolver],
typeDefs,
dataSources: () => ({
cargoBikeAPI: new CargoBikeAPI()
cargoBikeAPI: new CargoBikeAPI(),
userAPI: new UserServerAPI(process.env.RPC_HOST)
})
})

Loading…
Cancel
Save