fanout 타입을 갖고 logs라는 이름의 exchange를 생성하기 위해 다음과 같이 한다
channel.exchangeDeclare("logs", "fanout");
logs라는 exchange로 메세지를 보내기 위해 다음과 같이 한다
channel.basicPublish("logs", "", null, message.getBytes());
아래 명령을 실행하면 exchange 리스트를 볼 수 있다
$ ./rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
exchange 리스트 중 amp.* 형태로 되어있는 것들은 기본으로 생성된다
exchange 자리에 ”“(Empty String)를 넣었었다
channel.basicPublish("", "hello", null, message.getBytes());
아무런 파라미터 없이 queueDeclare()를 호출하면 non-durable, exclusive, autodelete 속성을 갖는 임의의 Queue가 생성된다
String queueName = channel.queueDeclare().getQueue();
logs라는 exchange에 Binding 하기 위해 다음과 같이 한다
channel.queueBind(queueName, "logs", "");
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}