feat(notify): base setup for notifications module

This commit is contained in:
Matthieu Haineault 2025-08-08 13:30:32 -04:00
parent 109a80a0f0
commit dc8c4d048c
10 changed files with 94 additions and 159 deletions

View File

@ -2398,7 +2398,8 @@
"UNPAID",
"BEREAVEMENT",
"PARENTAL",
"LEGAL"
"LEGAL",
"WEDDING"
],
"description": "type of leave request for an accounting perception"
},

View File

@ -8,7 +8,6 @@ import { BusinessLogicsModule } from './modules/business-logics/business-logics.
import { CsvExportModule } from './modules/exports/csv-exports.module';
import { CustomersModule } from './modules/customers/customers.module';
import { EmployeesModule } from './modules/employees/employees.module';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { ExpensesModule } from './modules/expenses/expenses.module';
import { HealthModule } from './health/health.module';
import { HealthController } from './health/health.controller';
@ -31,7 +30,6 @@ import { UsersModule } from './modules/users-management/users.module';
CsvExportModule,
CustomersModule,
EmployeesModule,
EventEmitterModule.forRoot(),
ExpensesModule,
HealthModule,
LeaveRequestsModule,

View File

@ -0,0 +1,23 @@
import { Controller, Get, Req, Sse, UseGuards,
MessageEvent as NestMessageEvent } from "@nestjs/common";
import { JwtAuthGuard } from "../authentication/guards/jwt-auth.guard";
import { NotificationsService } from "./notifications.service";
import { Observable } from "rxjs";
import { map } from 'rxjs/operators';
@UseGuards(JwtAuthGuard)
@Controller('notifications')
export class NotificationsController {
constructor(private readonly notificationsService: NotificationsService) {}
@Get('summary')
async summary(@Req() req) {
return this.notificationsService.summary(String(req.user.id));
}
@Sse('stream')
stream(@Req() req): Observable<NestMessageEvent> {
const userId = String(req.user.id);
return this.notificationsService.stream(userId).pipe(map((data): NestMessageEvent => ({ data })))
}
}

View File

@ -0,0 +1,10 @@
import { Module } from "@nestjs/common";
import { NotificationsController } from "./notifications.controller";
import { NotificationsService } from "./notifications.service";
@Module({
providers: [NotificationsService],
controllers: [NotificationsController],
exports: [NotificationsService],
})
export class NotificationsModule {}

View File

