Spring AMQP 1.6完整参考指南-第二部分_rabbitconnectionfactorybean setsslpropertieslocati-程序员宅基地

技术标签: RabbitMQ  

非常感谢 http://www.blogjava.net/qbna350816/archive/2016/08/13/431561.html

RabbitMQ技术学习 https://www.itkc8.com 

 

3. 参考

 

这部分参考文档详细描述了组成Sring AMQP的各种组件. main chapter 涵盖了开发AMQP应用程序的核心类. 这部分也包含了有关示例程序的章节.

3.1 使用 Spring AMQP

在本章中,我们将探索接口和类,它们是使用Spring AMQP来开发应用程序的必要组件 .

3.1.1 AMQP 抽象

介绍

 

Spring AMQP 由少数几个模块组成, 每个都以JAR的形式来表现.这些模块是: spring-amqp和spring-rabbit. spring-amqp模块包含org.springframework.amqp.core 包. 
在那个包中,你会找到表示AMQP核心模块的类. 我们的目的是提供通用的抽象,不依赖于任何特定中间件的实现或AMQP客户端库。
最终用户代码将更具有移植性,以便跨供应商实现,因为它们可以对抽象层开发。这些抽象是使用broker的特定模块实现的,如spring-rabbit。
目前只有一个RabbitMQ实现;而对于抽象的验证,除了RabbitMQ外,也已经在.Net平台上使用Apache Qpid得到了验证。
由于AMQP原则上工作于协议层次,RabbitMQ客户端可以在任何支持相同的协议版本的broker中使用,但目前我们没有测试其它任何broker。

这里的概述假设你已经熟悉了AMQP规范.如果你还没有,那么你可以查看第5章,其它资源中列举的资源.
 

Message

 

AMQP 0-8 和0-9-1 规范没有定义一个消息类或接口.相反,当执行basicPublish()这样的操作的时候, 内容是以字节数组参数进行传递的,其它额外属性也是以单独参数进行传递的. Spring AMQP定义了一个 Message 类来作为AMQP领域模型表示的一部分.Message类的目的是在单个实例中简化封装消息体(body)和属性(header),这样API就可以变得更简单. Message类的定义相当简单.
 

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        returnthis.body;
    }

    public MessageProperties getMessageProperties() {
        returnthis.messageProperties;
    }
}

MessageProperties 接口定义了多个共同属性,如messageId, timestamp, contentType 等等. 那些属性可以通过调用setHeader(String key, Object value) 方法使用用户定义头(user-defined headers)来扩展.

 

Exchange

Exchange 接口代表的是AMQP Exchange,它是生产者发送消息的地方.broker虚拟主机中的交换器名称都是唯一的,同时还有少量的其它属性:

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

