位置:首页 > 后端 > 框架组件

RabbitMQ的订阅和发布步骤详解

chenlong 发布:2021-10-22 17:38:50阅读:

一、关于RabbitMQ搭建和基本概念这里不做介绍,下面给出实用的参考博客

RabbitMQ基础概念及详细介绍参考文档:http://blog.csdn.net/whycold/article/details/41119807

RabbitMQ入门及环境的搭建:http://m.blog.csdn.net/article/details?id=50487028

RabbitMQ网页控制台开启方式:http://blog.csdn.net/spyiu/article/details/24697221


二、RabbitMQ的发布

(1)发布端的连接方法

发布端的连接只需要创建一个ConnectionFactory然后创建一个连接,然后创建一个频道,声明一个路由器,指定名称、模式、及其是否durable

public int BaseConnection(){        /**
         * 创建连接连接到RabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(IP);
        factory.setPort(5672);
        factory.setUsername(user);
        factory.setPassword(password);        //创建一个连接
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            System.out.println("[x] 请确认输入的IP地址、用户名、密码是否准确!");            return  -1;
        } catch (Exception e) {
            System.out.println("[x] 连接RabbitMQ超时,请重试!");            return  -1;
        }        //创建一个频道
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            System.out.println("[x] 创建频道出错,请重试!");            return -1;
        }        //声明一个路由器,指定名称、模式、及其是否durable
        try {
            channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
        } catch (IOException e) {
            System.out.println("[x] 路由器声明失败,请重试!");            return -1;
        }

        System.out.println("[x] 发布消息者成功连接至RabbitMQ !");        return 0;
    }


(2)发布端发布消息的方法

发布端发布消息往指定的exchange(路由)发送的时候需要指定一个routingKey,topic类型时当接收端的bindingKey和routingKey相匹配的时候才能接收到订阅的消息


/**
     * 发布消息
     * @param routingKey routingKey
     * @param msg 消息
     * @return
     */
    public boolean Publish(String routingKey,byte[] msg){            try {
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg);
                System.out.println("[x] Sender Sent " + routingKey + " : " + msg);                return true;
            } catch (IOException e) {
                System.out.println("[y] Sender failed to basic publish.");                return false;
            }
    }


三、RabbitMQ的订阅

(1)订阅端的连接方法

订阅端的连接在发布端连接的基础上还需要给该exchange绑定


/**
     * 创建连接连接到RabbitMQ
     * @return   0 means success
     *           -1 means failure
     */
    public int BaseConnection(){
        ConnectionFactory factory = new ConnectionFactory();        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(IP);
        factory.setPort(5672);
        factory.setUsername(user);
        factory.setPassword(password);        //创建一个连接
        try {
            connection = factory.newConnection();
        } catch (IOException e) {
            System.out.println("[y] 请确认输入的IP地址、用户名、密码是否准确!");            return  -1;
        } catch (Exception e) {
            System.out.println("[y] 连接RabbitMQ超时,请重试!");            return  -1;
        }        //创建一个频道
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            System.out.println("[y] 创建频道出错,请重试!");            return -1;
        }        //声明一个路由器,指定名称、模式、及其是否durable
        try {
            channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
        } catch (IOException e) {
            System.out.println("[y] 路由器声明失败,请重试!");            return -1;
        }        //指定一个队列,随机队列名,non-durable,exclusive,not auto-delete//        QUEUE_NAME = MqConfig.QUEUE_NAME;

        Map<String,Object> args = new HashMap();
        args.put("x-message-ttl",message_ttl);        try {
            channel.queueDeclare(QUEUE_NAME,QUEUE_DURABLE,QUEUE_EXCLUSIVE,QUEUE_AUTO_DELETE,args);
        } catch (IOException e) {
            System.out.println("[y] 队列声明失败,请重试!");
        }        
        try {
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,bindingKey);
        } catch (IOException e) {
            System.out.println("[y] 队列绑定路由关键字 " + bindingKey + " 时出错!");
        }

        System.out.println("[y] 订阅消息者成功连接至RabbitMQ !");        return 0;
    }

(2)订阅端的订阅方法

/** * 订阅消息函数 */ public boolean Consume(){ System.out.println("[y] Receiver " + QUEUE_NAME + " Waiting for messages.To exit press CTRL+C."); //消息订阅 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { team.iOceanPlus.PB.Config.PBConfig rec=team.iOceanPlus.PB.Config.PBConfig.parseFrom(body); // rec.getConfigTargetDistribution(); // rec.getConfigTargetDistribution().getConfigType(); System.out.println(rec.getConfigTargetDistribution(0)); } }; //消息反馈 try { channel.basicConsume(QUEUE_NAME,true,consumer); return true; } catch (IOException e) { System.out.println("[y] Receiver "+ QUEUE_NAME + " failed to basic consume." ); return false; } }


24人点赞 返回栏目 提问 分享一波

小礼物走一波,支持作者

还没有人赞赏,支持一波吧

留言(问题紧急可添加微信 xxl18963067593) 评论仅代表网友个人 留言列表

暂无留言,快来抢沙发吧!

本刊热文
网友在读
手机扫码查看 手机扫码查看