RabbitMQ的订阅和发布步骤详解
chenlong 发布:2021-10-22 17:38:50阅读:一、关于RabbitMQ搭建和基本概念这里不做介绍,下面给出实用的参考博客
RabbitMQ基础概念及详细介绍参考文档:http://blog.csdn.net/whycold/article/details/41119807
RabbitMQ入门及环境的搭建:http://m.blog.csdn.net/article/details?id=50487028
RabbitMQ网页控制台开启方式:http://blog.csdn.net/spyiu/article/details/24697221
二、RabbitMQ的发布
(1)发布端的连接方法
发布端的连接只需要创建一个ConnectionFactory然后创建一个连接,然后创建一个频道,声明一个路由器,指定名称、模式、及其是否durable
public int BaseConnection(){ /** * 创建连接连接到RabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); //设置MabbitMQ所在主机ip或者主机名 factory.setHost(IP); factory.setPort(5672); factory.setUsername(user); factory.setPassword(password); //创建一个连接 try { connection = factory.newConnection(); } catch (IOException e) { System.out.println("[x] 请确认输入的IP地址、用户名、密码是否准确!"); return -1; } catch (Exception e) { System.out.println("[x] 连接RabbitMQ超时,请重试!"); return -1; } //创建一个频道 try { channel = connection.createChannel(); } catch (IOException e) { System.out.println("[x] 创建频道出错,请重试!"); return -1; } //声明一个路由器,指定名称、模式、及其是否durable try { channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE); } catch (IOException e) { System.out.println("[x] 路由器声明失败,请重试!"); return -1; } System.out.println("[x] 发布消息者成功连接至RabbitMQ !"); return 0; }
(2)发布端发布消息的方法
发布端发布消息往指定的exchange(路由)发送的时候需要指定一个routingKey,topic类型时当接收端的bindingKey和routingKey相匹配的时候才能接收到订阅的消息
/** * 发布消息 * @param routingKey routingKey * @param msg 消息 * @return */ public boolean Publish(String routingKey,byte[] msg){ try { channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg); System.out.println("[x] Sender Sent " + routingKey + " : " + msg); return true; } catch (IOException e) { System.out.println("[y] Sender failed to basic publish."); return false; } }
三、RabbitMQ的订阅
(1)订阅端的连接方法
订阅端的连接在发布端连接的基础上还需要给该exchange绑定
/** * 创建连接连接到RabbitMQ * @return 0 means success * -1 means failure */ public int BaseConnection(){ ConnectionFactory factory = new ConnectionFactory(); //设置MabbitMQ所在主机ip或者主机名 factory.setHost(IP); factory.setPort(5672); factory.setUsername(user); factory.setPassword(password); //创建一个连接 try { connection = factory.newConnection(); } catch (IOException e) { System.out.println("[y] 请确认输入的IP地址、用户名、密码是否准确!"); return -1; } catch (Exception e) { System.out.println("[y] 连接RabbitMQ超时,请重试!"); return -1; } //创建一个频道 try { channel = connection.createChannel(); } catch (IOException e) { System.out.println("[y] 创建频道出错,请重试!"); return -1; } //声明一个路由器,指定名称、模式、及其是否durable try { channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE); } catch (IOException e) { System.out.println("[y] 路由器声明失败,请重试!"); return -1; } //指定一个队列,随机队列名,non-durable,exclusive,not auto-delete// QUEUE_NAME = MqConfig.QUEUE_NAME; Map<String,Object> args = new HashMap(); args.put("x-message-ttl",message_ttl); try { channel.queueDeclare(QUEUE_NAME,QUEUE_DURABLE,QUEUE_EXCLUSIVE,QUEUE_AUTO_DELETE,args); } catch (IOException e) { System.out.println("[y] 队列声明失败,请重试!"); } try { channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,bindingKey); } catch (IOException e) { System.out.println("[y] 队列绑定路由关键字 " + bindingKey + " 时出错!"); } System.out.println("[y] 订阅消息者成功连接至RabbitMQ !"); return 0; }
(2)订阅端的订阅方法
/** * 订阅消息函数 */ public boolean Consume(){ System.out.println("[y] Receiver " + QUEUE_NAME + " Waiting for messages.To exit press CTRL+C."); //消息订阅 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { team.iOceanPlus.PB.Config.PBConfig rec=team.iOceanPlus.PB.Config.PBConfig.parseFrom(body); // rec.getConfigTargetDistribution(); // rec.getConfigTargetDistribution().getConfigType(); System.out.println(rec.getConfigTargetDistribution(0)); } }; //消息反馈 try { channel.basicConsume(QUEUE_NAME,true,consumer); return true; } catch (IOException e) { System.out.println("[y] Receiver "+ QUEUE_NAME + " failed to basic consume." ); return false; } }
小礼物走一波,支持作者
赏还没有人赞赏,支持一波吧