Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题(二)

代码 代码 1548 人阅读 | 0 人回复

<
媒介


上一篇,简朴给各人整开了一下websocket,利用stomp方法。
那篇,便是考虑到单体的效劳利用websocket ,根据上一篇的整开,的确出成绩。
可是假如一旦是背载多台效劳的时分,那末便会呈现丧失成绩。

甚么?出有念过那个成绩? 
没关系,看图教工具:

一向风格,我瞎绘了一张简图,大抵讲一下前后端利用websocket通信的局面。

094637lme7efw3v5vsamev.jpg

 简析:
 后端某个效劳起了,整开了websocket做为 server,开放了一些节面endpoints ;
 前端效劳也起了,也整开了websocket做为 client,毗连server的websocket ;
 后端server 将每一个 前端client 毗连的 websocket session 皆存起去, 确保 明白谁是谁。
 如许正在server给client推收动静的时分,能包管推收,数据没有丧失。
websocket session map  是存正在于 后端效劳 的内乱存内里的 ,单一台后端 server,貌似出啥年夜成绩。

ok,我们简朴也理解了一下大抵的局面,接下去持续看图 ,多台websocket server 背载场景:

094637g18h7iy7z3674uz5.jpg

 简析:
如今做为统一个后端微效劳,整开websocket,起了两台。
如上图所示,假如 红色的client 跟 websocket挨交讲的时分,毗连的是上里的 浅蓝色websocket server;

浅蓝色websocket server 推收动静给  红色的client ,通信出成绩。
由于websocket server 确当天效劳session map内里寄存着 红色client的毗连websocket session;
动静丧失情况呈现:
我们背载了两台websocket server ,假如触收 websocket server 给红色client持续推收告诉动静, nginx/网闭 按照我们平常的背载均衡设置划定规矩,分收到了 绿色的 websocket server。
此时,绿色的 websocket server 确当天效劳session map内里 并出有  红色client的毗连websocket session ,以是会招致  告诉动静 丧失 。

处理计划:
既然成绩呈现了,那末我们便处理它,本篇便是介绍怎样经由过程 整开动静中心件来处理那个动静丧失的成绩。
我们采取的计划 是 将整开websocket 的 server效劳 (多台) 皆整开 redis做为动静中心件;
正在websocket  server 推收动静给websocket  client时, 先把动静拾到 redis内里 。
然后一切的websocket  server (不论几台效劳) 城市定阅此主题,获得到需求推收的数据,接下去 再推收给到对应的 destination 节面 (这时候候只要实正取当前client有毗连干系的 server扶效劳才气推收胜利,其他皆出有推收)。
做个简图:
094638fqwa9uf3aww9tnw7.jpg


或许许多人看到那内心里多几少有些迷惑, 实在底子缘故原由便是 多台 server 出法子同享 毗连session,假如能把session 连结起去同享,岂没有是处理了?
是的,思绪是对的,惋惜的是 websocekt session 是出有完成序列化接心的,没法利用相同redis来存储起去,然后反序列化获得。(可是实在能够经由过程redis存储相关websocket sessionkey 取节面的IP地点、端心,强止把恳求再次分收到准确的websocket server上里来。可是小我私家觉得这类方法没有是很好,以是本文仍是介绍redis的定阅/推收方法去处理那个成绩)
话未几道,进进代码环节。

注释


基于上一篇的根柢,开端魔改把redis参加出去。
094638a8se3dd5725a5a0r.jpg

 揭代码:

pom.xml :
  1.     <dependencies>
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-data-redis</artifactId>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>com.alibaba</groupId>
  8.             <artifactId>fastjson</artifactId>
  9.             <version>1.2.68</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>org.springframework.boot</groupId>
  13.             <artifactId>spring-boot-starter-websocket</artifactId>
  14.         </dependency>
  15.         <dependency>
  16.             <groupId>org.springframework.boot</groupId>
  17.             <artifactId>spring-boot-starter</artifactId>
  18.         </dependency>
  19.         <dependency>
  20.             <groupId>org.springframework.boot</groupId>
  21.             <artifactId>spring-boot-starter-test</artifactId>
  22.             <scope>test</scope>
  23.         </dependency>
  24.     </dependencies>
复造代码
application.yml :
  1. server:
  2.   port: 9908
  3. spring:
  4.   redis:
  5.     host: 127.0.0.1
  6.     port: 6379
  7.     password: 123456
复造代码

