『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现

本文最后更新于 2026年6月3日 晚上

WebSocket实时记录在线人数功能的实现


WebSocket实时记录在线人数功能的需求

根据用户token来标记和记录实时用户在线数

在线数同步至redis中

建立多个窗口会话时只记录一个在线用户

只有当该用户所有 WebSocket 会话都断开后才会从在线用户集合中移除


下面说一下这个功能依据什么原理来运行的

通过 WebSocket + STOMP 实时统计并更新当前在线用户数量

当后台用户建立 WebSocket 连接时

服务端会根据连接时

用户会携带的 JWT token 完成身份校验并将当前 WebSocket 会话与登录用户进行绑定

当连接断开时

服务端会清理对应会话信息

在线人数发生变化后会更新最新在线人数


代码实现

添加WebSocket依赖


创建 WebSocket配置类 WebSocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//WebSocket配置类
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer
{
@Autowired
private WebSocketAuthInterceptor webSocketAuthInterceptor;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
{
//注册后台WebSocket连接端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*");
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
//客户端订阅消息的前缀
registry.enableSimpleBroker("/topic", "/queue");
//客户端发送消息到服务端的前缀
registry.setApplicationDestinationPrefixes("/app");
//点对点消息前缀
registry.setUserDestinationPrefix("/user");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration)
{
//校验STOMP CONNECT Header中的token
registration.interceptors(webSocketAuthInterceptor);
}
}

WebSocket连接鉴权拦截器 WebSocketAuthInterceptor 用于拦截不合规的访问请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//WebSocket连接鉴权拦截器
@Component
public class WebSocketAuthInterceptor implements ChannelInterceptor
{
@Autowired
private RedisCache redisCache;

@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel)
{
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (Objects.isNull(accessor) || !StompCommand.CONNECT.equals(accessor.getCommand())){
return message;
}

//获取STOMP CONNECT Header中的token
String token = accessor.getFirstNativeHeader("token");
if (!StringUtils.hasText(token)){
throw new IllegalArgumentException("token不能为空");
}

Claims claims;
try {
claims = JwtUtil.parseJWT(token);
} catch (Exception e) {
throw new IllegalArgumentException("token无效或过期!");
}

String userId = claims.getSubject();
LoginUser loginUser = redisCache.getCacheObject("adminLogin:" + userId);
if (Objects.isNull(loginUser)){
throw new IllegalArgumentException("token无效或过期!");
}

//绑定当前登录用户身份,后续可通过Principal获取当前用户
WebSocketPrincipal principal = new WebSocketPrincipal(userId, loginUser);
accessor.setUser(principal);
accessor.setHeader("loginUser", loginUser);
accessor.setHeader("authentication", new UsernamePasswordAuthenticationToken(loginUser, null, null));
return message;
}

//WebSocket用户身份信息
public static class WebSocketPrincipal implements Principal
{
private final String name;

@Getter
private final LoginUser loginUser;

public WebSocketPrincipal(String name, LoginUser loginUser)
{
this.name = name;
this.loginUser = loginUser;
}

@Override
public String getName()
{
return name;
}

}
}

给WebSocket连接端点放行


在 RedisCache 中新建方法用于服务WebSocket连接的数据缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 从 Set 中移除一个元素
*
* @param key Redis键
* @param value 值
* @return 移除数量
*/
public <T> long removeFromSet(final String key, final T value)
{
Long result = redisTemplate.opsForSet().remove(key, value);
return result == null ? 0 : result;
}

/**
* 获取 Set 元素数量
*
* @param key Redis键
* @return 元素数量
*/
public long getSetSize(final String key)
{
Long result = redisTemplate.opsForSet().size(key);
return result == null ? 0 : result;
}

新建 OnlineCountVo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 在线用户数量信息Vo
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OnlineCountVo implements Serializable
{
//当前在线用户数
private Long count;

//服务端更新时间戳
private Long timestamp;
}

