ITKeyword - 技术文章推荐分享

首页 > (七) Thunder分布式RPC框架

(七) Thunder分布式RPC框架

相关推荐:(一) Thunder分布式RPC框架

Thunder(QQ 群 471164539)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/1. 概要1.1 Thunder是基于Netty + Hessian + Kafka + ActiveMQ + Tibco + Zookeeper(Curator Framework) + Redis + FST + Spring + Spring Web MVC分布式RPC调用框

Thunder(QQ 群 471164539)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/

1. JMS消息队列模型主要是用于ActiveMQ & Tibco等基于JMS标准的MQ。结构图如下: 点击查看大图工作原理 1)Spring扫描线程扫描到一个Service节点后,就会去新建一个MQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建queueAsyncResponseDestination,queueSyncResponseDestination,topicAsyncResponseDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Response Queue/Topic的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。 2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个MQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 queueAsyncRequestDestination,queueSyncRequestDestination,放入相关DestinationMap缓存中(如果存在则不新建,去缓存中拿),然后根据这些Destination建立MessagerListener消息监听对象,实现对Request Queue的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。 3)当调用端通过Spring Aop进行同步/异步远程调用时,先从缓存获取相关的DestinationMap中获取Destination对象,把ProtocolRequest请求通过MQProducer.produce发送到MQ服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过MQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用 4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果 5)Queue和Topic Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器

例如:request-queue-async-POA_EA_INF-APP_IOS-com.nepxion.thunder.test.service.UserService 6)为服务/调用的接口指定不同的MQ服务器,前提是配置文件中必须出现两个以上的MQ服务器配置,如果只有一个,可以不指定,默认取配置的那个MQ服务器 7)为MQ指定Connection或Session的缓存方式(SingleConnectionFactory,CachingConnectionFactory,PooledConnectionFactory),这个必须在配置文件中配置 8)为MQ指定两种不同的初始化方式(JNDI和非JNDI) 点击查看大图类结构1)MQTemplate.java - 消息发送模板类,继承JmsTemplate.java,覆盖doSend方法,把JmsMessage转换为ProtocolMessage,设定DeliveryMode,TimeToLive等属性值2)MQProducer.java - 消息生产者类,通过线程池发布消息到MQ服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听3)MQConsumer - 消息消费类,实现SessionAwareMessageListener.java4)MQHierachy.java - MQ层次类,实现创建不同类型的ConnectionFactory,MQTemplate,MQProducer等核心对象,以及DefaultMessageListenerContainer的优化等,包括消息过滤,消费并发数,接收超时等5)MQConnectionHierachy.java - 继承MQHierachy.java,实现非JNDI连接方式的初始化MQ连接6)MQJndiHierachy.java - 继承MQHierachy.java,实现JNDI连接方式的初始化MQ连接7)MQExecutorDelegate.java - MQExecutor的代理接口8)MQServerExecutor.java - 继承AbstractServerExecutor.java,实现MQExecutorDelegate.java,初始化MQContext,初始化跟Response Queue/Topic相关的事情9)MQServerHandler.java - 继承MQConsumer.java的onMessage方法,实现服务端对消息监听10)MQClientExecutor.java - 继承AbstractClientExecutor.java,实现MQExecutorDelegate.java,初始化MQContext,初始化跟Request Queue和Response Queue/Topic相关的事情11)MQClientHandler.java - 继承MQConsumer.java的onMessage方法,实现调用端对消息监听12)MQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:

异步调用:从缓存获取要调用的queueResponseDestination和queueRequestDestination,通过MQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回

同步调用:同异步调用,采用返回值返回

广播调用:从缓存获取要调用的topicResponseDestination和topicRequestDestination,通过MQProducer.produce方法发送广播请求ProtocolRequest。不返回

重复调用:不支持13)MQContext.java - 实现初始化连接(MQHierachy),初始化消息队列(Queue/Topic),初始化消息监听(MQServerHandler/MQClientHandler),初始化重连机制 14)MQCacheContainer.java - 缓存容器15)MQQueueDestinationContainer.java - Queue Destination缓存容器16)MQTopicDestinationContainer.java - Topic Destination缓存容器17)MQDestinationUtil.java - Destination工具类18)MQSelectorUtil.java - Selector工具类19)MQBytesMessageConverter - 二进制和Java序列化对象互转的适配类

相关推荐:(九) Thunder分布式RPC框架

Thunder(QQ 群 471164539)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/1. 介绍治理中心是基于Nepxion Swing Repository组件,Java Desktop版的服务治理系统,计划用基于Ebay Jetstream框架做个Web版。它的主要功能包括1.1 登录需要填入