RedisConfig.java :
 
  1. import com.fasterxml.jackson.annotation.JsonAutoDetect;
  2. import com.fasterxml.jackson.annotation.PropertyAccessor;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.stomp.stomptest.listener.RedisListener;
  5. import org.springframework.cache.annotation.CachingConfigurerSupport;
  6. import org.springframework.cache.annotation.EnableCaching;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. import org.springframework.context.annotation.Primary;
  10. import org.springframework.data.redis.connection.RedisConnectionFactory;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.data.redis.listener.PatternTopic;
  13. import org.springframework.data.redis.listener.RedisMessageListenerContainer;
  14. import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
  15. import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
  16. import org.springframework.data.redis.serializer.StringRedisSerializer;
  17. /**
  18. * @Author JCccc
  19. * @Description   redis设置
  20. * @Date 2021/6/30 8:53
  21. */
  22. @Configuration
  23. @EnableCaching
  24. public class RedisConfig extends CachingConfigurerSupport {
  25.     @Bean
  26.     @Primary
  27.     public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
  28.         RedisTemplate<String, Object> template = new RedisTemplate<>();
  29.         template.setConnectionFactory(factory);
  30.         Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
  31.         StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
  32.         ObjectMapper om = new ObjectMapper();
  33.         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
  34.         om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
  35.         jacksonSeial.setObjectMapper(om);
  36.         template.setValueSerializer(jacksonSeial);
  37.         template.setKeySerializer(stringRedisSerializer);
  38.         template.setHashKeySerializer(stringRedisSerializer);
  39.         template.setHashValueSerializer(jacksonSeial);
  40.         template.afterPropertiesSet();
  41.         return template;
  42.     }
  43.     @Bean
  44.     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
  45.                                             MessageListenerAdapter topicAdapter) {
  46.         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  47.         container.setConnectionFactory(connectionFactory);
  48.         //定阅了主题 webSocketMsgPush
  49.         container.addMessageListener(topicAdapter, new PatternTopic("webSocketMsgPush"));
  50.         return container;
  51.     }
  52.     /**
  53.      * 动静监听器适配器,绑定动静处置器
  54.      *
  55.      * @return
  56.      */
  57.     @Bean
  58.     MessageListenerAdapter topicAdapter() {
  59.         return new MessageListenerAdapter(new RedisListener());
  60.     }
  61. }
复造代码
简析:
094638ati8j680toi6tfyb.jpg

 WebSocketConfig.java :
  1. import org.springframework.context.annotation.Configuration;
  2. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  3. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  5. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  6. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  7. import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
  8. /**
  9. * @Author JCccc
  10. * @Description   EnableWebSocketMessageBroker-注解开启STOMP和谈去传输基于代办署理的动静,此时掌握器撑持利用@MessageMapping
  11. * @Date 2021/6/30 8:53
  12. */
  13. @Configuration
  14. @EnableWebSocketMessageBroker
  15. public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
  16.     private static long HEART_BEAT = 10000;
  17.     @Override
  18.     public void configureMessageBroker(MessageBrokerRegistry config) {
  19.         ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
  20.         te.setPoolSize(1);
  21.         te.setThreadNamePrefix("wss-heartbeat-thread-");
  22.         te.initialize();
  23.         config.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT, HEART_BEAT}).setTaskScheduler(te);
  24.     }
  25.     /**
  26.      * 开放节面
  27.      * @param registry
  28.      */
  29.     @Override
  30.     public void registerStompEndpoints(StompEndpointRegistry registry) {
  31.         //注册两个STOMP的endpoint,别离用于播送战面对面
  32.         //播送
  33.         registry.addEndpoint("/publicServer").withSockJS();
  34.         //面对面
  35.         registry.addEndpoint("/privateServer").withSockJS();
  36.     }
  37.     @Override
  38.     public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
  39.         registration.setMessageSizeLimit(500 * 1024 * 1024);
  40.         registration.setSendBufferSizeLimit(1024 * 1024 * 1024);
  41.         registration.setSendTimeLimit(200000);
  42.     }
  43. }
复造代码
InjectServiceUtil.java :
  1. import com.stomp.stomptest.producer.PushMessage;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. import javax.annotation.PostConstruct;
  5. /**
  6. * @Author JCccc
  7. * @Description   pushMessage (单例)
  8. * @Date 2021/6/30 8:53
  9. */
  10. @Component
  11. public class InjectServiceUtil {
  12.     @Autowired
  13.     private PushMessage pushMessage;
  14.     @PostConstruct
  15.     public void init(){
  16.         InjectServiceUtil.getInstance().pushMessage = this.pushMessage;
  17.     }
  18.     /**
  19.      *  完成单例 start
  20.      */
  21.     private static class SingletonHolder {
  22.         private static final InjectServiceUtil INSTANCE = new InjectServiceUtil();
  23.     }
  24.     private InjectServiceUtil (){}
  25.     public static final InjectServiceUtil getInstance() {
  26.         return SingletonHolder.INSTANCE;
  27.     }
  28.     /**
  29.      *  完成单例 end
  30.      */
  31.     public PushMessage pushMessage(){
  32.         return InjectServiceUtil.getInstance().pushMessage;
  33.     }
  34. }
复造代码
RedisListener.java :
  1. import org.springframework.data.redis.connection.Message;
  2. import org.springframework.data.redis.connection.MessageListener;
  3. /**
  4. * @Author JCccc
  5. * @Description   redis监听动静
  6. * @Date 2021/6/30 8:53
  7. */
  8. public class RedisListener implements MessageListener {
  9.     @Override
  10.     public void onMessage(Message message, byte[] bytes) {
  11.         System.out.println("步调1.监听到需求停止背载转收的动静:" + message.toString());
  12.         InjectServiceUtil.getInstance().pushMessage().send(message.toString());
  13.     }
  14. }
