『博客开发日记』之升级WebSocket实时记录在线人数功能

本文最后更新于 2026年6月20日 凌晨

升级WebSocket实时记录在线人数功能


前言

在之前的开发中

WebSocket 对在线人数的观察是没有分前台和后台的

这样就会造成混乱

现在区分一下前后台的在线人数

还要给 ws 在线用户加上半小时的心跳周期

避免用户异常断开后的脏数据残留

在查 redis 的时候根据不同的分类去查

前台的就查 blogLogin

后台的就查 adminLogin


代码实现

下面之间贴出来升级后类的源码了

WebSocketAuthInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* WebSocket 连接鉴权拦截器。
*
* 作用:
* 1. 在 STOMP CONNECT 阶段校验 token。
* 2. 根据握手阶段写入的端点类型,限制前台/后台用户只能连接对应端点。
* 3. 如果没有 token,则自动按游客身份处理。
*/
@Component
public class WebSocketAuthInterceptor implements ChannelInterceptor
{
@Autowired
private RedisCache redisCache;

@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel)
{
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (Objects.isNull(accessor) || !StompCommand.CONNECT.equals(accessor.getCommand())){
return message;
}

// 从握手阶段保存的会话属性中读取端点类型,用于区分前台和后台连接
String endpointType = (String) accessor.getSessionAttributes().get(WebSocketConstants.ATTR_ENDPOINT_TYPE);
if (WebSocketConstants.ENDPOINT_TYPE_ADMIN.equals(endpointType)){
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, WebSocketConstants.LOGIN_TYPE_ADMIN);
} else if (WebSocketConstants.ENDPOINT_TYPE_BLOG.equals(endpointType)){
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, WebSocketConstants.LOGIN_TYPE_BLOG);
}

// 获取 STOMP CONNECT Header 中携带的 token
String token = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_TOKEN);
if (!StringUtils.hasText(token)){
// 没有 token 时,按游客处理;如果前端传了 visitorId,则沿用,否则随机生成一个
String visitorId = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_VISITOR_ID);
if (!StringUtils.hasText(visitorId)){
visitorId = UUID.randomUUID().toString().replace("-", "");
}
VisitorPrincipal principal = new VisitorPrincipal(visitorId);
accessor.setUser(principal);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, WebSocketConstants.LOGIN_TYPE_VISITOR);
accessor.setHeader(WebSocketConstants.HEADER_VISITOR_ID, visitorId);
return message;
}

Claims claims;
try {
claims = JwtUtil.parseJWT(token);
} catch (Exception e) {
throw new IllegalArgumentException("token无效或过期!");
}

String userId = claims.getSubject();
LoginUser loginUser;
String loginType;

// 根据端点类型只查询对应的登录缓存,避免前台/后台用户 ID 串号导致冲突
if (WebSocketConstants.ENDPOINT_TYPE_ADMIN.equals(endpointType)){
loginUser = redisCache.getCacheObject(RedisKeyConstants.ADMIN_LOGIN_KEY_PREFIX + userId);
loginType = WebSocketConstants.LOGIN_TYPE_ADMIN;
} else if (WebSocketConstants.ENDPOINT_TYPE_BLOG.equals(endpointType)){
loginUser = redisCache.getCacheObject(RedisKeyConstants.BLOG_LOGIN_KEY_PREFIX + userId);
loginType = WebSocketConstants.LOGIN_TYPE_BLOG;
} else {
throw new IllegalArgumentException("未知的 WebSocket 端点类型!");
}

if (Objects.isNull(loginUser)){
throw new IllegalArgumentException("token无效或过期!");
}

// 绑定当前登录用户身份,后续可通过 Principal 获取当前用户
WebSocketPrincipal principal = new WebSocketPrincipal(userId, loginUser, loginType);
accessor.setUser(principal);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_USER, loginUser);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, loginType);
accessor.setHeader(WebSocketConstants.HEADER_AUTHENTICATION, new UsernamePasswordAuthenticationToken(loginUser, null, null));
return message;
}

//WebSocket用户身份信息
public static class WebSocketPrincipal implements Principal
{
private final String name;

@Getter
private final LoginUser loginUser;

@Getter
private final String loginType;

public WebSocketPrincipal(String name, LoginUser loginUser, String loginType)
{
this.name = name;
this.loginUser = loginUser;
this.loginType = loginType;
}

@Override
public String getName()
{
return name;
}

}

