本文最后更新于 2026年6月6日 下午
WebSocket实时记录在线人数功能的实现
WebSocket实时记录在线人数功能的需求
根据用户token来标记和记录实时用户在线数
在线数同步至redis中
建立多个窗口会话时只记录一个在线用户
只有当该用户所有 WebSocket 会话都断开后才会从在线用户集合中移除
下面说明这个功能的运行原理
通过 WebSocket + STOMP 实时统计并更新当前在线用户数量
当后台用户建立 WebSocket 连接时
服务端会根据用户连接时携带的 JWT token 完成身份校验
并将当前 WebSocket 会话与登录用户进行绑定
当连接断开时
服务端会清理对应会话信息
在线人数发生变化后,服务端会向订阅了对应主题的客户端推送最新的在线人数
代码实现
添加WebSocket依赖
创建 WebSocket配置类 WebSocketConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketAuthInterceptor webSocketAuthInterceptor;
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws") .setAllowedOriginPatterns("*"); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); registry.setUserDestinationPrefix("/user"); }
@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(webSocketAuthInterceptor); } }
|
WebSocket连接鉴权拦截器 WebSocketAuthInterceptor 用于拦截不合规的访问请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| @Component public class WebSocketAuthInterceptor implements ChannelInterceptor { @Autowired private RedisCache redisCache;
@Override public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (Objects.isNull(accessor) || !StompCommand.CONNECT.equals(accessor.getCommand())){ return message; }
String token = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_TOKEN); if (!StringUtils.hasText(token)){ String visitorId = accessor.getFirstNativeHeader(WebSocketConstants.HEADER_VISITOR_ID); if (!StringUtils.hasText(visitorId)){ visitorId = UUID.randomUUID().toString().replace("-", ""); } VisitorPrincipal principal = new VisitorPrincipal(visitorId); accessor.setUser(principal); accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, WebSocketConstants.LOGIN_TYPE_VISITOR); accessor.setHeader(WebSocketConstants.HEADER_VISITOR_ID, visitorId); return message; }
Claims claims; try { claims = JwtUtil.parseJWT(token); } catch (Exception e) { throw new IllegalArgumentException("token无效或过期!"); }
String userId = claims.getSubject(); LoginUser loginUser = redisCache.getCacheObject(WebSocketConstants.REDIS_ADMIN_LOGIN_KEY_PREFIX + userId); String loginType = WebSocketConstants.LOGIN_TYPE_ADMIN; if (Objects.isNull(loginUser)){ loginUser = redisCache.getCacheObject(WebSocketConstants.REDIS_BLOG_LOGIN_KEY_PREFIX + userId); loginType = WebSocketConstants.LOGIN_TYPE_BLOG; } if (Objects.isNull(loginUser)){ throw new IllegalArgumentException("token无效或过期!"); }
WebSocketPrincipal principal = new WebSocketPrincipal(userId, loginUser, loginType); accessor.setUser(principal); accessor.setHeader(WebSocketConstants.HEADER_LOGIN_USER, loginUser); accessor.setHeader(WebSocketConstants.HEADER_LOGIN_TYPE, loginType); accessor.setHeader(WebSocketConstants.HEADER_AUTHENTICATION, new UsernamePasswordAuthenticationToken(loginUser, null, null)); return message; }
public static class WebSocketPrincipal implements Principal { private final String name;
@Getter private final LoginUser loginUser;
@Getter private final String loginType;
public WebSocketPrincipal(String name, LoginUser loginUser, String loginType) { this.name = name; this.loginUser = loginUser; this.loginType = loginType; }
@Override public String getName() { return name; }
}
public static class VisitorPrincipal implements Principal { private final String name;
public VisitorPrincipal(String name) { this.name = name; }
@Override public String getName() { return name; } } }
|
在 Spring Security 配置中给 WebSocket 连接端点(/ws)放行
在 RedisCache 中新建方法用于服务WebSocket连接的数据缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
/**
 * Removes {@code value} from the Redis set stored at {@code key}.
 *
 * @param key   Redis key of the set
 * @param value member to remove
 * @return the count reported by Redis, or 0 when the template returns {@code null}
 */
public <T> long removeFromSet(final String key, final T value) {
    final Long removed = redisTemplate.opsForSet().remove(key, value);
    if (removed == null) {
        return 0L;
    }
    return removed;
}
/**
 * Returns the number of members in the Redis set stored at {@code key}.
 *
 * @param key Redis key of the set
 * @return the size reported by Redis, or 0 when the template returns {@code null}
 */
