feat: initial iShare project code

This commit is contained in:
purovps
2026-02-16 23:20:59 +08:00
parent 8c83a6fd46
commit 6f270a972e
1910 changed files with 218015 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.pig4cloud</groupId>
<artifactId>pigx-common</artifactId>
<version>5.2.0</version>
</parent>
<artifactId>pigx-common-websocket</artifactId>
<packaging>jar</packaging>
<description>pigx websocket</description>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-json</artifactId>
</dependency>
<dependency>
<groupId>com.pig4cloud</groupId>
<artifactId>pigx-common-security</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,30 @@
package com.pig4cloud.pigx.common.websocket.config;
import com.pig4cloud.pigx.common.websocket.distribute.LocalMessageDistributor;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDistributor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 本地的消息分发器配置
*
* @author hccake
*/
@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "message-distributor",
havingValue = MessageDistributorTypeConstants.LOCAL)
@Configuration(proxyBeanMethods = false)
public class LocalMessageDistributorConfiguration {
/**
* 本地基于内存的消息分发,不支持集群
* @return LocalMessageDistributor
*/
@Bean
@ConditionalOnMissingBean(MessageDistributor.class)
public LocalMessageDistributor messageDistributor() {
return new LocalMessageDistributor();
}
}

View File

@@ -0,0 +1,26 @@
package com.pig4cloud.pigx.common.websocket.config;
/**
* @author hccake
*/
public final class MessageDistributorTypeConstants {
private MessageDistributorTypeConstants() {
}
/**
* 本地
*/
public static final String LOCAL = "local";
/**
* 基于 Redis PUB/SUB
*/
public static final String REDIS = "redis";
/**
* 自定义
*/
public static final String CUSTOM = "custom";
}

View File

@@ -0,0 +1,70 @@
package com.pig4cloud.pigx.common.websocket.config;
import com.pig4cloud.pigx.common.websocket.distribute.MessageDistributor;
import com.pig4cloud.pigx.common.websocket.distribute.RedisMessageDistributor;
import com.pig4cloud.pigx.common.websocket.distribute.RedisWebsocketMessageListener;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import javax.annotation.PostConstruct;
/**
* 基于 Redis Pub/Sub 的消息分发器
*
* @author hccake
*/
@ConditionalOnClass(StringRedisTemplate.class)
@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "message-distributor",
havingValue = MessageDistributorTypeConstants.REDIS, matchIfMissing = true)
@Configuration(proxyBeanMethods = false)
public class RedisMessageDistributorConfiguration {
@Bean
@ConditionalOnMissingBean(MessageDistributor.class)
public RedisMessageDistributor messageDistributor(StringRedisTemplate stringRedisTemplate) {
return new RedisMessageDistributor(stringRedisTemplate);
}
@Bean
@ConditionalOnBean(RedisMessageDistributor.class)
@ConditionalOnMissingBean
public RedisWebsocketMessageListener redisWebsocketMessageListener(StringRedisTemplate stringRedisTemplate) {
return new RedisWebsocketMessageListener(stringRedisTemplate);
}
@Bean
@ConditionalOnBean(RedisMessageDistributor.class)
@ConditionalOnMissingBean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(MessageDistributor.class)
@RequiredArgsConstructor
static class RedisMessageListenerRegisterConfiguration {
private final RedisMessageListenerContainer redisMessageListenerContainer;
private final RedisWebsocketMessageListener redisWebsocketMessageListener;
@PostConstruct
public void addMessageListener() {
redisMessageListenerContainer.addMessageListener(redisWebsocketMessageListener,
new PatternTopic(RedisWebsocketMessageListener.CHANNEL));
}
}
}

View File

