『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现

本文最后更新于 2026年6月6日 下午

WebSocket实时记录在线人数功能的实现


WebSocket实时记录在线人数功能的需求

根据用户token来标记和记录实时用户在线数

在线数同步至redis中

建立多个窗口会话时只记录一个在线用户

只有当该用户所有 WebSocket 会话都断开后才会从在线用户集合中移除


下面说一下这个功能依据什么原理来运行的

通过 WebSocket + STOMP 实时统计并更新当前在线用户数量

当后台用户建立 WebSocket 连接时

服务端会根据连接时

用户会携带的 JWT token 完成身份校验并将当前 WebSocket 会话与登录用户进行绑定

当连接断开时

服务端会清理对应会话信息

在线人数发生变化后会更新最新在线人数


代码实现

添加WebSocket依赖


创建 WebSocket配置类 WebSocketConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//WebSocket配置类
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer
{
@Autowired
private WebSocketAuthInterceptor webSocketAuthInterceptor;

@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
{
//注册后台WebSocket连接端点
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*");
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry)
{
//客户端订阅消息的前缀
registry.enableSimpleBroker("/topic", "/queue");
//客户端发送消息到服务端的前缀
registry.setApplicationDestinationPrefixes("/app");
//点对点消息前缀
registry.setUserDestinationPrefix("/user");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration)
{
//校验STOMP CONNECT Header中的token
registration.interceptors(webSocketAuthInterceptor);
}
}

WebSocket连接鉴权拦截器 WebSocketAuthInterceptor 用于拦截不合规的访问请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//WebSocket连接鉴权拦截器
@Component
public class WebSocketAuthInterceptor implements ChannelInterceptor
{
@Autowired
private RedisCache redisCache;

@Override
public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel)
{
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (Objects.isNull(accessor) || !StompCommand.CONNECT.equals(accessor.getCommand())){
return message;
}

//获取STOMP CONNECT Header中的token
String token = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_TOKEN);
if (!StringUtils.hasText(token)){
String visitorId = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_VISITOR_ID);
if (!StringUtils.hasText(visitorId)){
visitorId = UUID.randomUUID().toString().replace("-", "");
}
VisitorPrincipal principal = new VisitorPrincipal(visitorId);
accessor.setUser(principal);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, WebSocketConstants.LOGIN_TYPE_VISITOR);
accessor.setHeader(WebSocketConstants.HEADER_VISITOR_ID, visitorId);
return message;
}

Claims claims;
try {
claims = JwtUtil.parseJWT(token);
} catch (Exception e) {
throw new IllegalArgumentException("token无效或过期!");
}

String userId = claims.getSubject();
LoginUser loginUser = redisCache.getCacheObject(WebSocketConstants.REDIS_ADMIN_LOGIN_KEY_PREFIX + userId);
String loginType = WebSocketConstants.LOGIN_TYPE_ADMIN;
if (Objects.isNull(loginUser)){
loginUser = redisCache.getCacheObject(WebSocketConstants.REDIS_BLOG_LOGIN_KEY_PREFIX + userId);
loginType = WebSocketConstants.LOGIN_TYPE_BLOG;
}
if (Objects.isNull(loginUser)){
throw new IllegalArgumentException("token无效或过期!");
}

//绑定当前登录用户身份,后续可通过Principal获取当前用户
WebSocketPrincipal principal = new WebSocketPrincipal(userId, loginUser, loginType);
accessor.setUser(principal);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_USER, loginUser);
accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, loginType);
accessor.setHeader(WebSocketConstants.HEADER_AUTHENTICATION, new UsernamePasswordAuthenticationToken(loginUser, null, null));
return message;
}

//WebSocket用户身份信息
public static class WebSocketPrincipal implements Principal
{
private final String name;

@Getter
private final LoginUser loginUser;

@Getter
private final String loginType;

public WebSocketPrincipal(String name, LoginUser loginUser, String loginType)
{
this.name = name;
this.loginUser = loginUser;
this.loginType = loginType;
}

@Override
public String getName()
{
return name;
}

}

//游客WebSocket身份信息
public static class VisitorPrincipal implements Principal
{
private final String name;

public VisitorPrincipal(String name)
{
this.name = name;
}

@Override
public String getName()
{
return name;
}
}
}


给WebSocket连接端点放行


在 RedisCache 中新建方法用于服务WebSocket连接的数据缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 从 Set 中移除一个元素
*
* @param key Redis键
* @param value 值
* @return 移除数量
*/
public <T> long removeFromSet(final String key, final T value)
{
Long result = redisTemplate.opsForSet().remove(key, value);
return result == null ? 0 : result;
}

/**
* 获取 Set 元素数量
*
* @param key Redis键
* @return 元素数量
*/
public long getSetSize(final String key)
{
Long result = redisTemplate.opsForSet().size(key);
return result == null ? 0 : result;
}