正如你所看到的, Exchange还有一个type (它是在ExchangeTypes中定义的常量). 基本类型是: DirectTopicFanout,和Headers.
在核心包中,你可以找到每种类型的Exchange 接口实现.这些交换器类型会在处理队列绑定时,行为有所不同.
例如,Direct交换器允许队列以固定路由键进行绑定(通常是队列的名称).
Topic交换器支持使用路由正则表达式(*通配符明确匹配一个,而#通配符可匹配0个或多个). 

Fanout交换器会把消息发布到所有绑定到它上面的队列而不考虑任何路由键.

关于交换器类型的更多信息,查看Chapter 5, Other Resources.

AMQP规范还要求任何broker必须提供一个默认的无名字的(空字符串)Direct交换器.所有声明的队列都可以用它们的名字作为路由键绑定到默认交换器中. 在Section 3.1.4, “AmqpTemplate”你会了解到更多关于在Sring AMQP中使用默认交换器的使用情况.
 

Queue

Queue 代表的是消费者接收消息的组件. 像各种各样的 Exchange 类,我们的实现目标是作为核心AMQP类型的抽象表示.

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
 public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

注意,构造器需要接受队列名称作为参数.根据实现, admin template可能会提供生成独特队列名称的方法.这些队列作为回复地址或用于临时情景是非常有用的.
基于这种原因,自动生成队列的exclusive和 autoDelete 属性都应该设置为true.

参考 Section 3.1.10, “Configuring the broker” 来了解关于使用命名空间来声明队列,包括队列参数的详细情况.

Binding

生产者发送消息到Exchange,而消费者将从Queue中获取消息,连接Queues与Exchanges之间的绑定对于通过消息来连接生产者和消费者是非常关键的.
在Spring AMQP中,我们定义了一个 Binding 类来表示这些连接. 让我们重新回顾一下绑定队列和交换器的操作.

你可以使用固定的路由键来绑定 Queue 到 DirectExchange上.

new Binding(someQueue, someDirectExchange, "foo.bar")

你可以使用路由正则表达式来绑定Queue到TopicExchange上.

new Binding(someQueue, someTopicExchange, "foo.*")

你可以不使用路由键来绑定Queue到FanoutExchange上.

new Binding(someQueue, someFanoutExchange)

我们还提供了BindingBuilder来方便操作.

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

上面展示的BindingBuilder 类很清晰,但如果为bind()方法使用静态导入,这种形式将工作得更好.

本身来说,Binding类的实例只能一个connection中持有数据.换句话说,它不是一个活力(active)组件.
但正如在后面Section 3.1.10, “Configuring the broker”看到的, Binding实例可由AmqpAdmin 类来触发broker上的绑定操作.
同样,在同一个章节中,你还会看到Binding实例可在@Configuration类中使用Spring @Bean风格来定义
还有方便的基类来简化生成AMQP相关bean定义和识别队列,交换器,绑定的方法,这样当AMQP broker运行程序启动时,就可以得到声明.

AmqpTemplate 也在核心包中定义.作为涉及AMQP消息的主要组件, 会在它自己的章节中进行详细介绍(参考Section 3.1.4, “AmqpTemplate”).

3.1.2 连接和资源管理

介绍

虽然我们在前面章节中描述的AMQP模型是通用的,适用于所有实现,但当我们说到资源管理时,其细节是针对特定broker实现的.因此,在这个章节中,我们只关注我们的"spring-rabbit"模块,因为到现在为止,RabbitMQ是唯一支持的实现.

RabbitMQ broker中用于管理连接的中心组件是ConnectionFactory 接口. ConnectionFactory实现的责任是提供一个org.springframework.amqp.rabbit.connection.Connection 的实例,它包装了com.rabbitmq.client.Connection
我们提供的唯一具体实现提CachingConnectionFactory,默认情况下,会建立应用程序可共享的单个连接代理.连接共享是可行的,因为在AMQP处理消息的工作单位实际是 "channel" (在某些方面,这类似于JMS中Connection 和 Sessionin的关系).
你可以想象,连接实例提供了一个createChannel方法。CachingConnectionFactory 实现支持这些channels的缓存,它会基于它们是否是事务的来单独维护其缓存. 
当创建CachingConnectionFactory的实例时, hostname 可通过构造器来提供,username 和password 属性也可以提供.如果你想配置channel缓存的大小(默认是25),你可以调用setChannelCacheSize()方法.

从1.3版本开始,CachingConnectionFactory 也可以同channel一样,配置缓存连接.在这种情况下每次调用createConnection() 都会创建一个新连接(或者从缓存中获取空闲的连接).
关闭连接会将其返回到缓存中(如果还没有达到缓存大小的话).在这些连接上创建的Channels同样也会被缓存. 单独连接的使用在某些环境中是有用的,如从HA 集群中消费, 连接负载均衡器,连接不同的集群成员.设置cacheMode 为 CacheMode.CONNECTION.

这不会限制连接的数目,它用于指定允许空闲打开连接的数目.

从1.5.5版本开始,提供了一个新属性connectionLimit.当设置了此属性时,它会限制连接的总数目,当达到限制值时,将channelCheckoutTimeLimit 来等待空闲连接.如果时间超时了,将抛出AmqpTimeoutException.

重要

当缓存模式是CONNECTION时, 队列的自动声明等等 (参考 the section called “Automatic Declaration of Exchanges, Queues and Bindings”) 将不再支持.

 

此外,在写作的时候,rabbitmq-client 包默认为每个连接(5个线程)创建了一个固定的线程池. 当需要使用大量连接时,你应该考虑在CachingConnectionFactory定制一个executor. 然后,同一个executor会用于所有连接,其线程也是共享的.  
executor的线程池是没有界限的或按预期使用率来设置(通常, 一个连接至少应该有一个线程).如果在每个连接上创建了多个channels,那么池的大小会影响并发性,因此一个可变的线程池executor应该是最合适的.

 

理解缓存大小不是限制是很重要的, 它仅仅是可以缓存的通道数量.当说缓存大小是10时,在实际使用中,其实可以是任何数目的通道. 如果超过10个通道被使用,他们都返回到高速缓存,10个将在高速缓存中,其余的将物理关闭。

从1.6版本开始,默认通道缓存大小从1增加到了25. 在高容量,多线程环境中,较小的缓存意味着通道的创建和关闭将以很高的速率运行.加大默认缓存大小可避免这种开销.
你可以监控通道的使用情况(通过RabbitMQ Admin UI) ,如果看到有很多通道在创建和关闭,你可以增大缓存大小.缓存只会增长按需(以适应应用程序的并发性要求),所以这个更改不会影响现有的低容量应用程序。

从1.4.2版本开始,CachingConnectionFactory 有一个channelCheckoutTimeout属性. 当此属性的值大于0时,channelCacheSize 会变成连接上创建通道数目的限制.
如果达到了限制,调用线程将会阻塞,直到某个通道可用或者超时, 在后者的情况中,将抛出AmqpTimeoutException 异常.

在框架(如.RabbitTemplate)中使用的通道将会可靠地返回到缓存中.如果在框架外创建了通道 (如.直接访问connection(s)并调用createChannel()),你必须可靠地返回它们(通过关闭),也许需要在 finally 块中以防止耗尽通道.

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

当使用XML时,配置看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username"value="guest"/>
<property name="password" value="guest"/>
</bean>

这里也有一个 SingleConnectionFactory 实现,它只能用于框架的单元测试代码中.它比CachingConnectionFactory 简单,因为它不会缓存通道,由于其缺乏性能和韧性,它不打算用于简单的测试以外的实际使用
如果基于某些原
因,你需要自己来实现ConnectionFactory ,AbstractConnectionFactory 基类提供了一个非常好的起点.

ConnectionFactory 可使用rabbit命名空间来快速方便的建立:

<rabbit:connection-factory id="connectionFactory"/>

在多数情况下,这是很好的,因为框架会为你选择最好的默认值.创建的实例会是CachingConnectionFactory.要记住,默认的缓存大小是25.如果你想缓存更多的通道,你可以设置channelCacheSize 属性值.在XML中,它看起来像下面这样:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

在命名空间中,你也可以添加channel-cache-size 属性:

<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>

默认的缓存模式是CHANNEL, 但你可以使用缓存连接来替换;在这种情况下,我们会使用connection-cache-size:

<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

Host 和 port 属性也可以在命名空间中提供:

<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>

此外,如果运行集群环境中,使用addresses属性.

<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>

下面是一个自定义的线程工厂,其前辍线程名称为rabbitmq-.

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>

配置底层客户端连接工厂

CachingConnectionFactory 使用的是 Rabbit client ConnectionFactory的实例; 当在CachingConnectionFactory设置等价属性时,许多属性(host, port, userName, password, requestedHeartBeat, connectionTimeout) 来传递. 
要设置其它属性(例如clientProperties),可定义一个rabbit factory 的实例,并使用CachingConnectionFactory的适当构造器来提供引用
当使用上面提到的命名空间时,要在connection-factory属性中提供一个工厂的引用来配置. 为方便起见,提供了一个工厂,以协助在一个Spring应用程序上下文中配置连接工厂,在下一节讨论。

<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>

RabbitConnectionFactoryBean 和配置SSL

从1.4版本开始, 提供了一个便利的RabbitConnectionFactoryBean 类通过依赖注入来配置底层客户端连接工厂的SSL属性.其它设置简单地委派给底层工厂.以前你必须以编程方式配置SSL选项。

<rabbit:connection-factory id="rabbitConnectionFactory"connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" />
<bean id="clientConnectionFactory" class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

参考 RabbitMQ Documentation 来了解关于配置SSL的更多信息. 省略的keyStore 和 trustStore 配置将在无证书验证的情况下,通过SSL来连接. Key和trust store 配置可以按如下提供:

sslPropertiesLocation 属性是一个Spring Resource ,它指向一个包含下面key的属性文件:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStore 的 truststore 是指向store的 Spring Resources .通常情况下,这个属性文件在操作系统之下安全的,应用程序只能读取访问.

从Spring AMQP 1.5版本开始,这些属性可直接在工厂bean上设置.如果同时提供了discrete和 sslPropertiesLocation 属性, 后者属性值会覆盖discrete值.

路由连接工厂

从1.3版本开始,引入了AbstractRoutingConnectionFactory.这提供了一种机制来配置多个ConnectionFactories的映射,并通过在运行时使用lookupKey来决定目标ConnectionFactory

通常,实现会检查线程绑定上下文. 为了方便, Spring AMQP提供了SimpleRoutingConnectionFactory, 它会从SimpleResourceHolder中获取当前线程绑定的lookupKey:

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<propertyname="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> 
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

	@Autowired
  private RabbitTemplate rabbitTemplate;

	public  void service(String vHost, String payload) {
		SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
		rabbitTemplate.convertAndSend(payload);
		SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
	}

}