@@ -0,0 +1,50 @@
package com.pig4cloud.pigx.common.websocket.config;
import com.pig4cloud.pigx.common.websocket.handler.JsonMessageHandler;
import com.pig4cloud.pigx.common.websocket.holder.JsonMessageHandlerHolder;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* websocket自动配置
*
* @author Yakir
*/
@Import(WebSocketHandlerConfig.class)
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketAutoConfiguration {
private final WebSocketProperties webSocketProperties;
private final List<JsonMessageHandler> jsonMessageHandlerList;
@Bean
@ConditionalOnMissingBean
public WebSocketConfigurer webSocketConfigurer(List<HandshakeInterceptor> handshakeInterceptor,
WebSocketHandler webSocketHandler) {
return registry -> registry.addHandler(webSocketHandler, webSocketProperties.getPath())
.setAllowedOrigins(webSocketProperties.getAllowOrigins())
.addInterceptors(handshakeInterceptor.toArray(new HandshakeInterceptor[0]));
}
/**
* 初始化时将所有的jsonMessageHandler注册到JsonMessageHandlerHolder中
*/
@PostConstruct
public void initJsonMessageHandlerHolder() {
for (JsonMessageHandler jsonMessageHandler : jsonMessageHandlerList) {
JsonMessageHandlerHolder.addHandler(jsonMessageHandler);
}
}
}

View File

@@ -0,0 +1,78 @@
package com.pig4cloud.pigx.common.websocket.config;
import com.pig4cloud.pigx.common.websocket.custom.PigxSessionKeyGenerator;
import com.pig4cloud.pigx.common.websocket.custom.UserAttributeHandshakeInterceptor;
import com.pig4cloud.pigx.common.websocket.handler.CustomPlanTextMessageHandler;
import com.pig4cloud.pigx.common.websocket.handler.CustomWebSocketHandler;
import com.pig4cloud.pigx.common.websocket.handler.PingJsonMessageHandler;
import com.pig4cloud.pigx.common.websocket.handler.PlanTextMessageHandler;
import com.pig4cloud.pigx.common.websocket.holder.MapSessionWebSocketHandlerDecorator;
import com.pig4cloud.pigx.common.websocket.holder.SessionKeyGenerator;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* @author Hccake 2021/1/5
* @version 1.0
*/
@RequiredArgsConstructor
@EnableConfigurationProperties(WebSocketProperties.class)
public class WebSocketHandlerConfig {
private final WebSocketProperties webSocketProperties;
@Bean
@ConditionalOnMissingBean(SessionKeyGenerator.class)
public SessionKeyGenerator sessionKeyGenerator() {
return new PigxSessionKeyGenerator();
}
@Bean
public HandshakeInterceptor handshakeInterceptor() {
return new UserAttributeHandshakeInterceptor();
}
@Bean
@ConditionalOnMissingBean(PlanTextMessageHandler.class)
public PlanTextMessageHandler planTextMessageHandler() {
return new CustomPlanTextMessageHandler();
}
@Bean
@ConditionalOnMissingBean({ TextWebSocketHandler.class, PlanTextMessageHandler.class })
public WebSocketHandler webSocketHandler1(@Autowired(required = false) SessionKeyGenerator sessionKeyGenerator) {
CustomWebSocketHandler customWebSocketHandler = new CustomWebSocketHandler();
if (webSocketProperties.isMapSession()) {
return new MapSessionWebSocketHandlerDecorator(customWebSocketHandler, sessionKeyGenerator);
}
return customWebSocketHandler;
}
@Bean
@ConditionalOnBean(PlanTextMessageHandler.class)
@ConditionalOnMissingBean(TextWebSocketHandler.class)
public WebSocketHandler webSocketHandler2(@Autowired(required = false) SessionKeyGenerator sessionKeyGenerator,
PlanTextMessageHandler planTextMessageHandler) {
CustomWebSocketHandler customWebSocketHandler = new CustomWebSocketHandler(planTextMessageHandler);
if (webSocketProperties.isMapSession()) {
return new MapSessionWebSocketHandlerDecorator(customWebSocketHandler, sessionKeyGenerator);
}
return customWebSocketHandler;
}
@Bean
@ConditionalOnProperty(prefix = WebSocketProperties.PREFIX, name = "heartbeat", havingValue = "true",
matchIfMissing = true)
public PingJsonMessageHandler pingJsonMessageHandler() {
return new PingJsonMessageHandler();
}
}

View File

