To create an exchange named logs of type fanout:
- channel.exchangeDeclare("logs", "fanout");
To publish a message to the logs exchange:
- channel.basicPublish("logs", "", null, message.getBytes());
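The routing key passed to basicPublish above is empty because a fanout exchange ignores it and copies each message to every queue bound to it. The following is only a toy in-memory model of that behavior, not the RabbitMQ client API; the class FanoutSketch and its methods bind and publish are hypothetical names made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of a fanout exchange (assumption: illustrative only,
// this is NOT how the RabbitMQ broker or client is implemented).
public class FanoutSketch {
    // Queues currently bound to this hypothetical exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Bind a queue to the exchange; a fanout exchange has no use for a binding key.
    public void bind(Queue<String> queue) {
        boundQueues.add(queue);
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    // With no bound queues, the message is simply dropped.
    public void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        Queue<String> first = new ArrayDeque<>();
        Queue<String> second = new ArrayDeque<>();
        logs.bind(first);
        logs.bind(second);

        logs.publish("", "info: Hello World!");

        // Both queues now hold their own copy of the message.
        System.out.println(first.peek());
        System.out.println(second.peek());
    }
}
```

In this model, as with a real fanout exchange, a message published while no queue is bound is lost, which is why the consumer has to bind its queue before the producer emits anything it wants to see.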
Running the command below lists the exchanges on the server:
- $ ./rabbitmqctl list_exchanges
- Listing exchanges ...
- direct
- amq.direct direct
- amq.fanout fanout
- amq.headers headers
- amq.match headers
- amq.rabbitmq.log topic
- amq.rabbitmq.trace topic
- amq.topic topic
- logs fanout
- ...done.
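The exchanges in the list named amq.* are created by default, as is the unnamed default exchange on the first row.
In the earlier examples we passed "" (the empty string) as the exchange, which selects that default exchange; it delivers the message to the queue whose name equals the routing key: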
- channel.basicPublish("", "hello", null, message.getBytes());
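For contrast with fanout, the default exchange delivers a message only to the queue whose name matches the routing key ("hello" in the call above). The sketch below is a toy in-memory model under that assumption, not the RabbitMQ client API; DefaultExchangeSketch, declareQueue, and publish are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the default ("") exchange (assumption: illustrative
// only, this is NOT the RabbitMQ broker or client implementation).
public class DefaultExchangeSketch {
    // Every declared queue is implicitly reachable under its own name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    // Declare a queue; declaring the same name twice returns the same queue.
    public Queue<String> declareQueue(String name) {
        return queuesByName.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Deliver to the single queue named by the routing key, if it exists.
    // Returns false when no such queue exists and the message is dropped.
    public boolean publish(String routingKey, String message) {
        Queue<String> target = queuesByName.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(message);
        return true;
    }

    public static void main(String[] args) {
        DefaultExchangeSketch exchange = new DefaultExchangeSketch();
        Queue<String> hello = exchange.declareQueue("hello");
        Queue<String> other = exchange.declareQueue("other");

        exchange.publish("hello", "Hello World!");

        // Only the queue named by the routing key received the message.
        System.out.println(hello.size());
        System.out.println(other.size());
    }
}
```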
Calling queueDeclare() with no parameters creates a non-durable, exclusive, auto-delete queue with a server-generated name:
- String queueName = channel.queueDeclare().getQueue();
To bind that queue to the logs exchange:
- channel.queueBind(queueName, "logs", "");
The complete producer, EmitLog:
- import java.io.IOException;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Channel;
-
- public class EmitLog {
-
- private static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] argv) throws Exception {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
-
- String message = getMessage(argv);
-
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
- channel.close();
- connection.close();
- }
- //...
- }
The complete consumer, ReceiveLogs:
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class ReceiveLogs {
- private static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] argv) throws Exception {
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- }
- };
-
- channel.basicConsume(queueName, true, consumer);
- }
- }