public long getSetSize(final String key) {
    final Long memberCount = redisTemplate.opsForSet().size(key);
    return memberCount != null ? memberCount : 0L;
}
|
新建 OnlineCountVo
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
/**
 * View object carrying an online-user count together with the time it was taken.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OnlineCountVo implements Serializable {

    /** Number of users currently counted as online. */
    private Long count;

    /** Time the count was taken; OnlineUserServiceImpl fills it with System.currentTimeMillis(). */
    private Long timestamp;
}
|
新建 OnlineUserServiceImpl 和 OnlineUserService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
@Service public class OnlineUserServiceImpl implements OnlineUserService { @Autowired private RedisCache redisCache;
@Autowired private SimpMessagingTemplate messagingTemplate;
@Override public void online(String sessionId, String userId, String loginType) { if (!StringUtils.hasText(sessionId) || !StringUtils.hasText(userId) || !StringUtils.hasText(loginType)){ return; }
String normalizedLoginType = normalizeLoginType(loginType); String userSessionsKey = getUserSessionsKey(userId, normalizedLoginType); String onlineUsersKey = getOnlineUsersKey(normalizedLoginType);
redisCache.setCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId, buildSessionValue(userId, normalizedLoginType)); redisCache.addToSet(userSessionsKey, sessionId); redisCache.addToSet(onlineUsersKey, userId); pushOnlineCount(normalizedLoginType); }
@Override public void offline(String sessionId) { if (!StringUtils.hasText(sessionId)){ return; }
String sessionValue = redisCache.getCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId);
redisCache.delCacheMapValue(WebSocketConstants.REDIS_ONLINE_SESSIONS_KEY, sessionId); if (StringUtils.hasText(sessionValue)){ String[] sessionParts = sessionValue.split(":", 2); if (sessionParts.length == 2 && StringUtils.hasText(sessionParts[0]) && StringUtils.hasText(sessionParts[1])){ String userId = sessionParts[0]; String loginType = sessionParts[1]; String userSessionsKey = getUserSessionsKey(userId, loginType); String onlineUsersKey = getOnlineUsersKey(loginType);
redisCache.removeFromSet(userSessionsKey, sessionId); long sessionCount = redisCache.getSetSize(userSessionsKey);
if (sessionCount == 0){ redisCache.deleteObject(userSessionsKey); redisCache.removeFromSet(onlineUsersKey, userId); }
pushOnlineCount(loginType); } } }
@Override public OnlineCountVo getOnlineCount() { return getOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN); }
@Override public OnlineCountVo getOnlineCount(String loginType) { Set<String> onlineUsers = redisCache.getCacheSet(getOnlineUsersKey(normalizeLoginType(loginType))); long count = onlineUsers == null ? 0L : onlineUsers.size(); return new OnlineCountVo(count, System.currentTimeMillis()); }
@Override public void pushOnlineCount() { pushOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN); }
@Override public void pushOnlineCount(String loginType) { String normalizedLoginType = normalizeLoginType(loginType); String topic = getOnlineCountTopic(normalizedLoginType); messagingTemplate.convertAndSend(topic, getOnlineCount(normalizedLoginType)); }
private String getUserSessionsKey(String userId, String loginType) { return WebSocketConstants.REDIS_USER_SESSIONS_KEY_PREFIX + loginType + ":" + userId; }
private String getOnlineUsersKey(String loginType) { return WebSocketConstants.REDIS_ONLINE_USERS_KEY_PREFIX + loginType; }
private String getOnlineCountTopic(String loginType) { if (WebSocketConstants.LOGIN_TYPE_VISITOR.equals(loginType)){ return WebSocketConstants.TOPIC_VISITOR_ONLINE_COUNT; } return WebSocketConstants.LOGIN_TYPE_BLOG.equals(loginType) ? WebSocketConstants.TOPIC_BLOG_ONLINE_COUNT : WebSocketConstants.TOPIC_ADMIN_ONLINE_COUNT; }
private String buildSessionValue(String userId, String loginType) { return userId + ":" + loginType; }
private String normalizeLoginType(String loginType) { if (!StringUtils.hasText(loginType)){ return WebSocketConstants.DEFAULT_LOGIN_TYPE; } return loginType; } }
|
OnlineUserService
1 2 3 4 5 6 7 8 9 10 11 12 13
|
/**
 * Tracks which users are online over WebSocket and publishes the per-login-type counts.
 *
 * <p>The {@code loginType} overloads are the ones OnlineUserServiceImpl overrides and
 * WebSocketEventListener calls; the shorter forms address the default (admin) login type.
 */
public interface OnlineUserService {

    /**
     * Registers a session under the default login type.
     *
     * @param sessionId WebSocket session id
     * @param userId    id of the user owning the session
     */
    default void online(String sessionId, String userId) {
        online(sessionId, userId, WebSocketConstants.DEFAULT_LOGIN_TYPE);
    }