@@ -0,0 +1,61 @@
package com.pig4cloud.pigx.common.websocket.config;
import cn.hutool.json.JSONUtil;
import com.pig4cloud.pigx.common.websocket.holder.WebSocketSessionHolder;
import com.pig4cloud.pigx.common.websocket.message.JsonWebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Collection;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
@Slf4j
public class WebSocketMessageSender {
public static void broadcast(String message) {
Collection<WebSocketSession> sessions = WebSocketSessionHolder.getSessions();
for (WebSocketSession session : sessions) {
send(session, message);
}
}
public static boolean send(Object sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSession(sessionKey);
if (session == null) {
log.info("[send] 当前 sessionKey{} 对应 session 不在本服务中", sessionKey);
return false;
}
else {
return send(session, message);
}
}
public static void send(WebSocketSession session, JsonWebSocketMessage message) {
send(session, JSONUtil.toJsonStr(message));
}
public static boolean send(WebSocketSession session, String message) {
if (session == null) {
log.error("[send] session 为 null");
return false;
}
if (!session.isOpen()) {
log.error("[send] session 已经关闭");
return false;
}
try {
session.sendMessage(new TextMessage(message));
}
catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
return false;
}
return true;
}
}

View File

@@ -0,0 +1,49 @@
package com.pig4cloud.pigx.common.websocket.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* websocket props
*
* @author Yakir
*/
@Data
@ConfigurationProperties(WebSocketProperties.PREFIX)
public class WebSocketProperties {
public static final String PREFIX = "pigx.websocket";
/**
* 路径: 无参: /ws 有参: PathVariable: 单参: /ws/{test} 多参: /ws/{test1}/{test2} query:
* /ws?uid=1&name=test
*
*/
private String path = "/ws/info";
/**
* 允许访问源
*/
private String allowOrigins = "*";
/**
* 是否支持部分消息
*/
private boolean supportPartialMessages = false;
/**
* 心跳处理
*/
private boolean heartbeat = true;
/**
* 是否开启对session的映射记录
*/
private boolean mapSession = true;
/**
* 消息分发器local | redis默认 local, 如果自定义的话,可以配置为其他任意值
*/
private String messageDistributor = MessageDistributorTypeConstants.LOCAL;
}

View File

@@ -0,0 +1,36 @@
package com.pig4cloud.pigx.common.websocket.custom;
import com.pig4cloud.pigx.common.security.service.PigxUser;
import com.pig4cloud.pigx.common.websocket.holder.SessionKeyGenerator;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketSession;
/**
* @author lengleng
* @date 2021/10/4 websocket session 标识生成规则
*/
@Configuration
@RequiredArgsConstructor
public class PigxSessionKeyGenerator implements SessionKeyGenerator {
/**
* 获取当前session的唯一标识
* @param webSocketSession 当前session
* @return session唯一标识
*/
@Override
public Object sessionKey(WebSocketSession webSocketSession) {
Object obj = webSocketSession.getAttributes().get("USER_KEY_ATTR_NAME");
if (obj instanceof PigxUser) {
PigxUser user = (PigxUser) obj;
// userId 作为唯一区分
return String.valueOf(user.getId());
}
return null;
}
}

View File

@@ -0,0 +1,52 @@
package com.pig4cloud.pigx.common.websocket.custom;
import com.pig4cloud.pigx.common.security.service.PigxUser;
import com.pig4cloud.pigx.common.security.util.SecurityUtils;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
/**
* @author lengleng
* @date 2021/10/4
*/
public class UserAttributeHandshakeInterceptor implements HandshakeInterceptor {
/**
* Invoked before the handshake is processed.
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param attributes the attributes from the HTTP handshake to associate with the
* WebSocket session; the provided attributes are copied, the original map is not
* used.
* @return whether to proceed with the handshake ({@code true}) or abort
* ({@code false})
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// 由于 WebSocket 握手是由 http 升级的,携带 token 已经被 Security 拦截验证了,所以可以直接获取到用户
PigxUser user = SecurityUtils.getUser();
attributes.put("USER_KEY_ATTR_NAME", user);
return true;
}
/**
* Invoked after the handshake is done. The response status and headers indicate the
* results of the handshake, i.e. whether it was successful or not.
* @param request the current request
* @param response the current response
* @param wsHandler the target WebSocket handler
* @param exception an exception raised during the handshake, or {@code null} if none
*/
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
}
}