新建 OnlineUserServiceImpl 和 OnlineUserService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* WebSocket在线用户服务实现
*/
@Service
public class OnlineUserServiceImpl implements OnlineUserService
{
private static final String ONLINE_COUNT_TOPIC = "/topic/online-count";

//在线用户集合 Key,集合成员为 userId。
private static final String ONLINE_USERS_KEY = "ws:online:users";

//WebSocket 会话与用户的映射 Key,用于断开连接时反查用户
private static final String ONLINE_SESSIONS_KEY = "ws:online:sessions";

//单个用户对应的 WebSocket 会话集合 Key 前缀
private static final String USER_SESSIONS_KEY_PREFIX = "ws:online:user:sessions:";

@Autowired
private RedisCache redisCache;

@Autowired
private SimpMessagingTemplate messagingTemplate;

//标记用户为上线
//建立 WebSocket 连接后记录 sessionId 与 userId 的关系
//然后维护用户的会话集合和全局在线用户集合,最后更新最新在线人数
@Override
public void online(String sessionId, String userId)
{
//会话 ID 或用户 ID 为空时不处理,避免写入无效在线状态
if (!StringUtils.hasText(sessionId) || !StringUtils.hasText(userId)){
return;
}

//保存 sessionId -> userId 映射,方便离线时根据 sessionId 找到对应用户
redisCache.setCacheMapValue(ONLINE_SESSIONS_KEY, sessionId, userId);

//保存当前用户拥有的 sessionId(支持同一用户多端或多标签页同时在线)
redisCache.addToSet(getUserSessionsKey(userId), sessionId);

//将用户加入在线用户集合(去重后)
redisCache.addToSet(ONLINE_USERS_KEY, userId);

//在线状态变化后向前端更新最新在线人数
pushOnlineCount();
}

//标记用户为离线
//WebSocket 连接断开后,根据 sessionId 删除会话映射
//如果该用户已经没有其他活跃会话则将其从在线用户集合中移除。
@Override
public void offline(String sessionId)
{
//会话 ID 为空时不处理。
if (!StringUtils.hasText(sessionId)){
return;
}

//通过 sessionId 查 userId,用于清理该用户的会话集合
String userId = redisCache.getCacheMapValue(ONLINE_SESSIONS_KEY, sessionId);

//删除全局 sessionId -> userId 映射
redisCache.delCacheMapValue(ONLINE_SESSIONS_KEY, sessionId);
if (StringUtils.hasText(userId)){
String userSessionsKey = getUserSessionsKey(userId);

//从用户会话集合中移除当前断开的 sessionId
redisCache.removeFromSet(userSessionsKey, sessionId);
long sessionCount = redisCache.getSetSize(userSessionsKey);

//如果该用户没有任何剩余会话,则认为用户真正离线。
if (sessionCount == 0){
redisCache.deleteObject(userSessionsKey);
redisCache.removeFromSet(ONLINE_USERS_KEY, userId);
}
}
//离线状态变化后更新最新在线人数
pushOnlineCount();
}

//获取当前在线人数
@Override
public OnlineCountVo getOnlineCount()
{
Set<String> onlineUsers = redisCache.getCacheSet(ONLINE_USERS_KEY);
long count = onlineUsers == null ? 0L : onlineUsers.size();
return new OnlineCountVo(count, System.currentTimeMillis());
}

//更新当前在线人数
@Override
public void pushOnlineCount()
{
messagingTemplate.convertAndSend(ONLINE_COUNT_TOPIC, getOnlineCount());
}

//拼接指定用户的 WebSocket 会话集合 Key
private String getUserSessionsKey(String userId)
{
return USER_SESSIONS_KEY_PREFIX + userId;
}
}

OnlineUserService

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* WebSocket在线用户服务
*/
public interface OnlineUserService
{
void online(String sessionId, String userId);

void offline(String sessionId);

OnlineCountVo getOnlineCount();

void pushOnlineCount();
}

使用 WebSocketEventListener(WebSocket连接事件监听器) 来检测用户在线和离线状态并实时更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//WebSocket连接事件监听器
@Component
public class WebSocketEventListener
{
private static final String ONLINE_COUNT_TOPIC = "/topic/online-count";

@Autowired
private OnlineUserService onlineUserService;

//STOMP连接成功后记录在线用户并更新在线人数
@EventListener
public void handleSessionConnectEvent(SessionConnectEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = accessor.getSessionId();
Principal principal = accessor.getUser();
if (Objects.nonNull(principal)){
onlineUserService.online(sessionId, principal.getName());
}
}

//前端有人访问页面后立即给当前会话更新一次当前在线人数
@EventListener
public void handleSessionSubscribeEvent(SessionSubscribeEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String destination = accessor.getDestination();
if (ONLINE_COUNT_TOPIC.equals(destination)){
onlineUserService.pushOnlineCount();
}
}

//STOMP断开连接后移除在线用户并更新在线人数
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event)
{
onlineUserService.offline(event.getSessionId());
}
}




PS:该系列只做为作者学习开发项目做的笔记用

不一定符合读者来学习,仅供参考


预告

后续会记录博客的开发过程

每次学习会做一份笔记来进行发表

“一花一世界,一叶一菩提”


版权所有 © 2026 云梦泽
欢迎访问我的个人网站:https://hgt12.github.io/


『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现
http://example.com/2026/06/03/『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现/
作者
云梦泽
发布于
2026年6月3日
更新于
2026年6月3日
许可协议