新建 OnlineCountVo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 在线用户数量信息Vo
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OnlineCountVo implements Serializable
{
//当前在线用户数
private Long count;

//服务端更新时间戳
private Long timestamp;
}

新建 OnlineUserServiceImpl 和 OnlineUserService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
* WebSocket在线用户服务实现
*/
@Service
public class OnlineUserServiceImpl implements OnlineUserService
{
@Autowired
private RedisCache redisCache;

@Autowired
private SimpMessagingTemplate messagingTemplate;

//标记用户为上线
//建立 WebSocket 连接后记录 sessionId 与 userId 的关系
//然后维护用户的会话集合和分类在线用户集合,最后更新最新在线人数
@Override
public void online(String sessionId, String userId, String loginType)
{
//会话 ID 或用户 ID 为空时不处理,避免写入无效在线状态
if (!StringUtils.hasText(sessionId) || !StringUtils.hasText(userId) || !StringUtils.hasText(loginType)){
return;
}

String normalizedLoginType = normalizeLoginType(loginType);
String userSessionsKey = getUserSessionsKey(userId, normalizedLoginType);
String onlineUsersKey = getOnlineUsersKey(normalizedLoginType);

//将用户加入 redis对应在线用户集合(去重后)
redisCache.setCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId, buildSessionValue(userId, normalizedLoginType));
redisCache.addToSet(userSessionsKey, sessionId);
redisCache.addToSet(onlineUsersKey, userId);
pushOnlineCount(normalizedLoginType);
}

//标记用户为离线
//WebSocket 连接断开后,根据 sessionId 删除会话映射
//如果该用户已经没有其他活跃会话则将其从在线用户集合中移除。
@Override
public void offline(String sessionId)
{
//会话 ID 为空时不处理。
if (!StringUtils.hasText(sessionId)){
return;
}

//通过 sessionId 查 userId 与 loginType,用于清理该用户的会话集合
String sessionValue = redisCache.getCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId);

//删除全局 sessionId -> userId/loginType 映射
redisCache.delCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId);
if (StringUtils.hasText(sessionValue)){
String[] sessionParts = sessionValue.split(":", 2);
if (sessionParts.length == 2 && StringUtils.hasText(sessionParts[0]) && StringUtils.hasText(sessionParts[1])){
String userId = sessionParts[0];
String loginType = sessionParts[1];
String userSessionsKey = getUserSessionsKey(userId, loginType);
String onlineUsersKey = getOnlineUsersKey(loginType);

//从用户会话集合中移除当前断开的 sessionId
redisCache.removeFromSet(userSessionsKey, sessionId);
long sessionCount = redisCache.getSetSize(userSessionsKey);

//如果该用户没有任何剩余会话,则认为用户真正离线。
if (sessionCount == 0){
redisCache.deleteObject(userSessionsKey);
redisCache.removeFromSet(onlineUsersKey, userId);
}

//离线状态变化后更新最新在线人数
pushOnlineCount(loginType);
}
}
}

//获取当前在线人数(默认返回后台在线人数)
@Override
public OnlineCountVo getOnlineCount()
{
return getOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN);
}

@Override
public OnlineCountVo getOnlineCount(String loginType)
{
Set<String> onlineUsers = redisCache.getCacheSet(getOnlineUsersKey(normalizeLoginType(loginType)));
long count = onlineUsers == null ? 0L : onlineUsers.size();
return new OnlineCountVo(count, System.currentTimeMillis());
}

//更新当前在线人数(默认推送后台在线人数)
@Override
public void pushOnlineCount()
{
pushOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN);
}

@Override
public void pushOnlineCount(String loginType)
{
String normalizedLoginType = normalizeLoginType(loginType);
String topic = getOnlineCountTopic(normalizedLoginType);
messagingTemplate.convertAndSend(topic, getOnlineCount(normalizedLoginType));
}

//拼接指定用户的 WebSocket 会话集合 Key
private String getUserSessionsKey(String userId, String loginType)
{
return WebSocketConstants.REDIS_USER_SESSIONS_KEY_PREFIX + loginType + ":" + userId;
}

private String getOnlineUsersKey(String loginType)
{
return WebSocketConstants.REDIS_ONLINE_USERS_KEY_PREFIX + loginType;
}

//获取在线数 topic
private String getOnlineCountTopic(String loginType)
{
if (WebSocketConstants.LOGIN_TYPE_VISITOR.equals(loginType)){
return WebSocketConstants.TOPIC_VISITOR_ONLINE_COUNT;
}
return WebSocketConstants.LOGIN_TYPE_BLOG.equals(loginType) ? WebSocketConstants.TOPIC_BLOG_ONLINE_COUNT : WebSocketConstants.TOPIC_ADMIN_ONLINE_COUNT;
}

private String buildSessionValue(String userId, String loginType)
{
return userId + ":" + loginType;
}

//登录用户类型
private String normalizeLoginType(String loginType)
{
if (!StringUtils.hasText(loginType)){
return WebSocketConstants.DEFAULT_LOGIN_TYPE;
}
return loginType;
}
}

