RabbitMQ的使用_rabbitmq system.in.read()-程序员宅基地

技术标签: Linux  队列  rabbitmq  交换机  

目录

 

RabbitMQ的使用

一.Java连接RabbitMQ

1.1创建Maven项目

1.2导入依赖

1.3创建工具类连接RabbitMQ

二.通讯方式

1.Hello-World

2.Work

3.Publish/Subscribe

4.Routing

5.Topic


RabbitMQ的使用

一.Java连接RabbitMQ

1.1创建Maven项目

pass 。。。。

1.2导入依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>

1.3创建工具类连接RabbitMQ

public class MQConnection {

    public static Connection getConnetion(){
        // 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.247.128");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123");
        factory.setVirtualHost("/root");

        // 创建Connection
        Connection connection =null;
        try {
         connection = factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return connection;
    }
}

二.通讯方式

1.Hello-World

一个生产者,一个默认的交换机,一个队列,一个消费者

创建生产者,创建一个channel,发布消息到exchange,指定路由规则。

public class publish {
        //创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
    @Test
    public void publish() throws IOException, TimeoutException {
        //1. 获取Connection
        Connection connetion = MQConnection.getConnetion();
        //2. 创建Channel
        Channel channel = connetion.createChannel();
        //3. 发布消息到exchange,同时指定路由的规则
        String msg="Hello-word!666";

        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型
        channel.basicPublish("","Hello",null,msg.getBytes());

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("消息发布成功!加油");
        //4. 释放资源
        channel.close();
        connetion.close();

    }
}

创建消费者,创建一个channel,创建一个队列,并且去消费当前队列

public class consume {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("Hello",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受消息:"+new String(body,"utf-8"));
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("Hello",true,consumer);

        System.out.println("消费者开始监听队列!");

        System.in.read();
        channel.close();
        connetion.close();

    }
}

 

2.Work

一个生产者,一个默认的交换机,一个队列,两个消费者

只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing

消费者指定Qos和手动ack

public class publish {
        //创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
    @Test
    public void publish() throws IOException, TimeoutException {
        //1. 获取Connection
        Connection connetion = MQConnection.getConnetion();
        //2. 创建Channel
        Channel channel = connetion.createChannel();
        //3. 发布消息到exchange,同时指定路由的规则
        String msg="Hello-word!666";

        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型

        for (int i=0;i<10;i++){
            channel.basicPublish("","Work",null,msg.getBytes());
        }

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("消息发布成功!加油");
        //4. 释放资源
        channel.close();
        connetion.close();

    }
}
public class consume1 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("Work",true,false,false,null);
        // 指定当前消费者,一次消费多少个消息
        channel.basicQos(4);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));
                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("Work",false,consumer);
        
        System.in.read();
        channel.close();
        connetion.close();
    }
}
public class consume2 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("Work",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("Work",false,consumer);
        
        System.in.read();
        channel.close();
        connetion.close();
    }
}

 

3.Publish/Subscribe

一个生产者,一个交换机,两个队列,两个消费者

声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。

让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。

public class publish {
        // 声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
        // 让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
    @Test
    public void publish() throws IOException, TimeoutException {
        //1. 获取Connection
        Connection connetion = MQConnection.getConnetion();
        //2. 创建Channel
        Channel channel = connetion.createChannel();
        //3. 创建exchange - 绑定某一个队列
        //参数1: exchange的名称
        //参数2: 指定exchange的类型
        channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
        channel.queueBind("pubsub-queue1","pubsub-exchange","");
        channel.queueBind("pubsub-queue2","pubsub-exchange","");
        //4. 发布消息到exchange,同时指定路由的规则
        String msg="Hello-word!666";

        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型

        for (int i=0;i<10;i++){
            channel.basicPublish("pubsub-exchange","",null,msg.getBytes());

        }

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("消息发布成功!加油");
        //4. 释放资源
        channel.close();
        connetion.close();
    }
}
public class consume1 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("pubsub-queue1",true,false,false,null);
        // 指定当前消费者,一次消费多少个消息
//        channel.basicQos(6);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));

                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("pubsub-queue1",false,consumer);

        System.in.read();
        channel.close();
        connetion.close();
    }
}
public class consume2 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("pubsub-queue2",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("pubsub-queue2",false,consumer);

        System.in.read();
        channel.close();
        connetion.close();
    }
}

4.Routing

一个生产者,一个交换机,两个队列,两个消费者

生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。

public class publish {
//    生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,
//    并且在发送消息时,指定消息的具体RoutingKey即可。
    @Test
    public void publish() throws IOException, TimeoutException {
        //1. 获取Connection
        Connection connetion = MQConnection.getConnetion();
        //2. 创建Channel
        Channel channel = connetion.createChannel();
        //3. 创建exchange, routing-queue-error,routing-queue-info,
        //参数1: exchange的名称
        //参数2: 指定exchange的类型
        channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
        channel.queueBind("routing-queue-error","routing-exchange","ERROR");
        channel.queueBind("routing-queue-info","routing-exchange","INFO");
        //4. 发布消息到exchange,同时指定路由的规则
        String msg="Hello-word!666";

        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型

            channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
            channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
            channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
            channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("消息发布成功!加油");
        //4. 释放资源
        channel.close();
        connetion.close();

    }
}
public class consume1 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("routing-queue-error",true,false,false,null);
        // 指定当前消费者,一次消费多少个消息