在使用资源后,对其进行解绑是很重要的.更多信息参考AbstractRoutingConnectionFactory的JavaDocs.

从1.4版本开始, RabbitTemplate 支持SpEL sendConnectionFactorySelectorExpression 和receiveConnectionFactorySelectorExpression 属性, 
它会在每个AMQP 协议交互操作(sendsendAndReceivereceiveor receiveAndReply)进行评估, 为提供的AbstractRoutingConnectionFactory类解析lookupKey值
Bean 引用,如"@vHostResolver.getVHost(#root)" 可用于表达式中.对于send 操作,  要发送的消息是根评估对象;对于receive操作, queueName 是根评估对象.

路由算法为:如果selector 表达式为null,或等价于null,或提供的ConnectionFactory 不是AbstractRoutingConnectionFactory的实例,根据提供的ConnectionFactory 实现,
所有的工作都按之前的进行.同样的结果也会发生:如果评估结果不为null,但对于lookupKey 无目标ConnectionFactory,且 the AbstractRoutingConnectionFactory 使用lenientFallback = true进行了配置
当然,在AbstractRoutingConnectionFactory 的情况下,它会基于determineCurrentLookupKey()的路由实现来进行回退. 但,如果lenientFallback = false, 将会抛出 IllegalStateException 异常.