//游客WebSocket身份信息
public static class VisitorPrincipal implements Principal
{
private final String name;

public VisitorPrincipal(String name)
{
this.name = name;
}

@Override
public String getName()
{
return name;
}
}
}


WebSocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* WebSocket 配置类
*
* 说明:
* 1. 前台与后台分别使用不同的 WebSocket 端点,便于按来源隔离连接。
* 2. 在握手阶段将端点类型写入会话属性,供后续鉴权拦截器使用。
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer
{
@Autowired
private WebSocketAuthInterceptor webSocketAuthInterceptor;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
{
// 注册博客前台 WebSocket 端点,前台页面通过该地址建立连接
registry.addEndpoint("/ws/blog")
.addInterceptors(endpointTypeHandshakeInterceptor(WebSocketConstants.ENDPOINT_TYPE_BLOG))
.setAllowedOriginPatterns("*");

// 注册管理后台 WebSocket 端点,后台页面通过该地址建立连接
registry.addEndpoint("/ws/admin")
.addInterceptors(endpointTypeHandshakeInterceptor(WebSocketConstants.ENDPOINT_TYPE_ADMIN))
.setAllowedOriginPatterns("*");
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
// 客户端订阅消息的前缀,例如 /topic/**、/queue/**
registry.enableSimpleBroker("/topic", "/queue");
// 客户端发送消息到服务端的前缀,例如 /app/**
registry.setApplicationDestinationPrefixes("/app");
// 点对点消息前缀,例如 /user/**
registry.setUserDestinationPrefix("/user");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration)
{
// 拦截 STOMP CONNECT 请求,统一处理 token、游客身份以及端点权限校验
registration.interceptors(webSocketAuthInterceptor);
}

//握手拦截器,用于在 WebSocket 握手阶段写入端点类型。
private HandshakeInterceptor endpointTypeHandshakeInterceptor(String endpointType)
{
return new HandshakeInterceptor()
{
@Override
public boolean beforeHandshake(org.springframework.http.server.ServerHttpRequest request,
org.springframework.http.server.ServerHttpResponse response,
org.springframework.web.socket.WebSocketHandler wsHandler,
Map<String, Object> attributes)
{
// 将当前端点类型写入会话属性,供后续鉴权时判断连接来源
attributes.put(WebSocketConstants.ATTR_ENDPOINT_TYPE, endpointType);
return true;
}

@Override
public void afterHandshake(org.springframework.http.server.ServerHttpRequest request,
org.springframework.http.server.ServerHttpResponse response,
org.springframework.web.socket.WebSocketHandler wsHandler,
Exception exception)
{
// 这里无需额外处理,握手完成后端点类型已保存在 attributes 中
}
};
}
}

在 WebSocketConstants 中添加常量

1
2
3
4
5
6
7
8
9
/** WebSocket 端点类型 - 后台 */
public static final String ENDPOINT_TYPE_ADMIN = "admin";
/** WebSocket 端点类型 - 前台 */
public static final String ENDPOINT_TYPE_BLOG = "blog";
/** 握手阶段保存端点类型的属性名 */
public static final String ATTR_ENDPOINT_TYPE = "webSocketEndpointType";

/** WebSocket 端点心跳 在线记录的兜底过期时间,防止 WebSocket 异常断开后 Redis 脏数据长期存在*/
private static final long ONLINE_TTL_SECONDS = 1800L;

WebSocketEventListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//WebSocket连接事件监听器
@Component
public class WebSocketEventListener
{
@Autowired
private OnlineUserService onlineUserService;

// STOMP 连接成功后记录在线用户,并在 Redis 中写入带过期时间的在线状态
@EventListener
public void handleSessionConnectEvent(SessionConnectEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = accessor.getSessionId();
Principal principal = accessor.getUser();
if (principal instanceof WebSocketPrincipal webSocketPrincipal){
onlineUserService.online(sessionId, webSocketPrincipal.getName(), webSocketPrincipal.getLoginType());
return;
}
if (principal instanceof VisitorPrincipal visitorPrincipal){
onlineUserService.online(sessionId, visitorPrincipal.getName(), WebSocketConstants.LOGIN_TYPE_VISITOR);
}
}

//有人订阅在线人数主题时,补一次心跳续期,避免游客/用户连接在异常断开时长期残留
@EventListener
public void handleSessionSubscribeEvent(SessionSubscribeEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
// 订阅在线人数主题时顺手刷新一次 TTL,降低异常断开后脏数据残留的概率
if (WebSocketConstants.TOPIC_ADMIN_ONLINE_COUNT.equals(destination)){
onlineUserService.heartbeat(sessionId);
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN);
return;
}
if (WebSocketConstants.TOPIC_BLOG_ONLINE_COUNT.equals(destination)){
onlineUserService.heartbeat(sessionId);
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_BLOG);
return;
}
if (WebSocketConstants.TOPIC_VISITOR_ONLINE_COUNT.equals(destination)){
onlineUserService.heartbeat(sessionId);
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_VISITOR);
}
}

// STOMP 断开连接后移除在线用户并更新在线人数
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event)
{
onlineUserService.offline(event.getSessionId());
}
}

OnlineUserServiceImpl 中的 online 和 heartbeat 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//标记用户为上线
//建立 WebSocket 连接后记录 sessionId 与 userId 的关系
//然后维护用户的会话集合和分类在线用户集合,最后更新最新在线人数
@Override
public void online(String sessionId, String userId, String loginType)
{
//会话 ID 或用户 ID 为空时不处理,避免写入无效在线状态
if (!StringUtils.hasText(sessionId) || !StringUtils.hasText(userId) || !StringUtils.hasText(loginType)){
return;
}

String normalizedLoginType = normalizeLoginType(loginType);
String userSessionsKey = getUserSessionsKey(userId, normalizedLoginType);
String onlineUsersKey = getOnlineUsersKey(normalizedLoginType);

//将用户加入 redis对应在线用户集合(去重后)
// 全局 session 映射:用于后续离线时快速定位用户和登录类型
redisCache.setCacheMapValue(RedisKeyConstants.WS_ONLINE_SESSIONS_KEY, sessionId, buildSessionValue(userId, normalizedLoginType));
redisCache.expire(RedisKeyConstants.WS_ONLINE_SESSIONS_KEY, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
// 用户会话集合:用于判断该用户是否还有其他活跃连接
redisCache.addToSet(userSessionsKey, sessionId);
redisCache.expire(userSessionsKey, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
// 在线用户集合:用于统计指定登录类型的在线人数
redisCache.addToSet(onlineUsersKey, userId);
redisCache.expire(onlineUsersKey, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
pushOnlineCount(normalizedLoginType);
}

@Override
public void heartbeat(String sessionId)
{
// 心跳只负责续期,不负责新增在线关系,避免无效 session 被误写入
if (!StringUtils.hasText(sessionId)){
return;
}

String sessionValue = redisCache.getCacheMapValue(RedisKeyConstants.WS_ONLINE_SESSIONS_KEY, sessionId);
if (!StringUtils.hasText(sessionValue)){
return;
}

// sessionValue 格式为 userId:loginType,用于定位对应的会话集合与在线集合
String[] sessionParts = sessionValue.split(":", 2);
if (sessionParts.length != 2 || !StringUtils.hasText(sessionParts[0]) || !StringUtils.hasText(sessionParts[1])){
return;
}

String userId = sessionParts[0];
String loginType = sessionParts[1];
String userSessionsKey = getUserSessionsKey(userId, loginType);
String onlineUsersKey = getOnlineUsersKey(loginType);

redisCache.expire(RedisKeyConstants.WS_ONLINE_SESSIONS_KEY, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
redisCache.expire(userSessionsKey, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
redisCache.expire(onlineUsersKey, WebSocketConstants.ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
}



PS:该系列只做为作者学习开发项目做的笔记用

不一定符合读者来学习,仅供参考


预告

后续会记录博客的开发过程

每次学习会做一份笔记来进行发表

“一花一世界,一叶一菩提”


版权所有 © 2026 云梦泽
欢迎访问我的个人网站:https://hgt12.github.io/


『博客开发日记』之升级WebSocket实时记录在线人数功能
http://example.com/2026/06/14/『博客开发日记』之升级WebSocket实时记录在线人数功能/
作者
云梦泽
发布于
2026年6月14日
更新于
2026年6月20日
许可协议