@ -1,73 +1,62 @@
import { InjectQueue } from "@nestjs/bullmq";
import { Injectable, Logger } from "@nestjs/common";
import { Queue } from "bullmq";
import { TimesheetsService } from "../timesheets/services/timesheets.service";
import { ShiftsService } from "../shifts/services/shifts.service";
import { ExpensesService } from "../expenses/services/expenses.service";
import { EmployeesService } from "../employees/services/employees.service";
import { LeaveRequestsService } from "../leave-requests/services/leave-requests.service";
export interface DigestItem {
title: string;
description: string;
link?: string;
}
import { Subject } from "rxjs";
import { NotificationCard } from "./notifications.types";
@Injectable()
export class NotificationsService {
private readonly logger = new Logger(NotificationsService.name);
constructor(
@InjectQueue('notifications') private readonly queue: Queue,
private readonly timesheetsService : TimesheetsService,
private readonly shiftsService : ShiftsService,
private readonly expensesService : ExpensesService,
private readonly employeesService : EmployeesService,
private readonly leaveRequestsService: LeaveRequestsService,
) {}
async queueNotification(channel: string, payload: Record<string,any>): Promise<void> {
await this.queue.add(channel, payload);
this.logger.debug(`Enqueued notification on channel= "${channel}"`);
//Server-Sent Events FLUX and a buffer per user
private streams = new Map<string, Subject<NotificationCard>>();
private buffers = new Map<string, NotificationCard[]>();
private readonly BUFFER_MAX = Number(process.env.NOTIF_BUFFER_MAX ?? 50);
private getOrCreateStream(userId: string): Subject<NotificationCard> {
let stream = this.streams.get(userId);
if (!stream){
stream = new Subject<NotificationCard>();
this.streams.set(userId, stream);
}
return stream;
}
private getOrCreateBuffer(userId: string){
let buffer = this.buffers.get(userId);
if(!buffer) {
buffer = [];
this.buffers.set(userId, buffer);
}
return buffer;
}
async buildWeeklyDigest(): Promise<{recipients: string[], items: DigestItem[]}> {
//TO DO add logic of missing shifts, overtime alert, vacation alerts, leave-requests, etc...
//fetching all business datas
//const missingShifts = await this.timesheetsService.findMissingShftsLastWeek();
const items: DigestItem[] = [
//example:
{title: 'Carte de temps incomplete', description: 'Des employes n`ont pas saisi leurs quarts de travail'},
{title: 'Overtime détecté', description: '....'},
];
const recipients = [
//exemple : await this.userService.findSupervisorsEmails();
'supervisor@targointernet.com',
'accounting@targointernet.com',
];
return {recipients, items}
//in-app pushes and keep a small history
notify(userId: string, card: NotificationCard) {
const buffer = this.getOrCreateBuffer(userId);
buffer.unshift(card);
if (buffer.length > this.BUFFER_MAX) {
buffer.length = this.BUFFER_MAX;
}
this.getOrCreateStream(userId).next(card);
this.logger.debug(`Notification in-app => user: ${userId} (${card.type})`);
}
async buildMonthlyDigest(): Promise<{ recipients: string[], items: DigestItem[]}> {
//const anniversaries = await this.employeesService.findAnniversariesthisMonth();
//const totalOvertime = await this.timesheetsService.calculateTotalOvertimeThisMonth();
const items: DigestItem[] = [
{title: '5 ans d`ancienneté', description:'Marc-André, Jessy'},
{title: '10 ans d`ancienneté', description:'Kadi, Maxime'},
{title: 'Calendrier Annuel', description: 'Nouveau calendrier de l`an prochain maintenant disponible!'},
// ...
];
const recipients = [
'allemployees@targointernent.com',
];
return { recipients, items };
//SSE flux for current user
stream(userId: string) {
return this.getOrCreateStream(userId).asObservable();
}
//return a summary of notifications kept in memory
async summary(userId: string): Promise<NotificationCard[]> {
return this.getOrCreateBuffer(userId);
}
//clear buffers from memory
clear(userId: string) {
this.buffers.set(userId, []);
}
onModuleDestroy() {
for (const stream of this.streams.values()) stream.complete();
this.streams.clear();
this.buffers.clear();
}
}

View File

@ -1,94 +0,0 @@
import { Injectable, Logger } from "@nestjs/common";
import { Cron, Interval, SchedulerRegistry } from "@nestjs/schedule";
import { NotificationsService as Orchestrator } from './notifications.service';
@Injectable()
export class NotificationsService {
private readonly logger = new Logger(NotificationsService.name);
constructor(
private readonly schedulerRegistry: SchedulerRegistry,
private readonly orchestrator: Orchestrator,
) {}
//cache purging
//@TimeOut(15_000)
async onStartup() {
this.logger.debug('Startup cleanup: initial verifications');
//clean up of useless cache on start up
}
//Q monitoring
@Interval(300_000)
async monitorQueueHealth() {
this.logger.debug('monitoring notification queue')
//this.orchestrator.checkQueueLength();
//monitor backlog for overload and such
}
//weekly cron jobs
@Cron('0 0 8 * * 1', {
name: 'weeklyDigest',
timeZone: 'America/Toronto',
})
async sendWeeklyDigest() {
this.logger.debug('Building Weekly digest');
const { recipients, items } = await this.orchestrator.buildWeeklyDigest();
await this.orchestrator.queueNotification('email', {
to: recipients,
subject: '[Journal Hebdo] Sommaire de la semaine dernière',
template: 'weekly-digest',
context: { items },
});
this.logger.debug('Weekly digest Queued');
}
async disableWeeklyDigest() {
const job = this.schedulerRegistry.getCronJob('weeklyDigest');
job.stop();
this.logger.debug(`Weekly digest stopped`);
}
async enableWeeklyDigest() {
const job = this.schedulerRegistry.getCronJob('weeklyDigest');
job.start();
this.logger.debug(`Weekly digest started`);
}
//monthly cron jobs
@Cron('0 0 9 1 * *', {
name: 'monthlyDigest',
timeZone: 'America/Toronto',
})
async sendMonthlyDigest() {
this.logger.debug('Building Monthly digest');
const {recipients, items} = await this.orchestrator.buildMonthlyDigest();
await this.orchestrator.queueNotification('email', {
to: recipients,
subject: '[Journal Mensuel] Sommaire du mois',
template: 'monthly-digest',
context: { items },
})
this.logger.debug('Monthly digest queued');
}
async disableMonthlyDigest() {
const job = this.schedulerRegistry.getCronJob('monthlyDigest');
job.stop();
this.logger.debug(`Monthly digest stopped`);
}
async enableMonthlyDigest() {
const job = this.schedulerRegistry.getCronJob('monthlyDigest');
job.start();
this.logger.debug(`Monthly digest stopped`);
}
}
function TimeOut(arg0: number): (target: Orchestrator, propertyKey: "onStartup", descriptor: TypedPropertyDescriptor<() => Promise<void>>) => void | TypedPropertyDescriptor<() => Promise<void>> {
throw new Error("Function not implemented.");
}

View File

@ -0,0 +1,9 @@
export type NotificationCard = {
type: string;
message: string;
severity?: 'info'|'warn'|'error';
icon?: string;
link?: string;
meta?: Record<string, any>
ts: string; //new Date().toISOString()
};

View File

@ -36,7 +36,6 @@ export class PayPeriodsController {
@ApiOperation({ summary: 'detailed view of a pay period'})
@ApiResponse({ status: 200,description: 'Pay period overview found', type: PayPeriodOverviewDto })
@ApiResponse({status: 400, description: 'Pay period not found' })
async getOverview(@Param('periodNumber', ParseIntPipe) periodNumber: number):
Promise<PayPeriodOverviewDto> {
return this.overviewService.getOverview(periodNumber);