View File

@@ -0,0 +1,20 @@
package com.pig4cloud.pigx.common.websocket.distribute;
/**
* 本地消息分发,直接进行发送
*
* @author Hccake 2021/1/12
* @version 1.0
*/
public class LocalMessageDistributor implements MessageDistributor, MessageSender {
/**
* 消息分发
* @param messageDO 发送的消息
*/
@Override
public void distribute(MessageDO messageDO) {
doSend(messageDO);
}
}

View File

@@ -0,0 +1,39 @@
package com.pig4cloud.pigx.common.websocket.distribute;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* @author Hccake 2021/1/12
* @version 1.0
*/
@Data
@Accessors(chain = true)
public class MessageDO {
/**
* 是否广播
*/
private Boolean needBroadcast;
/**
* sessionKeys
*/
private List<Object> sessionKeys;
/**
* 需要发送的消息文本
*/
private String messageText;
/**
* 构建需要广播的message
* @author lingting 2021-03-25 17:28
*/
public static MessageDO broadcastMessage(String text) {
return new MessageDO().setMessageText(text).setNeedBroadcast(true);
}
}

View File

@@ -0,0 +1,17 @@
package com.pig4cloud.pigx.common.websocket.distribute;
/**
* 消息分发器
*
* @author Hccake 2021/1/12
* @version 1.0
*/
public interface MessageDistributor {
/**
* 消息分发
* @param messageDO 发送的消息
*/
void distribute(MessageDO messageDO);
}

View File

@@ -0,0 +1,34 @@
package com.pig4cloud.pigx.common.websocket.distribute;
import cn.hutool.core.collection.CollectionUtil;
import com.pig4cloud.pigx.common.websocket.config.WebSocketMessageSender;
import java.util.List;
/**
* @author Hccake 2021/1/12
* @version 1.0
*/
public interface MessageSender {
/**
* 发送消息
* @param messageDO 发送的消息
*/
default void doSend(MessageDO messageDO) {
Boolean needBroadcast = messageDO.getNeedBroadcast();
String messageText = messageDO.getMessageText();
List<Object> sessionKeys = messageDO.getSessionKeys();
if (needBroadcast != null && needBroadcast) {
// 广播信息
WebSocketMessageSender.broadcast(messageText);
}
else if (CollectionUtil.isNotEmpty(sessionKeys)) {
// 指定用户发送
for (Object sessionKey : sessionKeys) {
WebSocketMessageSender.send(sessionKey, messageText);
}
}
}
}

View File

@@ -0,0 +1,35 @@
package com.pig4cloud.pigx.common.websocket.distribute;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.ArrayList;
import java.util.List;
/**
* 消息分发器
*
* @author Hccake 2021/1/12
* @version 1.0
*/
@RequiredArgsConstructor
public class RedisMessageDistributor implements MessageDistributor {
private final StringRedisTemplate stringRedisTemplate;
/**
* 消息分发
* @param messageDO 发送的消息
*/
@Override
public void distribute(MessageDO messageDO) {
// 包装 sessionKey 适配分布式多环境
List<Object> sessionKeyList = new ArrayList<>(messageDO.getSessionKeys());
messageDO.setSessionKeys(sessionKeyList);
String str = JSONUtil.toJsonStr(messageDO);
stringRedisTemplate.convertAndSend(RedisWebsocketMessageListener.CHANNEL, str);
}
}

View File