2. 非JMS消息队列模型主要是用于Kafka。结构图如下: 点击查看大图使用Kafka作为点对点通信,有响应返回的场景(同步返回值,异步带Callback返回),特别要注意,所对应的Topic下分区数一定要大于等于调用端数目 工作原理 1)Spring扫描线程扫描到一个Service节点后,就会去新建一个KafkaMQContext对象,放入ServiceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 responseQueueDestinationEntity,responseTopicDestinationEntity,requestQueueDestinationEntity,然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Response Queue/Topic(名称)的监听。如果有消息被监听到,从消息里获取ProtocolRequest对象,执行ServerHandlerAdapter.handle(request, response)。机制跟Netty等一样,不阐述了。 2)Spring扫描线程扫描到一个Reference节点后,就会去新建一个KafkaMQContext对象,放入ReferenceContextMap缓存中(如果存在则不新建,去缓存中拿),进而创建 requestQueueDestinationEntity,然后根据这些DestinationEntity建立KafkaMQConsumer消息监听对象,实现对Request Queue(名称)的监听。如果有消息被监听到,从消息里获取ProtocolResponse对象,执行ClientHandlerAdapter.handle(response)。机制跟Netty等一样,不阐述了。 3)当调用端通过Spring Aop进行同步/异步远程调用时,先通过相关参数获得Topic名称,把ProtocolRequest请求通过KafkaMQProducer.produce发送到Kafka服务器的Response Queue里,服务端监听到消息后处理,把结果封装到ProtocolResponse通过KafkaMQProducer.produce发送到Request Queue,调用端监听到消息后处理,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用 4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Response Topic,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果 5)Queue和Topic Destination名称是 destinationType + "-" + group + "-" + application + "-" + interfaze,防止不同类型的应用,重名接口接入到同一个MQ服务器

例如:request-queue-async-POA_EA_INF-APP_IOS-com.nepxion.thunder.test.service.UserService 6)为服务/调用的接口指定不同的Kafka服务器,前提是配置文件中必须出现两个以上的Kafka服务器配置,如果只有一个,可以不指定,默认取配置的那个Kafka服务器 7)同步/异步调用和广播调用的区别,是前者消费者必须在相同Group,后者消费者必须在不同Group 8)框架使用Kafka的High Level API,对于消息在分区上的分配,采用DefaultPartitioner的分区策略,见官方解释:

/**

* The default partitioning strategy:

* <ul>

* <li>If a partition is specified in the record, use it

* <li>If no partition is specified but a key is present choose a partition based on a hash of the key

* <li>If no partition or key is present choose a partition in a round-robin fashion

*/

对于Response Queue和Topic(名称),Partition的分区策略通过DefaultPartitioner第三种策略,即不指定Partition和Key,让Kafka做权重轮询放入不同的分区

对于Request Queue(名称),Partition的分区策略通过DefaultPartitioner第二种策略,即不指定Partition,但指定Key(IP:Port),通过Key的Hash值来决定放入哪个分区。这就意味着,一个服务调用端即享用一个独立的分区,这就为端到端的实现创造了条件9)框架使用Google Guava的EventBus,使框架内部事件异步发送到外部。无论调用端和提供端,在每次Produce和Consume成功或者失败,都会发送相关的Event。如果是失败事件,为Retry创造了条件。当然可以通过配置文件来关闭事件发送 点击查看大图类结构1)KafkaMQProducer.java - 消息生产者类,通过线程池发布消息到Kafka服务器指定Queue或者Topic,同时指定消息消费者消费完消息后,发送响应消息的指定Queue。调用端发送请求消息到Response Queue/Topic,实现对Request Queue监听,服务端发送响应消息到Request Queue,实现对Response Queue/Topic的监听2)KafkaMQConsumer.java - 消息消费类3)KafkaMQExecutorDelegate.java - KafkaExecutor的代理接口4)KafkaMQServerExecutor.java - 继承AbstractServerExecutor.java,实现KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Request Queue和Response Queue/Topic相关的事情5)KafkaMQServerHandler.java - 继承KafkaMQConsumer.java,实现服务端对消息监听(Poll)6)KafkaMQClientExecutor.java - 继承AbstractClientExecutor.java,实现KafkaMQExecutorDelegate.java,初始化KafkaContext,初始化跟Response Queue相关的事情7)KafkaMQClientHandler.java - 继承KafkaMQConsumer.java,实现客户端端对消息监听(Poll)8)KafkaMQClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:

异步调用:通过KafkaMQProducer.produce方法发送异步请求ProtocolRequest。通过监听获取异步返回

同步调用:同异步调用,采用返回值返回

广播调用:通过KafkaMQProducer.produce方法发送广播请求ProtocolRequest。不返回

重复调用:支持,通过Google Guava的EventBus实现9)KafkaMQContext.java - 实现初始化消息队列(Queue/Topic),初始化消息监听(KafkaMQServerHandler/KafkaMQClientHandler)10)KafkaMQCacheContainer.java - 缓存容器11)KafkaMQDestinationUtil.java - Destination工具类

相关推荐:(十五) Thunder分布式RPC框架

Thunder(QQ 群 471164539)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/框架支持两种方式的序列化,Java对象和字节数组的序列化和反序列化,Java对象和Json字符串的转换1)binary - Java实体类和字节数组的序列化/反序列化2)compression -

Thunder(QQ 群 471164539)发布在淘宝代码基地 http://code.taobao.org/p/Thunder/???1. JMS消息队列模型主要是用于ActiveMQ & Tibco等基于JMS标准的MQ。结构图如下:?点击查看大图...

------分隔线----------------------------