Namespace 在<rabbit:template>组件中也支持send-connection-factory-selector-expression 和receive-connection-factory-selector-expression属性.

也是从1.4版本开始, 你可以在SimpleMessageListenerContainer配置路由连接工厂. 在那种情况下,队列名称的列表将作为lookup key.例如,如果你在容器中配置setQueueNames("foo", "bar"),lookup key将是"[foo,bar]" (无空格).

RabbitMQ技术学习 https://www.itkc8.com 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/HUXU981598436/article/details/78469461

智能推荐

Tomcat 启动报错:javax.naming.NamingException: No naming context bound to this class loader-程序员宅基地

文章浏览阅读854次。分析原因:在类中使用了Log .只是在项目lib路径下添加了slf4 的jar包,在Tomcat\lib下未添加 解决方案:将slf4的jar包放到tomcat\lib下。 这个是别人遇到的问题的解决办法,我们的问题的解决办法是tomcat的lib下面的jar多余,删除这个jar可以解决..._javax.naming.namingexception

OMV(Openmediavault)修改静态IP的方法_omv修改ip地址-程序员宅基地

文章浏览阅读1.1w次。安装完Openmediavault后,第一次启动发现IP地址有问题,需要修改。这个系统基于Debian,在网上找了许多方法,反复不能成功,后来发现是由于该系统对eth0网卡的命名不同造成。查出实体机网卡的名称# dmesg | grep -in eth接下来出现信息中含有enp0s25: renamed from eth0这个enp0s25就是实体机网卡在OMV系统配置IP时用到的名字,而不是eth0备份原来的interfaces文件# mv /etc/network/interfa_omv修改ip地址

验收测试的名词解释_软件测试名词解释-程序员宅基地