@@ -0,0 +1,41 @@
package com.pig4cloud.pigx.common.websocket.distribute;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
/**
* redis订阅 websocket 发送消息,接收到消息时进行推送
*
* @author Hccake 2021/1/12
* @version 1.0
*/
@Slf4j
@RequiredArgsConstructor
public class RedisWebsocketMessageListener implements MessageListener, MessageSender {
public static final String CHANNEL = "websocket-send";
private final StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
log.info("redis channel Listener message send {}", message);
byte[] channelBytes = message.getChannel();
RedisSerializer<String> stringSerializer = stringRedisTemplate.getStringSerializer();
String channel = stringSerializer.deserialize(channelBytes);
// 这里没有使用通配符所以一定是true
if (CHANNEL.equals(channel)) {
byte[] bodyBytes = message.getBody();
String body = stringSerializer.deserialize(bodyBytes);
MessageDO messageDO = JSONUtil.toBean(body, MessageDO.class);
doSend(messageDO);
}
}
}

View File

@@ -0,0 +1,25 @@
package com.pig4cloud.pigx.common.websocket.handler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.WebSocketSession;
/**
* @author lengleng
* @date 2021/11/4
*
* 默认消息处理
*/
@Slf4j
public class CustomPlanTextMessageHandler implements PlanTextMessageHandler {
/**
* 普通文本消息处理
* @param session 当前接收消息的session
* @param message 文本消息
*/
@Override
public void handle(WebSocketSession session, String message) {
log.info("sessionId {} ,msg {}", session.getId(), message);
}
}

View File

@@ -0,0 +1,73 @@
package com.pig4cloud.pigx.common.websocket.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pig4cloud.pigx.common.websocket.holder.JsonMessageHandlerHolder;
import com.pig4cloud.pigx.common.websocket.message.AbstractJsonWebSocketMessage;
import com.pig4cloud.pigx.common.websocket.message.JsonWebSocketMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
/**
* @author Hccake 2020/12/31
* @version 1.0
*/
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
static {
// 有特殊需要转义字符, 不报错
MAPPER.enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature());
}
private PlanTextMessageHandler planTextMessageHandler;
public CustomWebSocketHandler() {
}
public CustomWebSocketHandler(PlanTextMessageHandler planTextMessageHandler) {
this.planTextMessageHandler = planTextMessageHandler;
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws JsonProcessingException {
// 空消息不处理
if (message.getPayloadLength() == 0) {
return;
}
// 消息类型必有一属性type先解析获取该属性
String payload = message.getPayload();
JsonNode jsonNode = MAPPER.readTree(payload);
JsonNode typeNode = jsonNode.get(AbstractJsonWebSocketMessage.TYPE_FIELD);
if (typeNode == null) {
if (planTextMessageHandler != null) {
planTextMessageHandler.handle(session, payload);
}
else {
log.error("[handleTextMessage] 普通文本消息({})没有对应的消息处理器", payload);
}
}
else {
String messageType = typeNode.asText();
// 获得对应的消息处理器
JsonMessageHandler jsonMessageHandler = JsonMessageHandlerHolder.getHandler(messageType);
if (jsonMessageHandler == null) {
log.error("[handleTextMessage] 消息类型({})不存在对应的消息处理器", messageType);
return;
}
// 消息处理
Class<? extends JsonWebSocketMessage> messageClass = jsonMessageHandler.getMessageClass();
JsonWebSocketMessage websocketMessageJson = MAPPER.treeToValue(jsonNode, messageClass);
jsonMessageHandler.handle(session, websocketMessageJson);
}
}
}

View File

@@ -0,0 +1,31 @@
package com.pig4cloud.pigx.common.websocket.handler;
import com.pig4cloud.pigx.common.websocket.message.JsonWebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public interface JsonMessageHandler<T extends JsonWebSocketMessage> {
/**
* JsonWebSocketMessage 类型消息处理
* @param session 当前接收 session
* @param message 当前接收到的 message
*/
void handle(WebSocketSession session, T message);
/**
* 当前处理器处理的消息类型
* @return messageType
*/
String type();
/**
* 当前处理器对应的消息Class
* @return Class<T>
*/
Class<T> getMessageClass();
}

View File

