RabbitMQ Publish/Subscribeby Pigbrain

Publish/Subscribe (using the Java Client)

  • 여러 Consumer에게 메세지를 전달하는 방법을 소개한다
    • 이러한 패턴을 Publish/Subscribe라고 한다
    • 로깅시스템을 예로 들어서 소개 할 것이다
    • Publishing된 로그 메세지는 모든 Receiver(Consumer)에게 전달된다

Exchanges

  • “Hello World”“Work Queues”에서는 Queue에 직접 데이터를 Publishing하는 것 처럼 보였다
  • 실제 RabbitMQ는 Queue에 직접적으로 데이터를 넣지 않는다
  • Producer는 메세지를 오직 exchange에게만 전송할 수 있다
  • exchange의 한쪽에서는 Producer로 부터 메세지를 수신하고 다른 한쪽에서는 Queue로 메세지를 전달한다
  • exchange는 메세지를 어떠한 Queue에 넣어야 할지, 모든 Queue에 넣어야할지, 버려야할지 알고있어야 한다
  • exhcange는 “X”로 표시한다

  • 4가지 Exchage Type이 있다
    • direct
    • topic
    • headeers
    • fanout
      • 모든 Queue로 메세지를 전달한다
  • fanout 타입을 갖고 logs라는 이름의 exchange를 생성하기 위해 다음과 같이 한다

    1. channel.exchangeDeclare("logs", "fanout");
  • logs라는 exchange로 메세지를 보내기 위해 다음과 같이 한다

    1. channel.basicPublish("logs", "", null, message.getBytes());

Listing exchanges

  • 아래 명령을 실행하면 exchange 리스트를 볼 수 있다

    1. $ ./rabbitmqctl list_exchanges
    2. Listing exchanges ...
    3. direct
    4. amq.direct direct
    5. amq.fanout fanout
    6. amq.headers headers
    7. amq.match headers
    8. amq.rabbitmq.log topic
    9. amq.rabbitmq.trace topic
    10. amq.topic topic
    11. logs fanout
    12. ...done.
  • exchange 리스트 중 amp.* 형태로 되어있는 것들은 기본으로 생성된다

    • amp.* 형태의 리스트들을 아마 사용할일이 없을 것이다..

Nameless exchange

  • “Hello World”“Work Queues”에서는 exchange이름을 지정하지 않았다
  • exchange 자리에 ”“(Empty String)를 넣었었다

    1. channel.basicPublish("", "hello", null, message.getBytes());
    • 첫 번째 파라미터에 exchange의 이름을 넣어야 하지만 빈 값을 넣었다
    • 빈 값은 기본 exchage를 사용하도록되어 있다
  • routingKey를 이용하면 특정한 Queue에만 메세지를 전달 할 수 있다

Temporary queues

  • 임시 Queue를 생성하기 위해서는 2가지 방법이 있다
    • RabbitMQ에 연결을 할때마다 임의의 이름을 가진 Queue를 생성하는 방법
    • Queue를 생성 후 Consumer들이 Queue에서 모두 연결이 끊기면 자동적으로 Queue를 삭제하는 방법
  • 아무런 파라미터 없이 queueDeclare()를 호출하면 non-durable, exclusive, autodelete 속성을 갖는 임의의 Queue가 생성된다

    1. String queueName = channel.queueDeclare().getQueue();
    • Queue이름은 amq.gen-JzTY20BRgKO-HjmUJj0wLg 이러한 형태가 된다

Bindings

  • exchange와 Queue를 생성한 후 exchange가 Queue에게 메세지를 전송할 수 있도록 두개 사이의 관계를 생성해야한다
    • Binding한다고 부른다
  • logs라는 exchange에 Binding 하기 위해 다음과 같이 한다

    1. channel.queueBind(queueName, "logs", "");

Putting it all together

EmitLog.java (Sending)

  1. import java.io.IOException;
  2. import com.rabbitmq.client.ConnectionFactory;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.Channel;
  5.  
  6. public class EmitLog {
  7.  
  8. private static final String EXCHANGE_NAME = "logs";
  9. public static void main(String[] argv) throws java.io.IOException {
  10.  
  11. ConnectionFactory factory = new ConnectionFactory();
  12. factory.setHost("localhost");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  16. String message = getMessage(argv);
  17.  
  18. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  19. System.out.println(" [x] Sent '" + message + "'");
  20. channel.close();
  21. connection.close();
  22. }
  23. //...
  24. }

ReceiveLogs.java (Receiving)

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3.  
  4. public class ReceiveLogs {
  5. private static final String EXCHANGE_NAME = "logs";
  6. public static void main(String[] argv) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel();
  11. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  12. String queueName = channel.queueDeclare().getQueue();
  13. channel.queueBind(queueName, EXCHANGE_NAME, "");
  14. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  15. Consumer consumer = new DefaultConsumer(channel) {
  16. @Override
  17. public void handleDelivery(String consumerTag,
  18. Envelope envelope,
  19. AMQP.BasicProperties properties,
  20. byte[] body) throws IOException {
  21. String message = new String(body, "UTF-8");
  22. System.out.println(" [x] Received '" + message + "'");
  23. }
  24. };
  25. channel.basicConsume(queueName, true, consumer);
  26. }
  27. }


원문

  • http://next.rabbitmq.com/tutorials/tutorial-three-java.html
Published 09 April 2016