本文最后更新于 2026年6月3日 晚上
WebSocket实时记录在线人数功能的实现
WebSocket实时记录在线人数功能的需求
根据用户token来标记和记录实时用户在线数
在线数同步至redis中
建立多个窗口会话时只记录一个在线用户
只有当该用户所有 WebSocket 会话都断开后才会从在线用户集合中移除
下面说一下这个功能依据什么原理来运行的
通过 WebSocket + STOMP 实时统计并更新当前在线用户数量
当后台用户建立 WebSocket 连接时
服务端会根据连接时
用户会携带的 JWT token 完成身份校验并将当前 WebSocket 会话与登录用户进行绑定
当连接断开时
服务端会清理对应会话信息
在线人数发生变化后会更新最新在线人数
代码实现
添加WebSocket依赖
创建 WebSocket配置类 WebSocketConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Autowired private WebSocketAuthInterceptor webSocketAuthInterceptor;
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws") .setAllowedOriginPatterns("*"); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); registry.setUserDestinationPrefix("/user"); }
@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(webSocketAuthInterceptor); } }
|
WebSocket连接鉴权拦截器 WebSocketAuthInterceptor 用于拦截不合规的访问请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Component public class WebSocketAuthInterceptor implements ChannelInterceptor { @Autowired private RedisCache redisCache;
@Override public Message<?> preSend(@NotNull Message<?> message, @NotNull MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (Objects.isNull(accessor) || !StompCommand.CONNECT.equals(accessor.getCommand())){ return message; }
String token = accessor.getFirstNativeHeader("token"); if (!StringUtils.hasText(token)){ throw new IllegalArgumentException("token不能为空"); }
Claims claims; try { claims = JwtUtil.parseJWT(token); } catch (Exception e) { throw new IllegalArgumentException("token无效或过期!"); }
String userId = claims.getSubject(); LoginUser loginUser = redisCache.getCacheObject("adminLogin:" + userId); if (Objects.isNull(loginUser)){ throw new IllegalArgumentException("token无效或过期!"); }
WebSocketPrincipal principal = new WebSocketPrincipal(userId, loginUser); accessor.setUser(principal); accessor.setHeader("loginUser", loginUser); accessor.setHeader("authentication", new UsernamePasswordAuthenticationToken(loginUser, null, null)); return message; }
public static class WebSocketPrincipal implements Principal { private final String name;
@Getter private final LoginUser loginUser;
public WebSocketPrincipal(String name, LoginUser loginUser) { this.name = name; this.loginUser = loginUser; }
@Override public String getName() { return name; }
} }
|
给WebSocket连接端点放行
在 RedisCache 中新建方法用于服务WebSocket连接的数据缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public <T> long removeFromSet(final String key, final T value) { Long result = redisTemplate.opsForSet().remove(key, value); return result == null ? 0 : result; }
public long getSetSize(final String key) { Long result = redisTemplate.opsForSet().size(key); return result == null ? 0 : result; }
|
新建 OnlineCountVo
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
@Data @NoArgsConstructor @AllArgsConstructor public class OnlineCountVo implements Serializable { private Long count;
private Long timestamp; }
|
新建 OnlineUserServiceImpl 和 OnlineUserService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
@Service public class OnlineUserServiceImpl implements OnlineUserService { private static final String ONLINE_COUNT_TOPIC = "/topic/online-count";
private static final String ONLINE_USERS_KEY = "ws:online:users";
private static final String ONLINE_SESSIONS_KEY = "ws:online:sessions";
private static final String USER_SESSIONS_KEY_PREFIX = "ws:online:user:sessions:";
@Autowired private RedisCache redisCache;
@Autowired private SimpMessagingTemplate messagingTemplate;
@Override public void online(String sessionId, String userId) { if (!StringUtils.hasText(sessionId) || !StringUtils.hasText(userId)){ return; }
redisCache.setCacheMapValue(ONLINE_SESSIONS_KEY, sessionId, userId);
redisCache.addToSet(getUserSessionsKey(userId), sessionId);
redisCache.addToSet(ONLINE_USERS_KEY, userId);
pushOnlineCount(); }
@Override public void offline(String sessionId) { if (!StringUtils.hasText(sessionId)){ return; }
String userId = redisCache.getCacheMapValue(ONLINE_SESSIONS_KEY, sessionId);
redisCache.delCacheMapValue(ONLINE_SESSIONS_KEY, sessionId); if (StringUtils.hasText(userId)){ String userSessionsKey = getUserSessionsKey(userId);
redisCache.removeFromSet(userSessionsKey, sessionId); long sessionCount = redisCache.getSetSize(userSessionsKey);
if (sessionCount == 0){ redisCache.deleteObject(userSessionsKey); redisCache.removeFromSet(ONLINE_USERS_KEY, userId); } } pushOnlineCount(); }
@Override public OnlineCountVo getOnlineCount() { Set<String> onlineUsers = redisCache.getCacheSet(ONLINE_USERS_KEY); long count = onlineUsers == null ? 0L : onlineUsers.size(); return new OnlineCountVo(count, System.currentTimeMillis()); }
@Override public void pushOnlineCount() { messagingTemplate.convertAndSend(ONLINE_COUNT_TOPIC, getOnlineCount()); }
private String getUserSessionsKey(String userId) { return USER_SESSIONS_KEY_PREFIX + userId; } }
|
OnlineUserService
1 2 3 4 5 6 7 8 9 10 11 12 13
|
public interface OnlineUserService { void online(String sessionId, String userId);
void offline(String sessionId);
OnlineCountVo getOnlineCount();
void pushOnlineCount(); }
|
使用 WebSocketEventListener(WebSocket连接事件监听器) 来检测用户在线和离线状态并实时更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Component public class WebSocketEventListener { private static final String ONLINE_COUNT_TOPIC = "/topic/online-count";
@Autowired private OnlineUserService onlineUserService;
@EventListener public void handleSessionConnectEvent(SessionConnectEvent event) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage()); String sessionId = accessor.getSessionId(); Principal principal = accessor.getUser(); if (Objects.nonNull(principal)){ onlineUserService.online(sessionId, principal.getName()); } }
@EventListener public void handleSessionSubscribeEvent(SessionSubscribeEvent event) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage()); String destination = accessor.getDestination(); if (ONLINE_COUNT_TOPIC.equals(destination)){ onlineUserService.pushOnlineCount(); } }
@EventListener public void handleSessionDisconnectEvent(SessionDisconnectEvent event) { onlineUserService.offline(event.getSessionId()); } }
|
PS:该系列只做为作者学习开发项目做的笔记用
不一定符合读者来学习,仅供参考
预告
后续会记录博客的开发过程
每次学习会做一份笔记来进行发表
“一花一世界,一叶一菩提”
版权所有 © 2026 云梦泽
欢迎访问我的个人网站:https://hgt12.github.io/