@@ -0,0 +1,34 @@
package com.pig4cloud.pigx.common.websocket.handler;
import com.pig4cloud.pigx.common.websocket.config.WebSocketMessageSender;
import com.pig4cloud.pigx.common.websocket.message.JsonWebSocketMessage;
import com.pig4cloud.pigx.common.websocket.message.PingJsonWebSocketMessage;
import com.pig4cloud.pigx.common.websocket.message.PongJsonWebSocketMessage;
import com.pig4cloud.pigx.common.websocket.message.WebSocketMessageTypeEnum;
import org.springframework.web.socket.WebSocketSession;
/**
* 心跳处理接收到客户端的ping时立刻回复一个pong
*
* @author Hccake 2021/1/4
* @version 1.0
*/
public class PingJsonMessageHandler implements JsonMessageHandler<PingJsonWebSocketMessage> {
@Override
public void handle(WebSocketSession session, PingJsonWebSocketMessage message) {
JsonWebSocketMessage pongJsonWebSocketMessage = new PongJsonWebSocketMessage();
WebSocketMessageSender.send(session, pongJsonWebSocketMessage);
}
@Override
public String type() {
return WebSocketMessageTypeEnum.PING.getValue();
}
@Override
public Class<PingJsonWebSocketMessage> getMessageClass() {
return PingJsonWebSocketMessage.class;
}
}

View File

@@ -0,0 +1,21 @@
package com.pig4cloud.pigx.common.websocket.handler;
import org.springframework.web.socket.WebSocketSession;
/**
* 普通文本类型非指定json类型的消息处理器 即消息不满足于我们定义的Json类型消息时所使用的处理器
*
* @see com.pig4cloud.pigx.common.websocket.message.JsonWebSocketMessage
* @author Hccake 2021/1/5
* @version 1.0
*/
public interface PlanTextMessageHandler {
/**
* 普通文本消息处理
* @param session 当前接收消息的session
* @param message 文本消息
*/
void handle(WebSocketSession session, String message);
}

View File

@@ -0,0 +1,27 @@
package com.pig4cloud.pigx.common.websocket.holder;
import com.pig4cloud.pigx.common.websocket.handler.JsonMessageHandler;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public final class JsonMessageHandlerHolder {
private JsonMessageHandlerHolder() {
}
private static final Map<String, JsonMessageHandler> MESSAGE_HANDLER_MAP = new ConcurrentHashMap<>();
public static JsonMessageHandler getHandler(String type) {
return MESSAGE_HANDLER_MAP.get(type);
}
public static void addHandler(JsonMessageHandler jsonMessageHandler) {
MESSAGE_HANDLER_MAP.put(jsonMessageHandler.type(), jsonMessageHandler);
}
}

View File

@@ -0,0 +1,46 @@
package com.pig4cloud.pigx.common.websocket.holder;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.WebSocketHandlerDecorator;
/**
* WebSocketHandler 装饰器该装饰器主要用于在开启和关闭连接时进行session的映射存储与释放
*
* @author Hccake 2020/12/31
* @version 1.0
*/
public class MapSessionWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
private final SessionKeyGenerator sessionKeyGenerator;
public MapSessionWebSocketHandlerDecorator(WebSocketHandler delegate, SessionKeyGenerator sessionKeyGenerator) {
super(delegate);
this.sessionKeyGenerator = sessionKeyGenerator;
}
/**
* websocket 连接时执行的动作
* @param session websocket session 对象
* @throws Exception 异常对象
*/
@Override
public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
Object sessionKey = sessionKeyGenerator.sessionKey(session);
WebSocketSessionHolder.addSession(sessionKey, session);
}
/**
* websocket 关闭连接时执行的动作
* @param session websocket session 对象
* @param closeStatus 关闭状态对象
* @throws Exception 异常对象
*/
@Override
public void afterConnectionClosed(final WebSocketSession session, CloseStatus closeStatus) throws Exception {
Object sessionKey = sessionKeyGenerator.sessionKey(session);
WebSocketSessionHolder.removeSession(sessionKey);
}
}

View File

