RabbitMQ Routingby Pigbrain

Routing (using the Java Client)

  • Publish/Subscribe에서는 모든 메세지를 수신했다
  • Routing 기능을 이용하여 특정 메세지만 수신가능하도록 변경할 것이다

Bindings

  1. channel.queueBind(queueName, EXCHANGE_NAME, "");
  • Binding은 Exchange와 Queue와의 관계를 설정하는 것이다
    • Queue는 이 Exchange가 주는 메세지만 수신한다는 것으로 볼 수 있다
  • queueBind 메소드는 routingKey 파라미터를 가질수 있다

    1. channel.queueBind(queueName, EXCHANGE_NAME, "black"); // routingKey : "black"
  • routingKey는 exchange Type에 영향을 받는다
    • fanout으로 설정할 경우 routingKey 설정은 무시된다

Direct exchange

  • direct exchange의 원리는 단순하다
    • routingKey와 매칭되는 메세지만 전달한다

  • exchange X에 두개의 Queue(Q1, Q2)가 Binding 되어 있다
    • 첫 번째 Queue의 routingKey는 orage이다
    • 두 번째 Queue의 routingKeys느 black, green이다
  • routingKey를 orage로 설정하고 메세지를 publish 하면 Q1에 들어가진다
  • routingKey를 black 혹은 green으로 설정한 메세지는 Q2에 들어가지낟

Multiple bindings

  • 동일한 routingKey(black)로 두개의 Queue(Q1, Q2)에 Binding 하였다
  • routingKey가 black으로 설정된 메세지를 publish하면 fanout처럼 동작한다
    • Q1,Q2에 메세지가 모두 전달 된다

Putting it all together

EmitLogDirect.java (Sending)

  1. public class EmitLogDirect {
  2.  
  3. private static final String EXCHANGE_NAME = "direct_logs";
  4. // argv -> info warning error ....
  5.  
  6. public static void main(String[] argv) throws java.io.IOException {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  12. String severity = getSeverity(argv);
  13. String message = getMessage(argv);
  14. channel.basicPublish(EXCHANGE_NAME,
  15. severity,
  16. null,
  17. message.getBytes());
  18. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  19. channel.close();
  20. connection.close();
  21. }
  22. //..
  23. }

ReceiveLogsDirect.java (Receiving)

  1. import com.rabbitmq.client.*;
  2.  
  3. import java.io.IOException;
  4.  
  5. public class ReceiveLogsDirect {
  6.  
  7. private static final String EXCHANGE_NAME = "direct_logs";
  8. public static void main(String[] argv) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("localhost");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  14. String queueName = channel.queueDeclare().getQueue();
  15. if (argv.length < 1){
  16. System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
  17. System.exit(1);
  18. }
  19. for(String severity : argv){
  20. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  21. }
  22.  
  23. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  24. Consumer consumer = new DefaultConsumer(channel) {
  25. @Override
  26. public void handleDelivery(String consumerTag, Envelope envelope,
  27. AMQP.BasicProperties properties,
  28. byte[] body) throws IOException {
  29.  
  30. String message = new String(body, "UTF-8");
  31. System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
  32. }
  33. };
  34. channel.basicConsume(queueName, true, consumer);
  35. }
  36. }


원문

  • http://next.rabbitmq.com/tutorials/tutorial-four-java.html
Published 10 April 2016