标签搜索

目 录CONTENT

文章目录

netty的集群部署多channel解决方案之Rabbitmq

高大北
2023-01-04 / 0 评论 / 0 点赞 / 110 阅读 / 1,054 字 / 正在检测是否收录...

netty做集群 channel如何共享?

方案一:
netty 集群,通过rocketmq等MQ 推送到所有netty服务端,channel 共享无非是要那个通道都可以发送消息向客户端
方案二:
MQ广播+ 多Netty ,Netty收到MQ消息后,如果本地存储有该channel,就发送,没有存储就忽略,完美解决,不需要做channel的共享。

这里使用rabbitmq的订阅发布的广播模式(如果有其他服务可以使用Topic)
1、添加配置文件

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.5.5</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.0.3.RELEASE</version>
        </dependency>
server:
  port: 8888
  
nettyEventExchange: netty.event.exchange
nettyQueue: netty.${server.port}.message.queue

spring:
  rabbitmq:
    host: 127.0.0.1 
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      retry:
        ####开启消费者(程序出现异常的情况下会)进行重试
        enabled: false #默认为false
        ####最大重试次数
        max-attempts: 5
        ####重试间隔次数
        initial-interval: 3000
      simple:
        acknowledge-mode: manual
      simple.concurrency: 10  #线程池大小,默认为10
      simple.max-concurrency: 100  #最大线程池大小,默认为10

2、添加配置文件和实体

package yws.net.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventMessage extends Throwable implements Serializable {

    /**
     * 消息队列的消息id
     */
    private String messageId;


    /**
     * 事件类型
     */
    private String eventMessageType;


    /**
     * 业务id
     */
    private String bizId;


    /**
     * 账号
     */
    private Long accountNo;


    /**
     * 消息体
     */
    private String content;

    /**
     * 备注
     */
    private String remark;

}

package yws.net.config;

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 如果多集群部署需要使用
 * 推送消息时候使用订阅发布,对所有服务都发送,每个服务都查询自己
 * jvm里面是否有存储对应的channel,有的话推送,没有的话忽略
 * */
@Configuration
@Data
public class RabbitMQConfig {

    /**
     * 交换机
     */
    @Value("${nettyEventExchange}")
    private String nettyEventExchange;

    /**
     * 队列
     */
    @Value("${nettyQueue}")
    private String nettyQueue;

    /**
     * 创建交换机 Fanout类型
     * @return
     */
    @Bean
    public FanoutExchange nettyEventExchange(){
        //durable:开启持久化,autoDelete:不自动删除
        //return new TopicExchange(nettyEventExchange,true,false);
        return new FanoutExchange(nettyEventExchange,true,false);
    }

    /**
     * 消息转换器
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    

    /**
     * 队列和交换机的绑定关系建立
     */
    @Bean
    public Binding nettyAddApiBinding() {
        return BindingBuilder.bind(nettyQueue()).to(nettyEventExchange());
    }

    /**
     * 普通队列,用于被监听
     */
    @Bean
    public Queue nettyQueue() {
        return new Queue(nettyQueue);
    }

}

3、添加监听

package yws.net.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import yws.net.enums.BizCodeEnum;
import yws.net.exception.BizException;
import yws.net.model.EventMessage;
import yws.net.util.redis.RedisUtils;

import java.io.IOException;

/**
 * @author Yws
 * @since 2022/8/26 集群消息推送消费者
 */
@Component
@Slf4j
@RabbitListener(queues = "${nettyQueue}")
public class NettyMsgMqListener {

  @Autowired private RedisUtils redisUtils;

  @Value("${server.port}")
  public int port;

  @RabbitHandler
  public void nettyMsgHandler(EventMessage eventMessage, Message message, Channel channel)
      throws IOException {
      //测试多服务
    log.info("测试端口:{}",port);
    try {
        //查看对应的netty服务的本地存储里边是否有该channel,有的话推送消息、没有的话忽略

    } catch (Exception e) {
      // 处理业务异常,还有进行其他操作,比如记录失败原因
      log.error("消费失败1:" + eventMessage);
    }
     //确认消息消费成功,
     channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

  }
}

4、启动多个springboot项目,启动多个消费者,多集群可以添加jvm参数

-Dserver.port=8910 -Dnetty.port=7001

1672817085032
5、添加测试类

package yws.net.biz;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import yws.net.MainApplication;
import yws.net.config.RabbitMQConfig;
import yws.net.model.EventMessage;

/**
 * @author Yws
 * @since 2023/1/4
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MainApplication.class)
@Slf4j
public class NettyTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RabbitMQConfig rabbitMQConfig;

    @Test
    public void test(){
        String nettyEventExchange = rabbitMQConfig.getNettyEventExchange();
        System.out.println(nettyEventExchange);
        //构建消息
        EventMessage eventMessage = EventMessage.builder()
                .content("test")
                .build();
        rabbitTemplate.convertAndSend(rabbitMQConfig.getNettyEventExchange(),
                "", eventMessage);
    }

}
0

评论区