//        channel.basicQos(6);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));

                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("routing-queue-error",false,consumer);

        System.in.read();
        channel.close();
        connetion.close();
    }
}
public class consume2 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("routing-queue-info",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("routing-queue-info",false,consumer);

        System.in.read();
        channel.close();
        connetion.close();
    }
}

5.Topic

一个生产者,一个交换机,两个队列,两个消费者

生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。

public class publish {
//   生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,
//   编写时注意格式 xxx.xxx.xxx去编写, \* -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
    @Test
    public void publish() throws IOException, TimeoutException {
        //1. 获取Connection
        Connection connetion = MQConnection.getConnetion();
        //2. 创建Channel
        Channel channel = connetion.createChannel();
        //3. 创建exchange并指定绑定方式
        //  fast.red.cat
        //  fast.white.dog
        //  slow.yello.dog
       channel.exchangeDeclare("topic-exchange",BuiltinExchangeType.TOPIC);
       channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
       channel.queueBind("topic-queue-2","topic-exchange","fast.#");
       channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");


        //4. 发布消息到exchange,同时指定路由的规则
//        String msg="Hello-word!666";

        // 参数1:指定exchange,使用""。
        // 参数2:指定路由的规则,使用具体的队列名称。
        // 参数3:指定传递的消息所携带的properties,使用null。
        // 参数4:指定发布的具体消息,byte[]类型

        channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴".getBytes());
        channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
        channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());

        // Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
        System.out.println("消息发布成功!加油");
        //4. 释放资源
        channel.close();
        connetion.close();

    }
}
public class consume1 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("topic-queue-1",true,false,false,null);
        // 指定当前消费者,一次消费多少个消息
//        channel.basicQos(6);
        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者1号接收到消息:"+new String(body,"utf-8"));

                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("topic-queue-1",false,consumer);

        System.in.read();
        channel.close();
        connetion.close();
    }
}
public class consume2 {

    @Test
    public void Consume() throws IOException, TimeoutException {

        Connection connetion = MQConnection.getConnetion();

        final Channel channel = connetion.createChannel();

        //3. 声明队列-HelloWorld   declare声明
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化(true)
        //参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
        //参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
        //参数5:arguments - 指定当前队列的其他信息

        channel.queueDeclare("topic-queue-2",true,false,false,null);

        //4. 开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2号接收到消息:"+new String(body,"utf-8"));
                //2. 手动ack
                //参数一: 代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
                //参数二:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,将一次性ack所有小于deliveryTag的消息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("topic-queue-2",false,consumer);
        
        System.in.read();
        channel.close();
        connetion.close();
    }
}

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qwq1518346864/article/details/117879931

智能推荐

移动边缘计算MEC学习笔记_移动设备和边缘设备的区别-程序员宅基地

文章浏览阅读6.4k次,点赞12次,收藏101次。移动边缘计算是指在移动网络边缘部署计算和存储资源,为移动网络提供 IT 服务环境和云计算能力,从而为用户提供超低时延和高带宽的网络服务解决方案。计算卸载是指终端设备将部分或全部计算任务交给云计算环境处理的技术,以解决移动设备在资源存储、计算性能以及能效等方面存在的不足。..._移动设备和边缘设备的区别

Verilog 对assign和always的一点理解-程序员宅基地

文章浏览阅读1.3k次。assign 用于描述组合逻辑always@(敏感事件列表) 用于描述时序逻辑敏感事件 上升沿 posedge,下降沿 negedge,或电平敏感事件列表中可以包含多个敏感事件,但不可以同时包括电平敏感事件和边沿敏感事件,也不可以同时包括同一个信号的上升沿和下降沿,这两个事件可以合并为一个电平敏感事件。在新的verilog2001中“,”和“or”都可以用来分割敏感事..._assign能描述时序逻辑吗

Javascript 获取数组长度, 对象成员个数, 字符串数_js对象元素个数-程序员宅基地

文章浏览阅读8.7k次。1.应用场景获取数组长度, 对象成员个数, 字符串数. 2.学习/操作 TBD 3.问题/补充 TBD 4.参考https://www.cnblogs.com/sunnywindycloudy/p/7382226.html //js获取数组长度,对象成员个数、字符串数 后续补充......_js对象元素个数

Go ElasticSearch 游标查询Scroll_golang es 滚动查询-程序员宅基地

文章浏览阅读1.8k次。scroll 查询 可以用来对 Elasticsearch 有效地执行大批量的文档查询。游标查询会取某个时间点的快照数据。 查询初始化之后索引上的任何变化会被它忽略。 它通过保存旧的数据文件来实现这个特性,结果就像保留初始化时的索引视图一样。Go olivere elastic基础:golang中使用elasticsearch之olivere elastic汇总环境Go mod包:github.com/olivere/elastic/v7Elasticsearch版本:v7游标查询do, e_golang es 滚动查询

