- 发布时间
Nestjs|webSocket
官网:https://docs.nestjs.cn/websockets/gateways
src/
├── app.module.ts
├── main.ts
├── websocket/
│ ├── websocket.module.ts
│ ├── websocket.gateway.ts
│ ├── websocket.service.ts
│ ├── dto/
│ │ ├── message.dto.ts
│ │ └── join-room.dto.ts
│ ├── interfaces/
│ │ └── socket-user.interface.ts
│ └── adapters/
│ └── redis-adapter.ts
└── chat/
└── chat.controller.ts
安装依赖
npm install @nestjs/websockets @nestjs/platform-socket.io socket.io
npm install -D @types/socket.io
# 可选:Redis适配器
npm install socket.io-redis @types/socket.io-redis
- 适配器(Adapter)在 NestJS 中主要用于协议转换和底层库抽象
- HTTP 适配器 - 适配 Express/Fastify
- WebSocket 适配器 - 适配 Socket.io/ws
- GraphQL 适配器 - 适配 Apollo/Mercurius
服务端代码
# 网关实现
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
OnGatewayInit,
OnGatewayConnection,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger, UseFilters, UseGuards, UsePipes, ValidationPipe } from '@nestjs/common';
import { WebSocketService } from './websocket.service';
import { SendMessageDto } from './dto/message.dto';
import { JoinRoomDto } from './dto/join-room.dto';
import { WsJwtGuard } from './guards/ws-jwt.guard';
import { WsExceptionFilter } from './filters/ws-exception.filter';
@WebSocketGateway({
cors: {
origin: '*', // 生产环境请配置具体域名
credentials: true,
},
namespace: '/chat',
transports: ['websocket', 'polling'],
})
@UseFilters(WsExceptionFilter)
@UsePipes(new ValidationPipe())
export class WebSocketGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private logger: Logger = new Logger('WebSocketGateway');
constructor(private readonly webSocketService: WebSocketService) {}
afterInit(server: Server) {
this.logger.log('WebSocket Gateway Initialized');
// 设置最大监听器数量
server.sockets.setMaxListeners(20);
}
async handleConnection(client: Socket) {
try {
// 从查询参数获取token
const token = client.handshake.query.token as string;
// 验证用户
const user = await this.webSocketService.validateUser(token);
if (!user) {
client.disconnect();
return;
}
// 存储用户信息到socket
client.data.user = user;
// 加入用户专属房间
client.join(`user_${user.id}`);
// 更新在线用户列表
await this.webSocketService.addOnlineUser(user.id, client.id);
// 广播用户上线通知
this.server.emit('user-online', {
userId: user.id,
username: user.username,
timestamp: new Date(),
});
this.logger.log(`Client connected: ${client.id}, User: ${user.username}`);
// 发送连接成功消息
client.emit('connected', {
message: 'Connected successfully',
userId: user.id,
serverTime: new Date(),
});
} catch (error) {
this.logger.error(`Connection error: ${error.message}`);
client.disconnect();
}
}
@SubscribeMessage('join-room')
@UseGuards(WsJwtGuard)
async handleJoinRoom(
@ConnectedSocket() client: Socket,
@MessageBody() joinRoomDto: JoinRoomDto,
) {
const { roomId } = joinRoomDto;
const user = client.data.user;
// 离开之前加入的房间
const previousRooms = Array.from(client.rooms).filter(room => room.startsWith('room_'));
previousRooms.forEach(room => client.leave(room));
// 加入新房间
client.join(`room_${roomId}`);
// 通知房间内其他用户
client.to(`room_${roomId}`).emit('user-joined', {
userId: user.id,
username: user.username,
roomId,
timestamp: new Date(),
});
}
@SubscribeMessage('read-receipt')
@UseGuards(WsJwtGuard)
async handleReadReceipt(
@ConnectedSocket() client: Socket,
@MessageBody() data: { messageId: string; roomId: string },
) {
const user = client.data.user;
// 更新消息已读状态
await this.webSocketService.markMessageAsRead(data.messageId, user.id);
// 广播已读回执
this.server.to(`room_${data.roomId}`).emit('message-read', {
messageId: data.messageId,
userId: user.id,
username: user.username,
readAt: new Date(),
});
}
@SubscribeMessage('get-online-users')
@UseGuards(WsJwtGuard)
async handleGetOnlineUsers(@ConnectedSocket() client: Socket) {
const onlineUsers = await this.webSocketService.getOnlineUsers();
client.emit('online-users-list', onlineUsers);
}
@SubscribeMessage('ping')
handlePing(@ConnectedSocket() client: Socket) {
client.emit('pong', { timestamp: new Date().toISOString() });
}
// 服务端主动推送消息的方法
public sendNotificationToUser(userId: string, notification: any) {
this.server.to(`user_${userId}`).emit('notification', notification);
}
public broadcastToRoom(roomId: string, event: string, data: any) {
this.server.to(`room_${roomId}`).emit(event, data);
}
}
src/websocket/dto/message.dto.ts
import { IsString, IsEnum, IsOptional, IsNotEmpty, MinLength, MaxLength } from 'class-validator';
export enum MessageType {
TEXT = 'text',
IMAGE = 'image',
FILE = 'file',
SYSTEM = 'system',
}
export class SendMessageDto {
@IsString()
@IsNotEmpty()
roomId: string;
@IsString()
@IsNotEmpty()
@MinLength(1)
@MaxLength(2000)
content: string;
@IsEnum(MessageType)
@IsOptional()
type?: MessageType = MessageType.TEXT;
@IsOptional()
metadata?: Record<string, any>;
}
export class JoinRoomDto {
@IsString()
@IsNotEmpty()
roomId: string;
}
src/websocket/websocket.service.ts
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Message } from './entities/message.entity';
import { RedisService } from '../redis/redis.service';
@Injectable()
export class WebSocketService {
private onlineUsers = new Map<string, Set<string>>(); // userId -> socketIds
constructor(
@InjectRepository(Message)
private messageRepository: Repository<Message>,
private redisService: RedisService,
) {}
async validateUser(token: string): Promise<any> {
// 这里实现JWT验证逻辑
// 返回用户信息或null
try {
// 示例:解码token获取用户信息
// const payload = this.jwtService.verify(token);
// return { id: payload.sub, username: payload.username };
return { id: '1', username: 'testuser', avatar: 'avatar-url' };
} catch {
return null;
}
}
async addOnlineUser(userId: string, socketId: string): Promise<void> {
if (!this.onlineUsers.has(userId)) {
this.onlineUsers.set(userId, new Set());
}
this.onlineUsers.get(userId).add(socketId);
// 同时存储到Redis(用于集群部署)
await this.redisService.addOnlineUser(userId, socketId);
}
async removeOnlineUser(userId: string, socketId: string): Promise<void> {
const sockets = this.onlineUsers.get(userId);
if (sockets) {
sockets.delete(socketId);
if (sockets.size === 0) {
this.onlineUsers.delete(userId);
}
}
// 从Redis移除
await this.redisService.removeOnlineUser(userId, socketId);
}
async getOnlineUsers(): Promise<any[]> {
const users = [];
for (const [userId, sockets] of this.onlineUsers.entries()) {
if (sockets.size > 0) {
users.push({
userId,
socketCount: sockets.size,
// 可以添加更多用户信息
});
}
}
return users;
}
async saveMessage(messageData: {
senderId: string;
roomId: string;
content: string;
type: string;
}): Promise<Message> {
const message = this.messageRepository.create(messageData);
return await this.messageRepository.save(message);
}
async markMessageAsRead(messageId: string, userId: string): Promise<void> {
await this.messageRepository
.createQueryBuilder()
.update(Message)
.set({ readBy: () => `array_append(read_by, '${userId}')` })
.where('id = :messageId', { messageId })
.execute();
}
async getMessageHistory(roomId: string, page = 1, limit = 50): Promise<Message[]> {
return await this.messageRepository.find({
where: { roomId },
order: { createdAt: 'DESC' },
skip: (page - 1) * limit,
take: limit,
});
}
}
src/websocket/websocket.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { WebSocketGateway } from './websocket.gateway';
import { WebSocketService } from './websocket.service';
import { Message } from './entities/message.entity';
import { RedisModule } from '../redis/redis.module';
@Module({
imports: [
TypeOrmModule.forFeature([Message]),
RedisModule,
],
providers: [WebSocketGateway, WebSocketService],
exports: [WebSocketGateway, WebSocketService],
})
export class WebSocketModule {}
src/app.module.ts
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { WebSocketModule } from './websocket/websocket.module';
import { ChatModule } from './chat/chat.module';
@Module({
imports: [
ConfigModule.forRoot({
isGlobal: true,
}),
TypeOrmModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => ({
type: 'postgres',
host: configService.get('DB_HOST'),
port: configService.get('DB_PORT'),
username: configService.get('DB_USERNAME'),
password: configService.get('DB_PASSWORD'),
database: configService.get('DB_DATABASE'),
entities: [__dirname + '/**/*.entity{.ts,.js}'],
synchronize: configService.get('NODE_ENV') !== 'production',
}),
inject: [ConfigService],
}),
WebSocketModule,
ChatModule,
],
})
export class AppModule {}
/src/main.ts
# main.ts 注册适配器
app.useWebSocketAdapter(new WsAdapter(app))
客户端代码
import { io } from "socket.io-client";
class WebSocketService {
constructor() {
this.socket = io("http://localhost:3000");
this.setupListeners();
}
setupListeners() {
// 监听服务器返回的消息
this.socket.on("operation-result", (data) => {
console.log("Operation result:", data);
});
this.socket.on("private-message", (data) => {
this.handlePrivateMessage(data);
});
}
// 加入房间
joinRoom(roomId, userId) {
this.socket.emit("join-room", { roomId, userId });
}
// 离开房间
leaveRoom(roomId) {
this.socket.emit("leave-room", roomId);
}
writing() {// 显示用户正在输入
this.socket.on('user-typing', (data) => {
console.log(`${data.username} is typing...`);
});
}
// 发送私信
sendPrivateMessage(toUserId, message) {
this.socket.emit("private-message", {
to: toUserId,
message,
timestamp: new Date().toISOString()
});
}
// 批量操作
sendBulkMessage(userIds, content) {
this.socket.emit("bulk-message", {
userIds,
content,
type: "notification"
});
}
// 高频事件(带节流)
throttledEmit = this.throttle((eventName, data) => {
this.socket.emit(eventName, data);
}, 1000);
// 风险操作(需要确认)
performRiskyOperation(operation, options = {}) {
return new Promise((resolve, reject) => {
this.socket.emit("risky-operation", {
operation,
...options
});
// 监听服务器响应
this.socket.once("operation-success", resolve);
this.socket.once("operation-error", reject);
});
}
}
执行流程
客户端发送流程:
- socket.emit('event-name', data)
↓ - 消息序列化为 WebSocket 帧
↓ - 通过 TCP/IP 发送到服务器
↓ - NestJS WebSocket 适配器接收
↓ - 路由到对应的 @SubscribeMessage 装饰器方法
↓ - 执行业务逻辑
↓ - 返回响应给客户端(可选)
服务器响应:
- 方法中使用 return 返回数据
- 使用 client.emit() 主动发送
- 使用 this.server.emit() 广播
注意:事件的名称要区分大小写,名称要完全一致,否则服务端无法处理事件