复造代码
简析:
094638qneo4i4olh4osckw.jpg


Message.java :
  1. /**
  2. * @Author JCccc
  3. * @Description
  4. * @Date 2021/8/20 9:26
  5. */
  6. public class Message {
  7.     /**
  8.      * 动静编码
  9.      */
  10.     private String code;
  11.     /**
  12.      * 去自(包管独一)
  13.      */
  14.     private String form;
  15.     /**
  16.      * 来自(包管独一)
  17.      */
  18.     private String to;
  19.     /**
  20.      * 内乱容
  21.      */
  22.     private String content;
  23.     public String getCode() {
  24.         return code;
  25.     }
  26.     public void setCode(String code) {
  27.         this.code = code;
  28.     }
  29.     public String getForm() {
  30.         return form;
  31.     }
  32.     public void setForm(String form) {
  33.         this.form = form;
  34.     }
  35.     public String getTo() {
  36.         return to;
  37.     }
  38.     public void setTo(String to) {
  39.         this.to = to;
  40.     }
  41.     public String getContent() {
  42.         return content;
  43.     }
  44.     public void setContent(String content) {
  45.         this.content = content;
  46.     }
  47. }
复造代码

PushMessage.java  :
  1. import com.alibaba.fastjson.JSON;
  2. import com.stomp.stomptest.pojo.Message;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.messaging.simp.SimpMessagingTemplate;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. * @Author JCccc
  8. * @Description   动静收收
  9. * @Date 2021/6/30 8:53
  10. */
  11. @Service
  12. public class PushMessage {
  13.     @Autowired
  14.     private SimpMessagingTemplate template;
  15.     public void send(String msgJson){
  16.         System.out.println("步调2.行将推收给websocket server:");
  17.         Message message = JSON.parseObject(msgJson, Message.class);
  18.         System.out.println("步调2.1 动静收给:"+message.getTo());
  19.         System.out.println("步调2.2 收收内乱容是:"+message.getContent());
  20.         template.convertAndSendToUser(message.getTo(), "/message", message.getContent());
  21.         System.out.println("----------动静收收终了----------");
  22.         
  23.         //播送
  24.        // template.convertAndSend("/topic/public",chatMessage);
  25.     }
  26. }
复造代码

最初是 TestController.java (测试接心。模仿触收体系推收动静):
  1. @Controller
  2. public class TestController {
  3.     @Autowired
  4.     public SimpMessagingTemplate template;
  5.     @ResponseBody
  6.     @RequestMapping("/pushToOneTest")
  7.     public String sendMessage(@RequestBody Message message) {
  8.         String messageJson = JSON.toJSONString(message);
  9.         System.out.println("!!!体系筹办做动静推收!!!");
  10.         stringRedisTemplate.convertAndSend("webSocketMsgPush",messageJson);
  11.         return "收收胜利";
  12.     }
  13. }
复造代码

ok,项目跑起去,挪用一下测试接心,看看全部流程:
能够看到动静一般推收:
094639oxnony1rf3zkcooo.jpg


看下代码内里挨印的疑息(各人初教的能够之间挨debug看下贱程会更好):
094639j9jahnsje88552yh.jpg


能够看到,这类场景,不论背载了几台, 动静皆先到 redis内里来。

每台server皆经由过程redisListener 监听主题,获得到相关 动静 , 然后才开端利用本地的websocket session 来找到各自效劳器内乱能否存正在当前用户的毗连数据,然后推收进来。
那些找没有到的没法举办推收进来,当然相形见绌而已,可是 我们的背载成绩处理了。
祝贺我们本人。


ok,该篇便到那。






最初另有便是之前的一篇,利用rabbitmq做为动静代理的:
《Springboot 整开Websocket+Stomp和谈+RabbitMQ做动静代理 真例教程》
https://blog.csdn.net/qq_35387940/article/details/108276136











免责声明:假如进犯了您的权益,请联络站少,我们会实时删除侵权内乱容,感谢协作!
1、本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,按照目前互联网开放的原则,我们将在不通知作者的情况下,转载文章;如果原文明确注明“禁止转载”,我们一定不会转载。如果我们转载的文章不符合作者的版权声明或者作者不想让我们转载您的文章的话,请您发送邮箱:Cdnjson@163.com提供相关证明,我们将积极配合您!
2、本网站转载文章仅为传播更多信息之目的,凡在本网站出现的信息,均仅供参考。本网站将尽力确保所提供信息的准确性及可靠性,但不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何损失或损害承担责任。
3、任何透过本网站网页而链接及得到的资讯、产品及服务,本网站概不负责,亦不负任何法律责任。
4、本网站所刊发、转载的文章,其版权均归原作者所有,如其他媒体、网站或个人从本网下载使用,请在转载有关文章时务必尊重该文章的著作权,保留本网注明的“稿件来源”,并自负版权等法律责任。
回复 关闭延时

使用道具 举报

 
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则