Spring的注解 @Bean用法-程序员宅基地

文章浏览阅读2.7w次,点赞21次,收藏65次。随着SpringBoot的流行,基于注解式开发的热潮逐渐覆盖了基于XML纯配置的开发,而作为Spring中最核心的bean当然也能够使用注解的方式进行表示。所以本篇就来详细的讨论一下作为Spring中的Bean到底都有哪些用法。@Bean 基础声明Spring的@Bean注解用于告诉方法,产生一个Bean对象,然后这个Bean对象交给Spring管理。产生这个Bean对象的方法Spring只会调用一次,随后这个Spring将会将这个Bean对象放在自己的IOC容器中。S_@bean

用python创建窗口_win32gui_struct.packdev_broadcast_deviceinterface-程序员宅基地

文章浏览阅读2.5k次,点赞3次,收藏2次。利用win32库可以做到这点,直接上代码import win32api, win32con, win32gui import win32gui_struct import ctypes from ctypes import * GUID_DEVINTERFACE_USB_DEVICE = "{A5DCBF10-6530-11D2-901F-00C04FB951ED}" class..._win32gui_struct.packdev_broadcast_deviceinterface

随便推点

bzoj3924: [Zjoi2015]幻想乡战略游戏(动态点分治 + 欧拉序ST表求LCA)-程序员宅基地

文章浏览阅读181次。题目:题解:大坑啊,来自巨佬的题解 https://www.cnblogs.com/bztMinamoto/p/9489473.html题意转化后就是求带权的重心,考虑暴力的做法:设原树的树根 rtrtrt,sumrtsum_{rt}sumrt​为 rt的所有子树的点权的和,disrtdis_{rt}disrt​为rt子树的点到 rtrtrt 的距离和(题目就是要求disdisdis最小的那...

c语言计算两个大数相加,C语言计算大数相加的方法-程序员宅基地

文章浏览阅读2k次,点赞3次,收藏5次。本文实例为大家分享了C语言计算大数相加的具体代码,供大家参考,具体内容如下问题描述输入两个整数a和b,输出这两个整数的和。a和b都不超过100位。算法描述由于a和b都比较大,所以不能直接使用语言中的标准数据类型来存储。对于这种问题,一般使用数组来处理。定义一个数组A,A[0]用于存储a的个位,A[1]用于存储a的十位,依此类推。同样可以用一个数组B来存储b。计算c=a+b的时候,首先将A[..._c语言两个很大的数相加

ic启动器我的世界_我的世界HMCL启动器-程序员宅基地

文章浏览阅读958次。软件介绍我的世界HMCL启动器的最新版本,自定义您的游戏客户端玩家可以自己安装mod,修改功能,操作起来就会更加的方便,影响游戏的启动速度,因此我的世界HMCL启动器非常适合您,快来下载吧。我的世界HMCL启动器介绍我的世界HMCL启动器是为我的世界(Minecraft)玩家们精心准备的一款游戏启动器,能够快速启动游戏客户端,帮助玩家获得更好的游戏体验。HMCL启动器组最新版包含了Minecraf..._authlib-injector报错

SpringBoot2 | SpringBoot启动流程源码分析(二)-程序员宅基地

文章浏览阅读120次。版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文链接:https://blog.csdn.net/woshilijiuyi/article/details/82350057微信公众号:吉姆餐厅ak学习更多源码知识,欢迎关注。SpringBoot2 | SpringBoot启动流程源码分析(一)SpringBoot2 | S...

Python异常模块与包-程序员宅基地

文章浏览阅读1.3k次,点赞44次,收藏19次。模块定义别名import 模块名 as 别名# 功能定义别名from 模块名 import 功能 as 别名注意:模块的导入一般都在开头的地方写好,再到下面写功能代码Python自定义模块是指用户自己编写的一个Python脚本文件,其中包含了一些函数、类或变量等,可以供其他Python程序使用。创建一个以.py为后缀的Python脚本文件,例如module.py。在脚本文件中定义函数、类或变量等。在其他Python程序中,通过import语句导入自定义模块,并使用其中的函数、类或变量等。

04-GeoServer WFS服务发布及参数设置_geoserver发布wfs-程序员宅基地

文章浏览阅读4.2k次,点赞2次,收藏12次。Web Feature Service服务(WFS)1. WFS服务简介1.1 WFS服务基础知识 WFS服务:Web Feature Service,网络要素服务,是指用户在分布式的环境下通过HTTP对地理要素进行插入,更新,删除,检索和发现服务。该服务根据HTTP客户请求返回要素级的GML(Geography Markup Language、地理标识语言)数据,并提供对要素的增加、修改、删除等事务操作,是对Web地图服务的进一步深入。WFS通过OGC Filter构造查询条件,支..._geoserver发布wfs