@@ -0,0 +1,20 @@
package com.pig4cloud.pigx.common.websocket.holder;
import org.springframework.web.socket.WebSocketSession;
/**
* WebSocketSession 唯一标识生成器
*
* @author Hccake 2021/1/5
* @version 1.0
*/
public interface SessionKeyGenerator {
/**
* 获取当前session的唯一标识
* @param webSocketSession 当前session
* @return session唯一标识
*/
Object sessionKey(WebSocketSession webSocketSession);
}

View File

@@ -0,0 +1,65 @@
package com.pig4cloud.pigx.common.websocket.holder;
import org.springframework.web.socket.WebSocketSession;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocketSession 持有者 主要用于保存当前所有在线的会话信息
*
* @author Hccake 2021/1/4
* @version 1.0
*/
public final class WebSocketSessionHolder {
private WebSocketSessionHolder() {
}
private static final Map<String, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>();
/**
* 添加一个 session
* @param sessionKey session 唯一标识
* @param session 待添加的 WebSocketSession
*/
public static void addSession(Object sessionKey, WebSocketSession session) {
USER_SESSION_MAP.put(sessionKey.toString(), session);
}
/**
* 删除一个 session
* @param sessionKey session唯一标识
*/
public static void removeSession(Object sessionKey) {
USER_SESSION_MAP.remove(sessionKey.toString());
}
/**
* 获取指定标识的 session
* @param sessionKey session唯一标识
* @return WebSocketSession 该标识对应的 session
*/
public static WebSocketSession getSession(Object sessionKey) {
return USER_SESSION_MAP.get(sessionKey.toString());
}
/**
* 获取当前所有在线的 session
* @return Collection<WebSocketSession> session集合
*/
public static Collection<WebSocketSession> getSessions() {
return USER_SESSION_MAP.values();
}
/**
* 获取所有在线的用户标识
* @return Set<Object> session唯一标识集合
*/
public static Set<String> getSessionKeys() {
return USER_SESSION_MAP.keySet();
}
}

View File

@@ -0,0 +1,22 @@
package com.pig4cloud.pigx.common.websocket.message;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public abstract class AbstractJsonWebSocketMessage implements JsonWebSocketMessage {
public static final String TYPE_FIELD = "type";
private final String type;
protected AbstractJsonWebSocketMessage(String type) {
this.type = type;
}
@Override
public String getType() {
return type;
}
}

View File

@@ -0,0 +1,15 @@
package com.pig4cloud.pigx.common.websocket.message;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public interface JsonWebSocketMessage {
/**
* 消息类型,主要用于匹配对应的消息处理器
* @return 当前消息类型
*/
String getType();
}

View File

@@ -0,0 +1,13 @@
package com.pig4cloud.pigx.common.websocket.message;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public class PingJsonWebSocketMessage extends AbstractJsonWebSocketMessage {
public PingJsonWebSocketMessage() {
super(WebSocketMessageTypeEnum.PING.getValue());
}
}

View File

@@ -0,0 +1,13 @@
package com.pig4cloud.pigx.common.websocket.message;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
public class PongJsonWebSocketMessage extends AbstractJsonWebSocketMessage {
public PongJsonWebSocketMessage() {
super(WebSocketMessageTypeEnum.PONG.getValue());
}
}

View File

@@ -0,0 +1,18 @@
package com.pig4cloud.pigx.common.websocket.message;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
/**
* @author Hccake 2021/1/4
* @version 1.0
*/
@Getter
@RequiredArgsConstructor
public enum WebSocketMessageTypeEnum {
PING("ping"), PONG("pong");
private final String value;
}

View File

@@ -0,0 +1,5 @@
/**
* 此包内代码 来源至 BallCat 关于BallCat: 组织旨在为项目快速开发提供一系列的基础能力,方便使用者根据项目需求快速进行功能拓展。
* https://github.com/ballcat-projects/ballcat
*/
package com.pig4cloud.pigx.common.websocket;

View File

@@ -0,0 +1,3 @@
com.pig4cloud.pigx.common.websocket.config.WebSocketAutoConfiguration
com.pig4cloud.pigx.common.websocket.config.RedisMessageDistributorConfiguration
com.pig4cloud.pigx.common.websocket.config.LocalMessageDistributorConfiguration