Cluster improvements

- added resource logging for clusters
- changed handling of logging
- moved to yarn for package management
pull/3/head
Trivernis 5 years ago
parent 74e2b81b07
commit 61451882aa

7954
package-lock.json generated

File diff suppressed because it is too large Load Diff

@ -30,7 +30,6 @@
"@types/express-session": "^1.15.14",
"@types/express-socket.io-session": "^1.3.2",
"@types/fs-extra": "^8.0.0",
"@types/graphql": "^14.2.3",
"@types/http-status": "^0.2.30",
"@types/js-yaml": "^3.12.1",
"@types/markdown-it": "0.0.9",
@ -38,8 +37,8 @@
"@types/pg": "^7.11.0",
"@types/sequelize": "^4.28.5",
"@types/socket.io": "^2.1.2",
"@types/socket.io-redis": "^1.0.25",
"@types/validator": "^10.11.3",
"@types/winston": "^2.4.4",
"delete": "^1.1.0",
"gulp": "^4.0.2",
"gulp-minify": "^3.1.0",
@ -48,10 +47,9 @@
"ts-lint": "^4.5.1",
"tsc": "^1.20150623.0",
"tslint": "^5.19.0",
"typescript": "^3.5.3"
"typescript": "^3.7.2"
},
"dependencies": {
"@types/socket.io-redis": "^1.0.25",
"compression": "^1.7.4",
"connect-session-sequelize": "^6.0.0",
"cookie-parser": "^1.4.4",

@ -16,7 +16,7 @@ import {Sequelize} from "sequelize-typescript";
import * as socketIo from "socket.io";
import * as socketIoRedis from "socket.io-redis";
import {resolver} from "./graphql/resolvers";
import dataaccess from "./lib/dataaccess";
import dataaccess from "./lib/dataAccess";
import globals from "./lib/globals";
import routes from "./routes";
@ -41,12 +41,13 @@ class App {
/**
* initializes everything that needs to be initialized asynchronous.
*/
public async init() {
public async init(): Promise<void> {
await dataaccess.init(this.sequelize);
const appSession = session({
cookie: {
maxAge: Number(globals.config.session.cookieMaxAge) || 604800000,
// @ts-ignore
secure: "auto",
},
resave: false,
@ -56,12 +57,14 @@ class App {
});
const force = fsx.existsSync("sqz-force");
logger.info(`Sequelize Table force: ${force}`);
logger.info(`Syncinc database. Sequelize Table force: ${force}.`);
await this.sequelize.sync({force, logging: (msg) => logger.silly(msg)});
logger.info("Setting up socket.io");
await routes.ioListeners(this.io);
this.io.adapter(socketIoRedis());
this.io.use(sharedsession(appSession, {autoSave: true}));
logger.info("Configuring express app.");
this.app.set("views", path.join(__dirname, "views"));
this.app.set("view engine", "pug");
this.app.set("trust proxy", 1);
@ -73,11 +76,12 @@ class App {
this.app.use(cookieParser());
this.app.use(appSession);
// enable cross origin requests if enabled in the config
if (globals.config.server.cors) {
if (globals.config.server?.cors) {
this.app.use(cors());
}
this.app.use((req, res, next) => {
logger.verbose(`${req.method} ${req.url}`);
process.send({cmd: "notifyRequest"});
next();
});
this.app.use(routes.router);
@ -117,13 +121,14 @@ class App {
res.status(httpStatus.INTERNAL_SERVER_ERROR);
res.render("errors/500.pug");
});
logger.info("Server configured.");
}
/**
* Starts the web server.
*/
public start() {
if (globals.config.server.port) {
public start(): void {
if (globals.config.server?.port) {
logger.info(`Starting server...`);
this.app.listen(globals.config.server.port);
logger.info(`Server running on port ${globals.config.server.port}`);

@ -1,6 +1,6 @@
import {GraphQLError} from "graphql";
import * as status from "http-status";
import dataaccess from "../lib/dataaccess";
import dataaccess from "../lib/dataAccess";
import {NotLoggedInGqlError, PostNotFoundGqlError} from "../lib/errors/graphqlErrors";
import globals from "../lib/globals";
import {InternalEvents} from "../lib/InternalEvents";

@ -3,21 +3,90 @@ import * as cluster from "cluster";
import App from "./app";
const numCPUs = require("os").cpus().length;
interface IResourceUsage {
mem: {rss: number, heapTotal: number, heapUsed: number, external: number};
cpu: {user: number, system: number};
}
interface IClusterData {
reqCount: number;
workerCount: () => number;
workerRes: {[key: string]: IResourceUsage};
}
if (cluster.isMaster) {
console.log(`[CLUSTER] Master ${process.pid} is running`);
console.log(`[CLUSTER-M] Master ${process.pid} is running`);
const clusterData: IClusterData = {
reqCount: 0,
workerCount: () => Object.keys(cluster.workers).length,
// @ts-ignore
workerRes: {},
};
setInterval(() => {
clusterData.workerRes.M = {
cpu: process.cpuUsage(),
mem: process.memoryUsage(),
};
}, 1000);
const log = (msg: string) => {
process.stdout.write(" ".padEnd(50) + "\r");
process.stdout.write(msg);
process.stdout.write(
`[C] W: ${clusterData.workerCount()}, Rq: ${clusterData.reqCount}, Mem: ${(() => {
let usageString = "";
for (const [key, value] of Object.entries(clusterData.workerRes)) {
usageString += `[${key}] ${
Math.round((value as IResourceUsage).mem.heapUsed / 10000) / 100}MB,`.padEnd(13);
}
return usageString;
})()}`.padEnd(49) + "\r");
};
cluster.settings.silent = true;
cluster.on("exit", (worker, code, signal) => {
log(`[CLUSTER-M] Worker ${worker.process.pid} died!\n`);
log("[CLUSTER-M] Starting new worker\n");
cluster.fork();
});
cluster.on("online", (worker) => {
worker.process.stdout.on("data", (data) => {
log(`[CLUSTER-${worker.id}] ${data}`);
});
});
cluster.on("message", (worker, message) => {
switch (message.cmd) {
case "notifyRequest":
clusterData.reqCount++;
log("");
break;
case "notifyResources":
// @ts-ignore
clusterData.workerRes[worker.id] = message.data;
log("");
break;
default:
break;
}
});
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on("exit", (worker, code, signal) => {
console.log(`[CLUSTER] Worker ${worker.process.pid} died!`);
});
} else {
/**
* async main function wrapper.
*/
(async () => {
const app = new App(process.pid);
setInterval(() => {
process.send({cmd: "notifyResources", data: {
cpu: process.cpuUsage(),
mem: process.memoryUsage(),
}});
}, 1000);
const app = new App(cluster.worker.id);
await app.init();
app.start();
})();

@ -35,7 +35,7 @@ namespace dataaccess {
/**
* Initializes everything that needs to be initialized asynchronous.
*/
export async function init(seq: Sequelize) {
export async function init(seq: Sequelize): Promise<void> {
sequelize = seq;
try {
await sequelize.addModels([
@ -131,7 +131,7 @@ namespace dataaccess {
* @param offset
* @param sort
*/
export async function getPosts(first: number, offset: number, sort: SortType) {
export async function getPosts(first: number, offset: number, sort: SortType): Promise<models.Post[]> {
if (sort === SortType.NEW) {
return models.Post.findAll({
include: [{association: "rVotes"}],
@ -140,6 +140,7 @@ namespace dataaccess {
order: [["createdAt", "DESC"]],
});
} else {
// more performant way to get the votes with plain sql
return await sequelize.query(
`SELECT * FROM (
SELECT *,

@ -1,4 +1,4 @@
import dataaccess from "../dataaccess";
import dataaccess from "../dataAccess";
import {BaseError} from "./BaseError";
export class RequestNotFoundError extends BaseError {

@ -1,12 +1,13 @@
import {EventEmitter} from "events";
import * as fsx from "fs-extra";
import * as yaml from "js-yaml";
import * as path from "path";
import * as winston from "winston";
require("winston-daily-rotate-file");
const configPath = "config.yaml";
const defaultConfig = __dirname + "/../default-config.yaml";
const defaultConfig = path.join(__dirname, "/../default-config.yaml");
// ensure that the config exists by copying the default config.
if (!(fsx.pathExistsSync(configPath))) {
@ -21,9 +22,9 @@ if (!(fsx.pathExistsSync(configPath))) {
* Defines global variables to be used.
*/
namespace globals {
export const config = yaml.safeLoad(fsx.readFileSync("config.yaml", "utf-8"));
export const config: IConfig = yaml.safeLoad(fsx.readFileSync("config.yaml", "utf-8"));
// @ts-ignore
export const logger = winston.createLogger({
export const logger: winston.Logger = winston.createLogger({
transports: [
new winston.transports.Console({
format: winston.format.combine(
@ -33,7 +34,7 @@ namespace globals {
return `${timestamp} ${level}: ${message}`;
}),
),
level: config.logging.level,
level: config.logging?.level ?? "info",
}),
// @ts-ignore
new (winston.transports.DailyRotateFile)({
@ -46,13 +47,13 @@ namespace globals {
}),
),
json: false,
level: config.logging.level,
level: config.logging?.level ?? "info",
maxFiles: "7d",
zippedArchive: true,
}),
],
});
export const internalEmitter = new EventEmitter();
export const internalEmitter: EventEmitter = new EventEmitter();
}
export default globals;

@ -0,0 +1,67 @@
/**
* An interface for the configuration file
*/
interface IConfig {
/**
* Database connection info
*/
database: {
/**
* A connection uri for the database. <type>://<user>:<password>@<ip/domain>/<database>
*/
connectionUri: string;
};
/**
* Configuration for the http server
*/
server?: {
/**
* The port to listen on
*/
port?: number;
/**
* If cross origin requests should be enabled
*/
cors?: false;
};
/**
* The session configuration
*/
session: {
/**
* A secure secret to be used for sessions
*/
secret: string;
/**
* The maximum cookie age before the session gets deleted
*/
cookieMaxAge: number;
};
/**
* Configuration for markdown parsing
*/
markdown?: {
/**
* The plugins to use for parsing
*/
plugins: string[];
};
/**
* Logging configuration
*/
logging?: {
/**
* The loglevel that is used for the console and logfiles
*/
level?: ("silly" | "debug" | "verbose" | "info" | "warn" | "error");
};
/**
* The frontend configuration
*/
frontend?: {
/**
* Points to the index.html which is loaded as a fallback for angular to work
*/
angularIndex?: string;
};
}

@ -20,7 +20,7 @@ namespace markdown {
* Renders the markdown string inline (without blocks).
* @param markdownString
*/
export function renderInline(markdownString: string) {
export function renderInline(markdownString: string): string {
return md.renderInline(markdownString);
}
@ -28,7 +28,7 @@ namespace markdown {
* Renders the markdown string.
* @param markdownString
*/
export function render(markdownString: string) {
export function render(markdownString: string): string {
return md.render(markdownString);
}
}

@ -29,8 +29,8 @@ export class Event extends Model<Event> {
}
public async participants({first, offset}: {first: number, offset: number}): Promise<User[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rParticipants", {limit, offset}) as User[];
}
}

@ -41,14 +41,14 @@ export class Group extends Model<Group> {
}
public async admins({first, offset}: { first: number, offset: number }): Promise<User[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rAdmins", {limit, offset}) as User[];
}
public async members({first, offset}: { first: number, offset: number }): Promise<User[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rMembers", {limit, offset}) as User[];
}
@ -57,8 +57,8 @@ export class Group extends Model<Group> {
}
public async events({first, offset}: { first: number, offset: number }): Promise<Event[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rEvents", {limit, offset}) as Event[];
}
}

@ -45,14 +45,14 @@ export class Post extends Model<Post> {
}
public async vote(userId: number, type: VoteType): Promise<VoteType> {
type = type || VoteType.UPVOTE;
type = type ?? VoteType.UPVOTE;
let votes = await this.$get("rVotes", {where: {id: userId}}) as Array<User & {PostVote: PostVote}>;
let vote = votes[0] || null;
let vote = votes[0] ?? null;
let created = false;
if (!vote) {
await this.$add("rVote", userId);
votes = await this.$get("rVotes", {where: {id: userId}}) as Array<User & {PostVote: PostVote}>;
vote = votes[0] || null;
vote = votes[0] ?? null;
created = true;
}
if (vote) {

@ -125,8 +125,8 @@ export class User extends Model<User> {
* @param offset
*/
public async friends({first, offset}: { first: number, offset: number }): Promise<User[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rFriendOf", {limit, offset}) as User[];
}
@ -143,8 +143,8 @@ export class User extends Model<User> {
* @param offset
*/
public async chats({first, offset}: { first: number, offset: number }): Promise<ChatRoom[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rChats", {limit, offset}) as ChatRoom[];
}
@ -170,8 +170,8 @@ export class User extends Model<User> {
}
public async posts({first, offset}: { first: number, offset: number }): Promise<Post[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rPosts", {limit, offset}) as Post[];
}
@ -210,8 +210,8 @@ export class User extends Model<User> {
* @param offset
*/
public async groups({first, offset}: { first: number, offset: number }): Promise<Group[]> {
const limit = first || 10;
offset = offset || 0;
const limit = first ?? 10;
offset = offset ?? 0;
return await this.$get("rGroups", {limit, offset}) as Group[];
}

@ -1,6 +1,6 @@
import {Router} from "express";
import {Namespace, Server} from "socket.io";
import dataaccess from "../lib/dataaccess";
import dataaccess from "../lib/dataAccess";
import globals from "../lib/globals";
import {InternalEvents} from "../lib/InternalEvents";
import {ChatMessage, ChatRoom, Post, Request, User} from "../lib/models";

@ -2,13 +2,16 @@
"compileOnSave": true,
"compilerOptions": {
"noImplicitAny": true,
"noImplicitThis": true,
"removeComments": true,
"preserveConstEnums": true,
"allowSyntheticDefaultImports": true,
"outDir": "./dist",
"sourceMap": true,
"target": "es2018",
"allowJs": true,
"allowJs": false,
"forceConsistentCasingInFileNames": true,
"strictFunctionTypes": true,
"moduleResolution": "node",
"module": "commonjs",
"experimentalDecorators": true,

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save