    /**
     * Registers a session for a user of the given login type.
     *
     * @param sessionId WebSocket session id
     * @param userId    id of the user (or visitor) owning the session
     * @param loginType login type the user is counted under
     */
    void online(String sessionId, String userId, String loginType);

    /**
     * Removes a session that has disconnected.
     *
     * @param sessionId WebSocket session id
     */
    void offline(String sessionId);

    /** Returns the online count for the admin login type. */
    OnlineCountVo getOnlineCount();

    /**
     * Returns the online count for the given login type.
     *
     * @param loginType login type to count
     */
    OnlineCountVo getOnlineCount(String loginType);

    /** Pushes the admin online count to its subscribers. */
    void pushOnlineCount();

    /**
     * Pushes the online count of the given login type to its subscribers.
     *
     * @param loginType login type whose count is pushed
     */
    void pushOnlineCount(String loginType);
}
|
使用 WebSocketEventListener(WebSocket连接事件监听器) 来检测用户在线和离线状态并实时更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Component public class WebSocketEventListener { @Autowired private OnlineUserService onlineUserService;
@EventListener public void handleSessionConnectEvent(SessionConnectEvent event) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage()); String sessionId = accessor.getSessionId(); Principal principal = accessor.getUser(); if (principal instanceof WebSocketPrincipal webSocketPrincipal){ onlineUserService.online(sessionId, webSocketPrincipal.getName(), webSocketPrincipal.getLoginType()); return; } if (principal instanceof VisitorPrincipal visitorPrincipal){ onlineUserService.online(sessionId, visitorPrincipal.getName(), WebSocketConstants.LOGIN_TYPE_VISITOR); } }
@EventListener public void handleSessionSubscribeEvent(SessionSubscribeEvent event) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage()); String destination = accessor.getDestination(); if (WebSocketConstants.TOPIC_ADMIN_ONLINE_COUNT.equals(destination)){ onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_ADMIN); return; } if (WebSocketConstants.TOPIC_BLOG_ONLINE_COUNT.equals(destination)){ onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_BLOG); return; } if (WebSocketConstants.TOPIC_VISITOR_ONLINE_COUNT.equals(destination)){ onlineUserService.pushOnlineCount(WebSocketConstants.LOGIN_TYPE_VISITOR); } }
@EventListener public void handleSessionDisconnectEvent(SessionDisconnectEvent event) { onlineUserService.offline(event.getSessionId()); } }
|
创建 WebSocketConstants 常量类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
|
/**
 * Constants shared by the WebSocket online-count feature: login types, header names,
 * Redis keys and STOMP topics.
 */
public final class WebSocketConstants {

    // ---- Login types ----
    public static final String LOGIN_TYPE_ADMIN = "admin";
    public static final String LOGIN_TYPE_BLOG = "blog";
    public static final String LOGIN_TYPE_VISITOR = "visitor";

    /** Login type used when none is supplied. */
    public static final String DEFAULT_LOGIN_TYPE = LOGIN_TYPE_ADMIN;

    // ---- Header names ----
    public static final String HEADER_TOKEN = "token";
    public static final String HEADER_VISITOR_ID = "visitorId";
    public static final String HEADER_LOGIN_TYPE = "loginType";
    public static final String HEADER_LOGIN_USER = "loginUser";
    public static final String HEADER_AUTHENTICATION = "authentication";

    // ---- Redis key prefixes of the cached login users ----
    public static final String REDIS_ADMIN_LOGIN_KEY_PREFIX = "adminLogin:";
    public static final String REDIS_BLOG_LOGIN_KEY_PREFIX = "blogLogin:";

    // ---- Redis keys of the online bookkeeping ----
    public static final String REDIS_ONLINE_USERS_KEY_PREFIX = "ws:online:users:";
    public static final String REDIS_ONLINE_SESSIONS_KEY = "ws:online:sessions";
    public static final String REDIS_USER_SESSIONS_KEY_PREFIX = "ws:online:user:sessions:";

    // ---- Topics the online counts are published to ----
    public static final String TOPIC_ADMIN_ONLINE_COUNT = "/topic/admin/online-count";
    public static final String TOPIC_BLOG_ONLINE_COUNT = "/topic/blog/online-count";
    public static final String TOPIC_VISITOR_ONLINE_COUNT = "/topic/blog/visitor/online-count";

    /** Not instantiable. */
    private WebSocketConstants() {
    }
}
|
PS:该系列仅作为作者学习开发项目时的笔记
不一定适合读者学习,仅供参考
预告
后续会记录博客的开发过程
每次学习会做一份笔记来进行发表
“一花一世界,一叶一菩提”
版权所有 © 2026 云梦泽
欢迎访问我的个人网站:https://hgt12.github.io/