OnlineUserService

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* WebSocket在线用户服务
*/
public interface OnlineUserService
{
void online(String sessionId, String userId);

void offline(String sessionId);

OnlineCountVo getOnlineCount();

void pushOnlineCount();
}

使用 WebSocketEventListener(WebSocket连接事件监听器) 来检测用户在线和离线状态并实时更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
//WebSocket连接事件监听器
@Component
public class WebSocketEventListener
{
@Autowired
private OnlineUserService onlineUserService;

//STOMP连接成功后记录在线用户并更新在线人数
@EventListener
public void handleSessionConnectEvent(SessionConnectEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = accessor.getSessionId();
Principal principal = accessor.getUser();
if (principal instanceof WebSocketPrincipal webSocketPrincipal){
onlineUserService.online(sessionId, webSocketPrincipal.getName(), webSocketPrincipal.getLoginType());
return;
}
if (principal instanceof VisitorPrincipal visitorPrincipal){
onlineUserService.online(sessionId, visitorPrincipal.getName(), WebSocketConstants.LOGIN_TYPE_VISITOR);
}
}

//有人在预览页面则将人数推送当前在线人数
@EventListener
public void handleSessionSubscribeEvent(SessionSubscribeEvent event)
{
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String destination = accessor.getDestination();
if (WebSocketConstants.TOPIC_ADMIN_ONLINE_COUNT.equals(destination)){
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN);
return;
}
if (WebSocketConstants.TOPIC_BLOG_ONLINE_COUNT.equals(destination)){
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_BLOG);
return;
}
if (WebSocketConstants.TOPIC_VISITOR_ONLINE_COUNT.equals(destination)){
onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_VISITOR);
}
}

//STOMP断开连接后移除在线用户并更新在线人数
@EventListener
public void handleSessionDisconnectEvent(SessionDisconnectEvent event)
{
onlineUserService.offline(event.getSessionId());
}
}


创建 WebSocketConstants 常量类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* WebSocket 相关常量
*/
public final class WebSocketConstants
{
//防止被误当成普通对象来创建
private WebSocketConstants() {}

/** 管理后台登录类型标识 */
public static final String LOGIN_TYPE_ADMIN = "admin";
/** 博客前台登录类型标识 */
public static final String LOGIN_TYPE_BLOG = "blog";
/** 游客登录类型标识 */
public static final String LOGIN_TYPE_VISITOR = "visitor";
/** WebSocket CONNECT 中携带的 token 请求头 */
public static final String HEADER_TOKEN = "token";
/** WebSocket CONNECT 中携带的访客 ID 请求头 */
public static final String HEADER_VISITOR_ID = "visitorId";
/** WebSocket CONNECT 中携带的登录类型请求头 */
public static final String HEADER_LOGIN_TYPE = "loginType";
/** WebSocket CONNECT 中携带的登录用户请求头 */
public static final String HEADER_LOGIN_USER = "loginUser";
/** WebSocket CONNECT 中携带的认证信息请求头 */
public static final String HEADER_AUTHENTICATION = "authentication";

/** 管理后台登录用户 Redis Key 前缀 */
public static final String REDIS_ADMIN_LOGIN_KEY_PREFIX = "adminLogin:";
/** 博客前台登录用户 Redis Key 前缀 */
public static final String REDIS_BLOG_LOGIN_KEY_PREFIX = "blogLogin:";

/** 在线用户统计 Redis Key 前缀 */
public static final String REDIS_ONLINE_USERS_KEY_PREFIX = "ws:online:users:";
/** 在线会话集合 Redis Key */
public static final String REDIS_ONLINE_SESSIONS_KEY = "ws:online:sessions";
/** 用户会话集合 Redis Key 前缀 */
public static final String REDIS_USER_SESSIONS_KEY_PREFIX = "ws:online:user:sessions:";

/** 管理后台在线人数订阅主题 */
public static final String TOPIC_ADMIN_ONLINE_COUNT = "/topic/admin/online-count";
/** 博客前台在线人数订阅主题 */
public static final String TOPIC_BLOG_ONLINE_COUNT = "/topic/blog/online-count";
/** 游客在线人数订阅主题 */
public static final String TOPIC_VISITOR_ONLINE_COUNT = "/topic/blog/visitor/online-count";

/** 默认登录类型 */
public static final String DEFAULT_LOGIN_TYPE = LOGIN_TYPE_ADMIN;
}



PS:该系列只做为作者学习开发项目做的笔记用

不一定符合读者来学习,仅供参考


预告

后续会记录博客的开发过程

每次学习会做一份笔记来进行发表

“一花一世界,一叶一菩提”


版权所有 © 2026 云梦泽
欢迎访问我的个人网站:https://hgt12.github.io/


『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现
http://example.com/2026/06/03/『博客开发日记-后台』之WebSocket实时记录在线人数功能的实现/
作者
云梦泽
发布于
2026年6月3日
更新于
2026年6月6日
许可协议