文章浏览阅读905次。软件质量与软件产品满足明确或隐藏需求的能力有关的特征或特性的总和。软件测试使用人工和自动手段来运行或测试某个系统的过程,其目的在于检验它是否满足规定的需求或弄清楚预期结果与实际结果之间的差别。验收测试是软件产品完成了功能测试和系统测试之后,在产品发布之前所进行的软件测试活动。失败测试纯粹为了破坏软件而设计和执行的测试案例,被称为失败测试。边界测试是指使用预定定义的边界值,如最大值、最小值、空值或其..._对验收测试解释正确的是

优秀程序员必须知道的32个算法,提高你的开发效率_优秀程序员32个算法-程序员宅基地

文章浏览阅读587次。最近看到一篇比较好的文章,在此分享给网友点击打开链接_优秀程序员32个算法

vs2019C#引用其他项目成功但无法引用类_vs2019添加引用没有引用类-程序员宅基地

文章浏览阅读1.3k次。可能是因为你将被引用的那个类写在了另一个类之中。像下面这样在另一个项目中就无法调用Person类。我就粗心犯过这个错误,找半天没找到原因。_vs2019添加引用没有引用类

springboot集成freemarker模板引擎_applicationcontext 写 ftl-程序员宅基地

文章浏览阅读266次。添加依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId></dependency><dependency> <group..._applicationcontext 写 ftl

随便推点

从零开始学习CANoe(三)—— 系统变量的创建和使用_canoe系统变量不生效-程序员宅基地

文章浏览阅读1.3w次,点赞15次,收藏59次。从零开始学习CANoe(三)—— 系统变量的创建和使用_canoe系统变量不生效

申请Android API Key时keytool出错,java.lang.exception:keystore不存在_java.lang.exception:keystore not found-程序员宅基地

文章浏览阅读1.3w次。1.首先要得到你的debug keystore位置:打开Eclipse--->Windows--->Preferences--->Android--->Build 查看默认的debug keystore位置,例如:C:\Documents and Settings\(你电脑的名字)\.android\debug.keystore把这个文件拷贝到C 盘或者其他盘符下面(最简单的办法直接拷_java.lang.exception:keystore not found

【竞赛|数学建模】Part 1:什么是数学建模和各模块介绍_数学建模论文写作的模块分类-程序员宅基地

竞赛|数学建模的第一部分主要介绍了什么是数学建模以及各模块的简要介绍。数学建模是一种将实际问题抽象为数学模型,并通过数学方法进行分析和解决的过程。需要具备的包括分类理解、避免误区等。各模块简要介绍包括数学建模的步骤和过程、论文格式、准备工作以及团队协作。

求周期方波信号的傅里叶级数_1-1 求周期方波(见图1-4)的傅里叶级数复指数函数形 … -...-程序员宅基地

文章浏览阅读1.8k次。1-1 求周期方波(见图1-4)的傅里叶级数(复指数函数形式),划出|cn|–ω和φn–ω图,并与表1-1对比。x(t) A … ?T0 20 -A T0 2T0 … t ?T0 图1-4 周期方波信号波形图解答:在一个周期的表达式为T0??A (??t?0)??2 x(t)??? A (0?t?T0)??2积分区间取(-T/2,T/2)T02T?02T0201cn?T0 =..._求指数函数x(x)=ae(a>0,≥0)的频谱

全套毕设-基于springboot的房产销售交易平台房屋(mysql)-JAVA.VUE【论文、源码、开题报告】-程序员宅基地

文章浏览阅读39次。随着科学技术的飞速发展,各行各业都在努力与现代先进技术接轨,通过科技手段提高自身的优势;对于房产销售系统当然也不能排除在外,随着网络技术的不断成熟,带动了房产销售系统,它彻底改变了过去传统的管理方式,不仅使服务管理难度变低了,还提升了管理的灵活性。这种个性化的平台特别注重交互协调与管理的相互配合,激发了管理人员的创造性与主动性,对房产销售系统而言非常有利。本系统采用的数据库是Mysql,使用SpringBoot框架开发,运行环境使用Tomcat服务器,ECLIPSE 是本系统的开发平台。

cosnt修饰指针变量-程序员宅基地

文章浏览阅读94次。

推荐文章

热门文章

相关标签