RabbitMQ Topicsby Pigbrain

Topics (using the Java Client)

  • fanout 대신 direct를 사용하면 메세지를 선택적으로 수신할 수 있다
  • direct는 정해진 routingKey에 매칭되는 메세지만 수신가능 하다
    • routingKey를 하나만 지정 가능하다
  • topic은 여러 routingKey 조건을 설정 가능하다

Topic exchange

  • topic exchange로 publish하고자 하는 메세지의 routingKey는 특정 포맷을 지켜야한다
    • routingKey는 단어들의 리스트 형태여야 한다
    • 단어는 .으로 구분된다
    • Example
      • stock.usd.nyse
      • nyse.vmw
      • quick.orange.rabbit
    • 255bytes까지 routingKey로 단어들을 등록 가능하다
  • topic exchange는 direcet exchange와 동작 방식이 유사하다
    • routingKey를 기준으로 매칭되는 Queue에만 메세지를 전송한다
  • topic exchange의 routingKey에 사용되는 2가지 문자는 다음과 같다
    • *(star) : 한 단어와 매칭된다
    • #(hash) : 공백 혹은 여러개의 단어와 매칭된다
  • routingKey로 #만 지정해 놓으면 모든 메세지를 수신한다
    • fanout과 동일하게 동작한다
  • routingKey로 , # 둘다 지정하지 않으면 특정 메세지만 수신한다
    • direct와 동일하게 동작한다

Example

  • 동물을 묘사한 메세지를 전송하는 것으로 예를 든다

  • routingKey는 “.." 형태로 지정한다
    • Q1에는 .orange.로 binding 되어있다
      • Q1은 orange색의 동물에 대한 메세지만 수신한다
    • Q2에는 ..rabbit과 lazy.# 로 binding 되어 있다
      • Q2는 rabbit에 관한 모든 메세지를 수신한다
      • lazy 속성을 갖는 동물에대한 모든 메세지를 수신한다
  • routingKey : quick.orange.rabbit 메세지는 Q1과 Q2에 모두 전달된다
  • routingKey : lazy.orange.elephant 메세지는 Q1과 Q2에 모두 전달된다
  • routingKey : quick.orange.fox 메세지는 Q1에만 전달된다
  • routingKey : lazy.brown.fox 메세지는 Q2에만 전달된다
  • routingKey : quick.brown.fox 메세지는 어떠한 Queue에도 매칭되지 않는다. 이 메세지는 버려진다
  • routingKey : quick.orange.male.rabbit 메세지는 4개의 단어로 구성되어 매칭되지 않는다. 이 메세지는 버려진다
  • routingKey : lazy.orange.male.rabbit 메세지는 4개의 단어로 구성되지만 Q2에 매칭될 수 있다. Q2로 전달된다

Putting it all together

EmitLogTopic.java (Sending)

  1. public class EmitLogTopic {
  2.  
  3. private static final String EXCHANGE_NAME = "topic_logs";
  4. // argv -> "kern.critical" "A critical kernel error"
  5.  
  6. public static void main(String[] argv) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  12. String routingKey = getRouting(argv);
  13. String message = getMessage(argv);
  14. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
  15. System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
  16. connection.close();
  17. }
  18. //...
  19. }

ReceiveLogsTopic.java (Receiving)

  1. import com.rabbitmq.client.*;
  2.  
  3. import java.io.IOException;
  4.  
  5. public class ReceiveLogsTopic {
  6. private static final String EXCHANGE_NAME = "topic_logs";
  7. // argv -> "kern.*" // "*.critical" // "kern.*" "*.critical" // "kern.critical" "A critical kernel error"
  8. public static void main(String[] argv) throws Exception {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("localhost");
  11. Connection connection = factory.newConnection();
  12. Channel channel = connection.createChannel();
  13. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  14. String queueName = channel.queueDeclare().getQueue();
  15. if (argv.length < 1) {
  16. System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
  17. System.exit(1);
  18. }
  19. for (String bindingKey : argv) {
  20. channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
  21. }
  22. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  23. Consumer consumer = new DefaultConsumer(channel) {
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope,
  26. AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. String message = new String(body, "UTF-8");
  28. System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
  29. }
  30. };
  31.  
  32. channel.basicConsume(queueName, true, consumer);
  33. }
  34. }


원문

  • http://next.rabbitmq.com/tutorials/tutorial-five-java.html
Published 11 April 2016